# Source code for ppanggolin.formats.writeBinaries

#!/usr/bin/env python3
#coding:utf-8

#default libraries
import logging
from collections import Counter, defaultdict
import statistics
import pkg_resources

#installed libraries
from tqdm import tqdm
import tables

def geneDesc(orgLen, contigLen, IDLen, typeLen, nameLen, productLen, maxLocalId):
    """
    Build the PyTables description of the annotations 'genes' table.
    String column itemsizes are fitted to the longest values found in the pangenome.
    """
    contig_columns = {
        'name': tables.StringCol(itemsize=contigLen),
        "is_circular": tables.BoolCol(dflt=False),
    }
    gene_columns = {
        'ID': tables.StringCol(itemsize=IDLen),
        'start': tables.UInt64Col(),
        'stop': tables.UInt64Col(),
        'strand': tables.StringCol(itemsize=1),
        'type': tables.StringCol(itemsize=typeLen),
        'position': tables.UInt32Col(),
        'name': tables.StringCol(itemsize=nameLen),
        'product': tables.StringCol(itemsize=productLen),
        'genetic_code': tables.UInt32Col(dflt=11),
        'is_fragment': tables.BoolCol(dflt=False),
        'local': tables.StringCol(itemsize=maxLocalId),
    }
    return {
        'organism': tables.StringCol(itemsize=orgLen),
        "contig": contig_columns,
        "gene": gene_columns,
    }
def getMaxLenAnnotations(pangenome):
    """
    Scan all organisms, contigs, genes and RNAs to find the maximum string length
    of each annotation field (every maximum is at least 1).

    Returns a tuple in the order expected by geneDesc:
    (organism name, contig name, feature ID, feature type, feature name,
     feature product, local identifier).
    """
    from itertools import chain

    maxOrgLen = maxContigLen = maxGeneIDLen = maxNameLen = maxProductLen = maxTypeLen = maxLocalId = 1
    for org in pangenome.organisms:
        maxOrgLen = max(maxOrgLen, len(org.name))
        for contig in org.contigs:
            maxContigLen = max(maxContigLen, len(contig.name))
            # CDS genes and RNAs carry the same annotation fields; measure them in one pass
            # (the original duplicated this loop body for contig.genes and contig.RNAs).
            for feature in chain(contig.genes, contig.RNAs):
                maxGeneIDLen = max(maxGeneIDLen, len(feature.ID))
                maxNameLen = max(maxNameLen, len(feature.name))
                maxProductLen = max(maxProductLen, len(feature.product))
                maxTypeLen = max(maxTypeLen, len(feature.type))
                maxLocalId = max(maxLocalId, len(feature.local_identifier))
    return maxOrgLen, maxContigLen, maxGeneIDLen, maxTypeLen, maxNameLen, maxProductLen, maxLocalId
def writeAnnotations(pangenome, h5f):
    """ Function writing all of the pangenome's annotations """
    annotation = h5f.create_group("/","annotations","Annotations of the pangenome's organisms")
    # column itemsizes are fitted to the longest strings present in the pangenome
    geneTable = h5f.create_table(annotation, "genes", geneDesc(*getMaxLenAnnotations(pangenome)), expectedrows=len(pangenome.genes))
    # NOTE(review): nbRNA is computed but never used below — candidate for removal
    nbRNA = 0
    for org in pangenome.organisms:
        for contig in org.contigs:
            nbRNA += len(contig.RNAs)
    bar = tqdm(pangenome.organisms, unit="genome")
    geneRow = geneTable.row
    for org in bar:
        for contig in org.contigs:
            for gene in contig.genes:
                geneRow["organism"] = org.name
                geneRow["contig/name"] = contig.name
                geneRow["contig/is_circular"] = contig.is_circular#this should be somewhere else.
                geneRow["gene/ID"]= gene.ID
                geneRow["gene/start"] = gene.start
                geneRow["gene/stop"] = gene.stop
                geneRow["gene/strand"] = gene.strand
                geneRow["gene/type"] = gene.type
                geneRow["gene/position"] = gene.position
                geneRow["gene/name"] = gene.name
                geneRow["gene/product"] = gene.product
                geneRow["gene/is_fragment"] = gene.is_fragment
                geneRow["gene/genetic_code"] = gene.genetic_code
                geneRow["gene/local"] = gene.local_identifier
                geneRow.append()
            for rna in contig.RNAs:
                geneRow["organism"] = org.name
                geneRow["contig/name"] = contig.name
                geneRow["contig/is_circular"] = contig.is_circular#this should be somewhere else.
                geneRow["gene/ID"]= rna.ID
                geneRow["gene/start"] = rna.start
                geneRow["gene/stop"] = rna.stop
                geneRow["gene/strand"] = rna.strand
                geneRow["gene/type"] = rna.type
                geneRow["gene/name"] = rna.name
                geneRow["gene/product"] = rna.product
                # NOTE(review): position, genetic_code and local are NOT set here.
                # Since the same Row object is reused, these fields presumably keep the
                # values of the previously appended gene row — confirm this is intended.
                geneRow["gene/is_fragment"] = rna.is_fragment
                geneRow.append()
    geneTable.flush()
    bar.close()
