Source code for ppanggolin.formats.readBinaries

#!/usr/bin/env python3
#coding:utf-8

#default libraries
import logging
import sys

#installed libraries
from tqdm import tqdm
import tables

#local libraries
from ppanggolin.genome import Organism, Gene, RNA
from ppanggolin.region import Spot

def getNumberOfOrganisms(pangenome):
    """ Standalone function to get the number of organisms in a pangenome."""
    if hasattr(pangenome, "file"):
        filename = pangenome.file
    else:
        raise FileNotFoundError("The provided pangenome does not have an associated .h5 file")
    h5f = tables.open_file(filename, "r")
    annotations = h5f.root.annotations

    table = annotations.genes
    orgSet = set()
    for org in read_chunks(table, column="organism"):
        orgSet.add(org)
    h5f.close()
    return len(orgSet)

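# Illustrative usage sketch (not part of the original module): counting organisms directly
# from an existing pangenome file. The `Pangenome` import and the `addFile` call are
# assumptions about the ppanggolin API; the .h5 path is supplied by the caller.
def _example_count_organisms(h5_path):
    from ppanggolin.pangenome import Pangenome  # assumed location of the Pangenome class
    pangenome = Pangenome()
    pangenome.addFile(h5_path)  # assumed to associate the .h5 file with the pangenome (see getStatus)
    return getNumberOfOrganisms(pangenome)
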
def getStatus(pangenome, pangenomeFile):
    """ Checks which elements are already present in the file."""
    h5f = tables.open_file(pangenomeFile, "r")
    logging.getLogger().info("Getting the current pangenome's status")
    statusGroup = h5f.root.status
    if statusGroup._v_attrs.genomesAnnotated:
        pangenome.status["genomesAnnotated"] = "inFile"
    if statusGroup._v_attrs.genesClustered:
        pangenome.status["genesClustered"] = "inFile"
    if statusGroup._v_attrs.geneSequences:
        pangenome.status["geneSequences"] = "inFile"
    if statusGroup._v_attrs.geneFamilySequences:
        pangenome.status["geneFamilySequences"] = "inFile"
    if statusGroup._v_attrs.NeighborsGraph:
        pangenome.status["neighborsGraph"] = "inFile"
    if statusGroup._v_attrs.Partitionned:
        pangenome.status["partitionned"] = "inFile"
    if hasattr(statusGroup._v_attrs, "predictedRGP"):
        if statusGroup._v_attrs.predictedRGP:
            pangenome.status["predictedRGP"] = "inFile"
    if hasattr(statusGroup._v_attrs, "spots"):
        if statusGroup._v_attrs.spots:
            pangenome.status["spots"] = "inFile"
    if "/info" in h5f:
        infoGroup = h5f.root.info
        pangenome.parameters = infoGroup._v_attrs.parameters
    h5f.close()

def read_chunks(table, column=None, chunk=10000):
    """ Reads the provided table (or a single column if specified) chunk by chunk to limit RAM usage."""
    for i in range(0, table.nrows, chunk):
        for row in table.read(start=i, stop=i + chunk, field=column):
            yield row

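# Illustrative usage sketch (not part of the original module): iterating over a single column
# of a PyTables table without loading it entirely, mirroring how getNumberOfOrganisms reads
# the 'organism' column of the annotations.genes table.
def _example_iter_organism_column(h5_path):
    h5f = tables.open_file(h5_path, "r")
    try:
        for org in read_chunks(h5f.root.annotations.genes, column="organism", chunk=5000):
            yield org.decode()
    finally:
        h5f.close()
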
def getGeneSequencesFromFile(pangenome, fileObj, list_CDS=None):
    """
        Writes the CDS sequences of the Pangenome object to a tmpFile (file-like) object,
        optionally filtered by a list of CDS names.
        Loads the sequences from a .h5 pangenome file.
    """
    logging.getLogger().info("Extracting and writing all of the CDS sequences from a .h5 pangenome file to a fasta file")
    h5f = tables.open_file(pangenome.file, "r", driver_core_backing_store=0)
    table = h5f.root.geneSequences
    bar = tqdm(range(table.nrows), unit="gene")
    list_CDS = set(list_CDS) if list_CDS is not None else None
    for row in read_chunks(table, chunk=20000):  # reading the table chunk per chunk, otherwise RAM dies on big pangenomes
        nameCDS = row["gene"].decode()
        if row["type"] == b"CDS" and (list_CDS is None or nameCDS in list_CDS):
            fileObj.write('>' + nameCDS + "\n")
            fileObj.write(row["dna"].decode() + "\n")
        bar.update()
    fileObj.flush()
    bar.close()
    h5f.close()

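# Illustrative usage sketch (not part of the original module): writing all CDS sequences, or only
# a chosen subset, to a fasta file. The `Pangenome` import and `addFile` call are assumptions
# about the ppanggolin API; the paths are placeholders supplied by the caller.
def _example_write_cds_fasta(h5_path, fasta_path, cds_names=None):
    from ppanggolin.pangenome import Pangenome  # assumed location of the Pangenome class
    pangenome = Pangenome()
    pangenome.addFile(h5_path)  # assumed to set pangenome.file to the given .h5 path
    with open(fasta_path, "w") as fasta:
        getGeneSequencesFromFile(pangenome, fasta, list_CDS=cds_names)
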
def launchReadOrganism(args):
    return readOrganism(*args)

def readOrganism(pangenome, orgName, contigDict, circularContigs, link=False):
    org = Organism(orgName)
    for contigName, geneList in contigDict.items():
        contig = org.getOrAddContig(contigName, is_circular=circularContigs[contigName])
        for row in geneList:
            gene_type = row["type"].decode()  # needed below whether or not the gene object is created here
            if link:  # if the gene families are already computed/loaded, the gene exists.
                gene = pangenome.getGene(row["ID"].decode())
            else:  # else, creating the gene.
                if gene_type == "CDS":
                    gene = Gene(row["ID"].decode())
                elif "RNA" in gene_type:
                    gene = RNA(row["ID"].decode())
            try:
                local = row["local"].decode()
            except ValueError:
                local = ""
            gene.fill_annotations(
                start=row["start"],
                stop=row["stop"],
                strand=row["strand"].decode(),
                geneType=row["type"].decode(),
                position=row["position"],
                genetic_code=row["genetic_code"],
                name=row["name"].decode(),
                product=row["product"].decode(),
                local_identifier=local)
            gene.is_fragment = row["is_fragment"]
            gene.fill_parents(org, contig)
            if gene_type == "CDS":
                contig.addGene(gene)
            elif "RNA" in gene_type:
                contig.addRNA(gene)
            else:
                raise Exception(f"Unexpected gene type '{gene_type}': don't know what to do with it.")
    pangenome.addOrganism(org)

def readGraph(pangenome, h5f):
    table = h5f.root.edges
    if pangenome.status["genomesAnnotated"] not in ["Computed", "Loaded"] or \
            pangenome.status["genesClustered"] not in ["Computed", "Loaded"]:
        raise Exception("It's not possible to read the graph if the annotations and the gene families have not been loaded.")
    bar = tqdm(range(table.nrows), unit="contig adjacency")
    for row in read_chunks(table):
        source = pangenome.getGene(row["geneSource"].decode())
        target = pangenome.getGene(row["geneTarget"].decode())
        pangenome.addEdge(source, target)
        bar.update()
    bar.close()
    pangenome.status["neighborsGraph"] = "Loaded"

def readGeneFamilies(pangenome, h5f):
    table = h5f.root.geneFamilies

    link = True if pangenome.status["genomesAnnotated"] in ["Computed", "Loaded"] else False

    bar = tqdm(range(table.nrows), unit="gene")
    for row in read_chunks(table):
        fam = pangenome.addGeneFamily(row["geneFam"].decode())
        if link:  # linking if we have loaded the annotations
            geneObj = pangenome.getGene(row["gene"].decode())
        else:  # else, creating a new gene object
            geneObj = Gene(row["gene"].decode())
        fam.addGene(geneObj)
        bar.update()
    bar.close()
    pangenome.status["genesClustered"] = "Loaded"

def readGeneFamiliesInfo(pangenome, h5f):
    table = h5f.root.geneFamiliesInfo
    bar = tqdm(range(table.nrows), unit="gene family")
    for row in read_chunks(table):
        fam = pangenome.addGeneFamily(row["name"].decode())
        fam.addPartition(row["partition"].decode())
        fam.addSequence(row["protein"].decode())
        bar.update()
    bar.close()
    if h5f.root.status._v_attrs.Partitionned:
        pangenome.status["partitionned"] = "Loaded"
    if h5f.root.status._v_attrs.geneFamilySequences:
        pangenome.status["geneFamilySequences"] = "Loaded"

def readRGP(pangenome, h5f):
    table = h5f.root.RGP
    bar = tqdm(range(table.nrows), unit="gene")
    for row in read_chunks(table):
        region = pangenome.getOrAddRegion(row["RGP"].decode())
        region.append(pangenome.getGene(row["gene"].decode()))
        bar.update()
    bar.close()
    # order the genes properly in the regions
    for region in pangenome.regions:
        region.genes = sorted(region.genes, key=lambda x: x.position)  # order them the same way as on the contig
    pangenome.status["predictedRGP"] = "Loaded"

def readSpots(pangenome, h5f):
    table = h5f.root.spots
    bar = tqdm(range(table.nrows), unit="region")
    spots = {}
    for row in read_chunks(table):
        curr_spot = spots.get(row["spot"])
        if curr_spot is None:
            curr_spot = Spot(row["spot"])
            spots[row["spot"]] = curr_spot
        curr_spot.addRegion(pangenome.getOrAddRegion(row["RGP"].decode()))
        bar.update()
    bar.close()
    pangenome.addSpots(spots.values())
    pangenome.status["spots"] = "Loaded"

def readAnnotation(pangenome, h5f, filename):
    annotations = h5f.root.annotations

    table = annotations.genes
    bar = tqdm(range(table.nrows), unit="gene")
    # index genes by organism, then by contig; circularContigs mirrors that structure for the is_circular flag
    pangenomeDict = {}
    circularContigs = {}
    for row in read_chunks(table):
        try:
            # new gene, seen contig, seen org
            pangenomeDict[row["organism"].decode()][row["contig"]["name"].decode()].append(row["gene"])
        except KeyError:
            try:
                # new contig, seen org
                pangenomeDict[row["organism"].decode()][row["contig"]["name"].decode()] = [row["gene"]]
                circularContigs[row["organism"].decode()][row["contig"]["name"].decode()] = row["contig"]["is_circular"]
            except KeyError:
                # new org
                pangenomeDict[sys.intern(row["organism"].decode())] = {row["contig"]["name"].decode(): [row["gene"]]}
                circularContigs[row["organism"].decode()] = {row["contig"]["name"].decode(): row["contig"]["is_circular"]}
        bar.update()
    bar.close()

    link = True if pangenome.status["genesClustered"] in ["Computed", "Loaded"] else False

    bar = tqdm(range(len(pangenomeDict)), unit="organism")
    for orgName, contigDict in pangenomeDict.items():
        readOrganism(pangenome, orgName, contigDict, circularContigs[orgName], link)
        bar.update()
    bar.close()
    pangenome.status["genomesAnnotated"] = "Loaded"

def readInfo(h5f):
    if "/info" in h5f:
        infoGroup = h5f.root.info
        print(f"Genes : {infoGroup._v_attrs['numberOfGenes']}")
        if "numberOfOrganisms" in infoGroup._v_attrs._f_list():
            print(f"Organisms : {infoGroup._v_attrs['numberOfOrganisms']}")
        if "numberOfClusters" in infoGroup._v_attrs._f_list():
            print(f"Families : {infoGroup._v_attrs['numberOfClusters']}")
        if "numberOfEdges" in infoGroup._v_attrs._f_list():
            print(f"Edges : {infoGroup._v_attrs['numberOfEdges']}")
        if 'numberOfCloud' in infoGroup._v_attrs._f_list():  # then all the others are there
            print(f"Persistent ( {', '.join([key + ':' + str(round(val, 2)) for key, val in infoGroup._v_attrs['persistentStats'].items()])} ): {infoGroup._v_attrs['numberOfPersistent']}")
            print(f"Shell ( {', '.join([key + ':' + str(round(val, 2)) for key, val in infoGroup._v_attrs['shellStats'].items()])} ): {infoGroup._v_attrs['numberOfShell']}")
            print(f"Cloud ( {', '.join([key + ':' + str(round(val, 2)) for key, val in infoGroup._v_attrs['cloudStats'].items()])} ): {infoGroup._v_attrs['numberOfCloud']}")
            print(f"Number of partitions : {infoGroup._v_attrs['numberOfPartitions']}")
            if infoGroup._v_attrs['numberOfPartitions'] != 3:
                for key, val in infoGroup._v_attrs['numberOfSubpartitions'].items():
                    print(f"Shell {key} : {val}")
        if 'numberOfRGP' in infoGroup._v_attrs._f_list():
            print(f"RGPs : {infoGroup._v_attrs['numberOfRGP']}")
        if 'numberOfSpots' in infoGroup._v_attrs._f_list():
            print(f"Spots : {infoGroup._v_attrs['numberOfSpots']}")

def readParameters(h5f):
    if "/info" in h5f:
        infoGroup = h5f.root.info
        if "parameters" in infoGroup._v_attrs._f_list():
            for key, dic in infoGroup._v_attrs["parameters"].items():
                print(f"{key}")
                for key2, val in dic.items():
                    print(f"    {key2} : {val}")

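# Illustrative usage sketch (not part of the original module): printing the content summary and
# the recorded parameters of a pangenome file, given its path, using readInfo and readParameters.
def _example_print_file_summary(h5_path):
    h5f = tables.open_file(h5_path, "r")
    try:
        readInfo(h5f)
        readParameters(h5f)
    finally:
        h5f.close()
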
def readPangenome(pangenome, annotation=False, geneFamilies=False, graph=False, rgp=False, spots=False):
    """
        Reads a previously written pangenome, with all of its parts, depending on what is requested
        and on what is recorded in the 'status' group of the HDF5 file.
    """
    # compressionFilter = tables.Filters(complevel=1, complib='blosc:lz4')
    if hasattr(pangenome, "file"):
        filename = pangenome.file
    else:
        raise FileNotFoundError("The provided pangenome does not have an associated .h5 file")
    h5f = tables.open_file(filename, "r")
    if annotation:
        if h5f.root.status._v_attrs.genomesAnnotated:
            logging.getLogger().info("Reading pangenome annotations...")
            readAnnotation(pangenome, h5f, filename)
        else:
            raise Exception(f"The pangenome in file '{filename}' has not been annotated, or has been improperly filled")
    if geneFamilies:
        if h5f.root.status._v_attrs.genesClustered:
            logging.getLogger().info("Reading pangenome gene families...")
            readGeneFamilies(pangenome, h5f)
            readGeneFamiliesInfo(pangenome, h5f)
        else:
            raise Exception(f"The pangenome in file '{filename}' does not have gene families, or has been improperly filled")
    if graph:
        if h5f.root.status._v_attrs.NeighborsGraph:
            logging.getLogger().info("Reading the neighbors graph edges...")
            readGraph(pangenome, h5f)
        else:
            raise Exception(f"The pangenome in file '{filename}' does not have graph information, or has been improperly filled")
    if rgp:
        if h5f.root.status._v_attrs.predictedRGP:
            logging.getLogger().info("Reading the RGP...")
            readRGP(pangenome, h5f)
        else:
            raise Exception(f"The pangenome in file '{filename}' does not have RGP information, or has been improperly filled")
    if spots:
        if h5f.root.status._v_attrs.spots:
            logging.getLogger().info("Reading the spots...")
            readSpots(pangenome, h5f)
        else:
            raise Exception(f"The pangenome in file '{filename}' does not have spots information, or has been improperly filled")
    h5f.close()

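# Illustrative usage sketch (not part of the original module): loading only the annotations and
# gene families from a file. The `Pangenome` import and `addFile` call are assumptions about the
# ppanggolin API; in practice readPangenome is usually reached through checkPangenomeInfo below.
def _example_load_families(h5_path):
    from ppanggolin.pangenome import Pangenome  # assumed location of the Pangenome class
    pangenome = Pangenome()
    pangenome.addFile(h5_path)  # assumed to fill pangenome.status from the file (see getStatus)
    readPangenome(pangenome, annotation=True, geneFamilies=True)
    return pangenome
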
def checkPangenomeInfo(pangenome, needAnnotations=False, needFamilies=False, needGraph=False, needPartitions=False, needRGP=False, needSpots=False):
    """
        Defines what needs to be read depending on what is needed,
        and automatically checks whether the required elements have been computed, with regard to the pangenome.status.
    """
    annotation = False
    geneFamilies = False
    graph = False
    rgp = False
    spots = False

    if needAnnotations:
        if pangenome.status["genomesAnnotated"] == "inFile":
            annotation = True
        elif pangenome.status["genomesAnnotated"] not in ["Computed", "Loaded"]:
            raise Exception("Your pangenome has no genes. See the 'annotate' subcommand.")
    if needFamilies:
        if pangenome.status["genesClustered"] == "inFile":
            geneFamilies = True
        elif pangenome.status["genesClustered"] not in ["Computed", "Loaded"]:
            raise Exception("Your pangenome has no gene families. See the 'cluster' subcommand.")
    if needGraph:
        if pangenome.status["neighborsGraph"] == "inFile":
            graph = True
        elif pangenome.status["neighborsGraph"] not in ["Computed", "Loaded"]:
            raise Exception("Your pangenome does not have a graph (no edges). See the 'graph' subcommand.")
    if needPartitions:
        if pangenome.status["partitionned"] not in ["Computed", "Loaded", "inFile"]:
            raise Exception("Your pangenome has not been partitioned. See the 'partition' subcommand.")
    if needRGP:
        if pangenome.status["predictedRGP"] == "inFile":
            rgp = True
        elif pangenome.status["predictedRGP"] not in ["Computed", "Loaded"]:
            raise Exception("Your pangenome's regions of genomic plasticity have not been predicted. See the 'rgp' subcommand.")
    if needSpots:
        if pangenome.status["spots"] == "inFile":
            spots = True
        elif pangenome.status["spots"] not in ["Computed", "Loaded"]:
            raise Exception("Your pangenome's spots of insertion have not been predicted. See the 'spot' subcommand.")

    if annotation or geneFamilies or graph or rgp or spots:  # if anything is true; else we need to read nothing.
        readPangenome(pangenome, annotation=annotation, geneFamilies=geneFamilies, graph=graph, rgp=rgp, spots=spots)

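# Illustrative usage sketch (not part of the original module): a caller states what it needs and
# lets checkPangenomeInfo decide what to read from the file. The `Pangenome` import and `addFile`
# call are assumptions about the ppanggolin API used elsewhere in the project.
def _example_prepare_for_graph_analysis(h5_path):
    from ppanggolin.pangenome import Pangenome  # assumed location of the Pangenome class
    pangenome = Pangenome()
    pangenome.addFile(h5_path)  # assumed to fill pangenome.status from the file (see getStatus)
    checkPangenomeInfo(pangenome, needAnnotations=True, needFamilies=True, needGraph=True)
    return pangenome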