def getGeneSequencesLen(pangenome):
    """
    Return the itemsizes needed to store gene sequences:
    (max gene ID length, max dna length, max type length), each at least 1.
    """
    id_size = seq_size = type_size = 1
    for gene in pangenome.genes:
        seq_size = max(seq_size, len(gene.dna))
        id_size = max(id_size, len(gene.ID))
        type_size = max(type_size, len(gene.type))
    return id_size, seq_size, type_size
def geneSequenceDesc(geneIDLen, geneSeqLen, geneTypeLen):
    """Build the PyTables description of the geneSequences table (gene ID, dna, type)."""
    description = {}
    description["gene"] = tables.StringCol(itemsize=geneIDLen)
    description["dna"] = tables.StringCol(itemsize=geneSeqLen)
    description["type"] = tables.StringCol(itemsize=geneTypeLen)
    return description
def writeGeneSequences(pangenome, h5f):
    """Write the ID, dna sequence and feature type of every gene into the geneSequences table."""
    seqTable = h5f.create_table("/", "geneSequences", geneSequenceDesc(*getGeneSequencesLen(pangenome)), expectedrows=len(pangenome.genes))
    seqRow = seqTable.row
    bar = tqdm(pangenome.genes, unit="gene")
    for gene in bar:
        seqRow["gene"] = gene.ID
        seqRow["dna"] = gene.dna
        seqRow["type"] = gene.type
        seqRow.append()
    seqTable.flush()
    bar.close()
def geneFamDesc(maxNameLen, maxSequenceLength, maxPartLen):
    """Build the PyTables description of the geneFamiliesInfo table (name, protein, partition)."""
    return dict(
        name=tables.StringCol(itemsize=maxNameLen),
        protein=tables.StringCol(itemsize=maxSequenceLength),
        partition=tables.StringCol(itemsize=maxPartLen),
    )
def getGeneFamLen(pangenome):
    """
    Return the itemsizes needed for gene family info:
    (max family name length, max representative sequence length, max partition length),
    each at least 1.
    """
    name_size = seq_size = part_size = 1
    for family in pangenome.geneFamilies:
        seq_size = max(seq_size, len(family.sequence))
        name_size = max(name_size, len(family.name))
        part_size = max(part_size, len(family.partition))
    return name_size, seq_size, part_size
def writeGeneFamInfo(pangenome, h5f, force):
    """
    Writing a table containing the protein sequences of each family.
    With force=True an existing table is removed and rebuilt.
    """
    if '/geneFamiliesInfo' in h5f and force is True:
        logging.getLogger().info("Erasing the formerly computed gene family representative sequences...")
        h5f.remove_node('/', 'geneFamiliesInfo')  # erasing the table, and rewriting a new one.
    infoTable = h5f.create_table("/", "geneFamiliesInfo", geneFamDesc(*getGeneFamLen(pangenome)), expectedrows=len(pangenome.geneFamilies))
    infoRow = infoTable.row
    bar = tqdm(pangenome.geneFamilies, unit="gene family")
    for family in bar:
        infoRow["name"] = family.name
        infoRow["protein"] = family.sequence
        infoRow["partition"] = family.partition
        infoRow.append()
    infoTable.flush()
    bar.close()
def gene2famDesc(geneFamNameLen, geneIDLen):
    """Build the PyTables description of the gene -> gene family association table."""
    return dict(
        geneFam=tables.StringCol(itemsize=geneFamNameLen),
        gene=tables.StringCol(itemsize=geneIDLen),
    )
def getGene2famLen(pangenome):
    """
    Return (max family name length, max gene ID length) over all families and
    their member genes, each at least 1.
    """
    fam_size = gene_size = 1
    for family in pangenome.geneFamilies:
        fam_size = max(fam_size, len(family.name))
        for gene in family.genes:
            gene_size = max(gene_size, len(gene.ID))
    return fam_size, gene_size
def writeGeneFamilies(pangenome, h5f, force):
    """
    Function writing all of the pangenome's gene families (gene -> family pairs).
    With force=True an existing table is removed and rebuilt.
    """
    if '/geneFamilies' in h5f and force is True:
        logging.getLogger().info("Erasing the formerly computed gene family to gene associations...")
        h5f.remove_node('/', 'geneFamilies')  # erasing the table, and rewriting a new one.
    famTable = h5f.create_table("/", "geneFamilies", gene2famDesc(*getGene2famLen(pangenome)))
    famRow = famTable.row
    bar = tqdm(pangenome.geneFamilies, unit="gene family")
    for family in bar:
        for gene in family.genes:
            famRow["gene"] = gene.ID
            famRow["geneFam"] = family.name
            famRow.append()
    famTable.flush()
    bar.close()
def graphDesc(maxGeneIDLen):
    """Build the PyTables description of the edges table (the two gene IDs of each edge)."""
    return dict(
        geneTarget=tables.StringCol(itemsize=maxGeneIDLen),
        geneSource=tables.StringCol(itemsize=maxGeneIDLen),
    )
def getGeneIDLen(pangenome):
    """Return the length of the longest gene ID in the pangenome (at least 1)."""
    longest = max((len(gene.ID) for gene in pangenome.genes), default=0)
    return longest if longest > 1 else 1
def writeGraph(pangenome, h5f, force):
    """
    Write the neighbors graph edges as pairs of gene IDs.
    With force=True an existing table is removed and rebuilt.
    """
    # NOTE: reading the graph currently forces reading the annotations (one of the most
    # time-consuming parts to read); storing the organism name here would lift that.
    if '/edges' in h5f and force is True:
        logging.getLogger().info("Erasing the formerly computed edges")
        h5f.remove_node("/", "edges")
    edgeTable = h5f.create_table("/", "edges", graphDesc(getGeneIDLen(pangenome)), expectedrows=len(pangenome.edges))
    edgeRow = edgeTable.row
    bar = tqdm(pangenome.edges, unit="edge")
    for edge in bar:
        for pairs in edge.organisms.values():
            for gene_a, gene_b in pairs:
                edgeRow["geneTarget"] = gene_a.ID
                edgeRow["geneSource"] = gene_b.ID
                edgeRow.append()
    bar.close()
    edgeTable.flush()
def RGPDesc(maxRGPLen, maxGeneLen):
    """Build the PyTables description of the RGP table (region name and member gene ID)."""
    return dict(
        RGP=tables.StringCol(itemsize=maxRGPLen),
        gene=tables.StringCol(itemsize=maxGeneLen),
    )
def getRGPLen(pangenome):
    """
    Return (max region name length, max gene ID length) over all regions,
    each at least 1.
    """
    rgp_size = gene_size = 1
    for region in pangenome.regions:
        rgp_size = max(rgp_size, len(region.name))
        for gene in region.genes:
            gene_size = max(gene_size, len(gene.ID))
    return rgp_size, gene_size
def writeRGP(pangenome, h5f, force):
    """
    Write the RGP table linking each Region of Genomic Plasticity to its genes.
    With force=True an existing table is removed and rebuilt.
    """
    if '/RGP' in h5f and force is True:
        # fixed log message typo: "computer" -> "computed"
        logging.getLogger().info("Erasing the formerly computed RGP")
        h5f.remove_node('/', 'RGP')
    RGPTable = h5f.create_table('/', 'RGP', RGPDesc(*getRGPLen(pangenome)),
                                expectedrows=sum(len(region.genes) for region in pangenome.regions))
    RGPRow = RGPTable.row
    bar = tqdm(pangenome.regions, unit="region")
    for region in bar:
        for gene in region.genes:
            RGPRow["RGP"] = region.name
            RGPRow["gene"] = gene.ID
            RGPRow.append()
    bar.close()
    RGPTable.flush()
def spotDesc(maxRGPLen):
    """Build the PyTables description of the spots table (spot ID and member RGP name)."""
    return dict(
        spot=tables.UInt32Col(),
        RGP=tables.StringCol(itemsize=maxRGPLen),
    )
def getSpotDesc(pangenome):
    """Return the length of the longest region name across all spots (at least 1)."""
    rgp_size = 1
    for spot in pangenome.spots:
        for region in spot.regions:
            rgp_size = max(rgp_size, len(region.name))
    return rgp_size
def writeSpots(pangenome, h5f, force):
    """
    Write the spots table linking each spot of insertion to its RGPs.
    With force=True an existing table is removed and rebuilt.
    """
    if '/spots' in h5f and force is True:
        logging.getLogger().info("Erasing the formerly computed spots")
        h5f.remove_node("/", "spots")
    spotTable = h5f.create_table("/", "spots", spotDesc(getSpotDesc(pangenome)),
                                 expectedrows=sum([len(spot.regions) for spot in pangenome.spots]))
    spotRow = spotTable.row
    # Iterate the tqdm wrapper so progress advances exactly once per spot; the original
    # iterated pangenome.spots and called bar.update() per appended region, so the bar
    # (whose total is the number of spots) advanced once per region instead.
    bar = tqdm(pangenome.spots, unit="spot")
    for spot in bar:
        for region in spot.regions:
            spotRow["spot"] = spot.ID
            spotRow["RGP"] = region.name
            spotRow.append()
    bar.close()
    spotTable.flush()
def writeStatus(pangenome, h5f):
    """
    Write the pangenome's step statuses as boolean attributes of the '/status' group,
    plus the ppanggolin version used to produce the file.
    """
    if "/status" in h5f:  # if statuses are already written
        statusGroup = h5f.root.status
    else:  # else create the status group
        statusGroup = h5f.create_group("/", "status", "Statuses of the pangenome's content")
    # a step counts as done if it was computed in this run, loaded, or already in the file
    done = ("Computed", "Loaded", "inFile")
    # the membership test is already a boolean: the original's `True if ... else False`
    # ternaries were redundant
    statusGroup._v_attrs.genomesAnnotated = pangenome.status["genomesAnnotated"] in done
    statusGroup._v_attrs.geneSequences = pangenome.status["geneSequences"] in done
    statusGroup._v_attrs.genesClustered = pangenome.status["genesClustered"] in done
    statusGroup._v_attrs.geneFamilySequences = pangenome.status["geneFamilySequences"] in done
    statusGroup._v_attrs.NeighborsGraph = pangenome.status["neighborsGraph"] in done
    statusGroup._v_attrs.Partitionned = pangenome.status["partitionned"] in done
    statusGroup._v_attrs.defragmented = pangenome.status["defragmented"] in done
    statusGroup._v_attrs.predictedRGP = pangenome.status["predictedRGP"] in done
    statusGroup._v_attrs.spots = pangenome.status["spots"] in done
    statusGroup._v_attrs.version = pkg_resources.get_distribution("ppanggolin").version
def writeInfo(pangenome, h5f):
    """
    Writes information and numbers to be eventually called with the 'info' submodule.
    Only the sections whose corresponding status is Computed or Loaded are written.
    """
    if "/info" in h5f:
        infoGroup = h5f.root.info
    else:
        infoGroup = h5f.create_group("/", "info", "Informations about the pangenome's content")
    if pangenome.status["genomesAnnotated"] in ["Computed", "Loaded"]:
        infoGroup._v_attrs.numberOfGenes = len(pangenome.genes)
        infoGroup._v_attrs.numberOfOrganisms = len(pangenome.organisms)
    if pangenome.status["genesClustered"] in ["Computed", "Loaded"]:
        infoGroup._v_attrs.numberOfClusters = len(pangenome.geneFamilies)
    if pangenome.status["neighborsGraph"] in ["Computed", "Loaded"]:
        infoGroup._v_attrs.numberOfEdges = len(pangenome.edges)
    if pangenome.status["partitionned"] in ["Computed", "Loaded"]:
        namedPartCounter = Counter()
        subpartCounter = Counter()
        partDistribs = defaultdict(list)
        partSet = set()
        for fam in pangenome.geneFamilies:
            namedPartCounter[fam.namedPartition] += 1
            # presence frequency of the family across organisms
            partDistribs[fam.namedPartition].append(len(fam.organisms) / len(pangenome.organisms))
            if fam.namedPartition == "shell":
                subpartCounter[fam.partition] += 1
            if fam.partition != "S_":
                partSet.add(fam.partition)

        def _partStats(values):
            """min/max/sd/mean (rounded to 2 digits) of a list of frequencies; 0 when undefined."""
            if not values:
                return {"min": 0, "max": 0, "sd": 0, "mean": 0}
            # statistics.stdev raises StatisticsError for fewer than two data points,
            # which the original code did not guard against (it only checked for 0).
            sd = round(statistics.stdev(values), 2) if len(values) > 1 else 0
            return {"min": round(min(values), 2),
                    "max": round(max(values), 2),
                    "sd": sd,
                    "mean": round(statistics.mean(values), 2)}

        infoGroup._v_attrs.numberOfPersistent = namedPartCounter["persistent"]
        infoGroup._v_attrs.persistentStats = _partStats(partDistribs["persistent"])
        infoGroup._v_attrs.numberOfShell = namedPartCounter["shell"]
        infoGroup._v_attrs.shellStats = _partStats(partDistribs["shell"])
        infoGroup._v_attrs.numberOfCloud = namedPartCounter["cloud"]
        infoGroup._v_attrs.cloudStats = _partStats(partDistribs["cloud"])
        infoGroup._v_attrs.numberOfPartitions = len(partSet)
        infoGroup._v_attrs.numberOfSubpartitions = subpartCounter
    if pangenome.status["predictedRGP"] in ["Computed", "Loaded"]:
        infoGroup._v_attrs.numberOfRGP = len(pangenome.regions)
    if pangenome.status["spots"] in ["Computed", "Loaded"]:
        infoGroup._v_attrs.numberOfSpots = len(pangenome.spots)
    infoGroup._v_attrs.parameters = pangenome.parameters  # saving the pangenome parameters
def updateGeneFamPartition(pangenome, h5f):
    """
    Update the existing geneFamiliesInfo table in place with each family's partition.
    """
    logging.getLogger().info("Updating gene families with partition information")
    table = h5f.root.geneFamiliesInfo
    bar = tqdm(range(table.nrows), unit="gene family")
    for row in table:
        row["partition"] = pangenome.getGeneFamily(row["name"].decode()).partition
        row.update()
        bar.update()
    bar.close()
    # flush pending in-place modifications to disk; the original omitted this,
    # unlike its sibling updateGeneFragments
    table.flush()
def updateGeneFragments(pangenome, h5f):
    """
    Updates the annotation table with the fragmentation informations from the defrag pipeline.
    Only CDS rows are updated; other feature types are left untouched.
    """
    logging.getLogger().info("Updating annotations with fragment information")
    table = h5f.root.annotations.genes
    # (removed the original's dead `row = table.row` — it was immediately shadowed
    # by the loop variable below)
    bar = tqdm(range(table.nrows), unit="gene")
    for row in table:
        if row['gene/type'].decode() == 'CDS':
            row['gene/is_fragment'] = pangenome.getGene(row['gene/ID'].decode()).is_fragment
            row.update()
        bar.update()
    bar.close()
    table.flush()
def ErasePangenome(pangenome, graph=False, geneFamilies=False, partition=False, rgp=False, spots=False):
    """
    Erases tables from a pangenome .h5 file.

    Each flag selects a computation step to erase; erasing a step also erases every
    table that depends on it, and the corresponding statuses are reset both in the
    pangenome object and in the file's '/status' group.
    """
    compressionFilter = tables.Filters(complevel=1, complib='blosc:lz4')
    h5f = tables.open_file(pangenome.file, "a", filters=compressionFilter)
    statusGroup = h5f.root.status
    if '/edges' in h5f and (graph or geneFamilies):
        logging.getLogger().info("Erasing the formerly computed edges")
        h5f.remove_node("/", "edges")
        statusGroup._v_attrs.NeighborsGraph = False
        pangenome.status["neighborsGraph"] = "No"
    if '/geneFamilies' in h5f and geneFamilies:
        logging.getLogger().info("Erasing the formerly computed gene family to gene associations...")
        h5f.remove_node('/', 'geneFamilies')  # erasing the table, and rewriting a new one.
        pangenome.status["defragmented"] = "No"
        pangenome.status["genesClustered"] = "No"
        statusGroup._v_attrs.defragmented = False
        statusGroup._v_attrs.genesClustered = False
    if '/geneFamiliesInfo' in h5f and geneFamilies:
        logging.getLogger().info("Erasing the formerly computed gene family representative sequences...")
        h5f.remove_node('/', 'geneFamiliesInfo')  # erasing the table, and rewriting a new one.
        pangenome.status["partitionned"] = "No"
        pangenome.status["geneFamilySequences"] = "No"
        statusGroup._v_attrs.geneFamilySequences = False
        statusGroup._v_attrs.Partitionned = False
    if '/RGP' in h5f and (geneFamilies or partition or rgp):
        # fixed log message typo: "computer" -> "computed"
        logging.getLogger().info("Erasing the formerly computed RGP...")
        pangenome.status["predictedRGP"] = "No"
        statusGroup._v_attrs.predictedRGP = False
        h5f.remove_node("/", "RGP")
    if '/spots' in h5f and (geneFamilies or partition or rgp or spots):
        logging.getLogger().info("Erasing the formerly computed spots...")
        pangenome.status["spots"] = "No"
        statusGroup._v_attrs.spots = False
        h5f.remove_node("/", "spots")
    h5f.close()
def writePangenome(pangenome, filename, force):
    """
    Writes or updates a pangenome file.
    pangenome is the corresponding pangenome object, filename the h5 file and
    the pangenome's statuses indicate what has been modified and must be written.
    """
    compressionFilter = tables.Filters(complevel=1, complib='blosc:lz4')  # test the other compressors from blosc, this one was arbitrarily chosen.
    if pangenome.status["genomesAnnotated"] == "Computed":
        h5f = tables.open_file(filename, "w", filters=compressionFilter)
        logging.getLogger().info("Writing genome annotations...")
        writeAnnotations(pangenome, h5f)
        pangenome.status["genomesAnnotated"] = "Loaded"
        h5f.close()
    elif pangenome.status["genomesAnnotated"] in ["Loaded", "inFile"]:
        pass
    else:
        # if the pangenome is neither Computed nor Loaded, it's probably not really
        # in a good state (or something new was coded)
        raise NotImplementedError("Something REALLY unexpected and unplanned for happened here. Please post an issue on github with what you did to reach this error.")
    # from there, appending to the existing file
    h5f = tables.open_file(filename, "a", filters=compressionFilter)
    if pangenome.status["geneSequences"] == "Computed":
        logging.getLogger().info("writing the protein coding gene dna sequences")
        writeGeneSequences(pangenome, h5f)
        pangenome.status["geneSequences"] = "Loaded"
    if pangenome.status["genesClustered"] == "Computed":
        logging.getLogger().info("Writing gene families and gene associations...")
        writeGeneFamilies(pangenome, h5f, force)
        logging.getLogger().info("Writing gene families information...")
        writeGeneFamInfo(pangenome, h5f, force)
        if pangenome.status["genomesAnnotated"] in ["Loaded", "inFile"] and pangenome.status["defragmented"] == "Computed":
            # if the annotations have not been computed in this run, and there has been
            # a clustering with defragmentation, then the annotations can be updated
            updateGeneFragments(pangenome, h5f)
        pangenome.status["genesClustered"] = "Loaded"
    if pangenome.status["neighborsGraph"] == "Computed":
        logging.getLogger().info("Writing the edges...")
        writeGraph(pangenome, h5f, force)
        pangenome.status["neighborsGraph"] = "Loaded"
    if pangenome.status["partitionned"] == "Computed" and pangenome.status["genesClustered"] in ["Loaded", "inFile"]:
        # otherwise it's been written already
        updateGeneFamPartition(pangenome, h5f)
        pangenome.status["partitionned"] = "Loaded"
    if pangenome.status['predictedRGP'] == "Computed":
        logging.getLogger().info("Writing Regions of Genomic Plasticity...")
        writeRGP(pangenome, h5f, force)
        pangenome.status['predictedRGP'] = "Loaded"
    if pangenome.status["spots"] == "Computed":
        logging.getLogger().info("Writing Spots of Insertion...")
        writeSpots(pangenome, h5f, force)
        pangenome.status['spots'] = "Loaded"
    writeStatus(pangenome, h5f)
    writeInfo(pangenome, h5f)
    h5f.close()
    # restored the {filename} placeholder: this copy of the source had a plain
    # f-string with no replacement field, so the written file was never reported
    logging.getLogger().info(f"Done writing the pangenome. It is in file : {filename}")