
#!/usr/bin/env python3
#coding:utf-8

#default libraries
import argparse
from collections import Counter
import logging
import random
import tempfile
import time
from multiprocessing import Pool
import os
import warnings

#installed libraries
from tqdm import tqdm
import gmpy2
import numpy
from pandas import Series, read_csv
import plotly.offline as out_plotly
import plotly.graph_objs as go
import scipy.optimize as optimization

#local libraries
from ppanggolin.pangenome import Pangenome
from ppanggolin.utils import mkOutdir
from ppanggolin.formats import checkPangenomeInfo
import ppanggolin.nem.partition as ppp  # imported this way to use the global variable 'pan' defined in ppanggolin.nem.partition

samples = []  # module-level list of organism samples, filled by makeRarefactionCurve and read by raref_nem workers

def raref_nem(index, tmpdir, beta, sm_degree, free_dispersion, chunk_size, K, krange, seed):
    samp = samples[index]
    currtmpdir = tmpdir + "/" + str(index) + "/"
    if K < 3:
        K = ppp.evaluate_nb_partitions(samp, sm_degree, free_dispersion, chunk_size, krange, 0.05, False, 1,
                                       tmpdir + "/" + str(index) + "_eval", seed, None)

    if len(samp) <= chunk_size:  # all good, just write stuff.
        edges_weight, nb_fam = ppp.write_nem_input_files(tmpdir=currtmpdir, organisms=set(samp), sm_degree=sm_degree)
        cpt_partition = ppp.run_partitioning(currtmpdir, len(samp), beta * (nb_fam / edges_weight), free_dispersion,
                                             K=K, seed=seed, init="param_file")[0]
    else:  # going to need multiple partitionings for this sample...
        families = set()
        cpt_partition = {}
        validated = set()
        cpt = 0

        def validate_family(result):
            for node, nem_class in result[0].items():
                cpt_partition[node][nem_class[0]] += 1
                sum_partionning = sum(cpt_partition[node].values())
                if (sum_partionning > len(samp) / chunk_size and max(cpt_partition[node].values()) >= sum_partionning * 0.5) or (sum_partionning > len(samp)):
                    if node not in validated:
                        if max(cpt_partition[node].values()) < sum_partionning * 0.5:
                            cpt_partition[node]["U"] = len(samp)
                        validated.add(node)

        for fam in ppp.pan.geneFamilies:
            if not samp.isdisjoint(fam.organisms):  # otherwise useless to keep track of it
                families.add(fam)
                cpt_partition[fam.name] = {"P": 0, "S": 0, "C": 0, "U": 0}

        org_nb_sample = Counter()
        for org in samp:
            org_nb_sample[org] = 0
        condition = len(samp) / chunk_size

        while len(validated) < len(families):
            org_samples = []
            while not all(val >= condition for val in org_nb_sample.values()):
                # each organism (and hence each family) must be sampled at least len(samp)/chunk_size times.
                shuffled_orgs = list(samp)  # copy the sample
                random.shuffle(shuffled_orgs)  # shuffle the copied list
                while len(shuffled_orgs) > chunk_size:
                    org_samples.append(set(shuffled_orgs[:chunk_size]))
                    for org in org_samples[-1]:
                        org_nb_sample[org] += 1
                    shuffled_orgs = shuffled_orgs[chunk_size:]
            # partition each chunk and accumulate the votes:
            for samp in org_samples:
                edges_weight, nb_fam = ppp.write_nem_input_files(currtmpdir + "/" + str(cpt) + "/", samp,
                                                                 sm_degree=sm_degree)
                validate_family(ppp.run_partitioning(currtmpdir + "/" + str(cpt) + "/", len(samp),
                                                     beta * (nb_fam / edges_weight), free_dispersion, K=K, seed=seed,
                                                     init="param_file"))
                cpt += 1

    if len(cpt_partition) == 0:
        counts = {"persistent": "NA", "shell": "NA", "cloud": "NA", "undefined": "NA", "K": K}
    else:
        counts = {"persistent": 0, "shell": 0, "cloud": 0, "undefined": 0, "K": K}
        for val in cpt_partition.values():
            if isinstance(val, str):
                part = val
            else:
                part = max(val, key=val.get)
            if part.startswith("P"):
                counts["persistent"] += 1
            elif part.startswith("C"):
                counts["cloud"] += 1
            elif part.startswith("S"):
                counts["shell"] += 1
            else:
                counts["undefined"] += 1
    return (counts, index)
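# Illustrative sketch (added for documentation, not part of the original workflow): the majority-vote
# rule applied by validate_family above. Across repeated chunk partitionings, a family is assigned the
# NEM class that gathered at least half of the votes once enough chunks have been processed; otherwise
# it falls back to "U" (undefined). The vote counts below are hypothetical.
#
#   votes = {"P": 7, "S": 2, "C": 1, "U": 0}
#   total = sum(votes.values())
#   best = max(votes, key=votes.get)
#   assigned = best if votes[best] >= total * 0.5 else "U"   # -> "P"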
def launch_raref_nem(args):
    return raref_nem(*args)
def drawCurve(output, maxSampling, data):
    logging.getLogger().info("Drawing the rarefaction curve ...")
    rarefName = output + "/rarefaction.csv"
    raref = open(rarefName, "w")
    raref.write(",".join(["nb_org", "persistent", "shell", "cloud", "undefined", "exact_core", "exact_accessory",
                          "soft_core", "soft_accessory", "pangenome", "K"]) + "\n")
    for part in data:
        raref.write(",".join(map(str, [part["nborgs"], part["persistent"], part["shell"], part["cloud"],
                                       part["undefined"], part["exact_core"], part["exact_accessory"],
                                       part["soft_core"], part["soft_accessory"],
                                       part["exact_core"] + part["exact_accessory"], part["K"]])) + "\n")
    raref.close()

    def heap_law(N, kappa, gamma):
        return kappa * N ** gamma

    def PolyArea(x, y):
        return 0.5 * numpy.abs(numpy.dot(x, numpy.roll(y, 1)) - numpy.dot(y, numpy.roll(x, 1)))

    annotations = []
    traces = []
    data_raref = read_csv(rarefName, index_col=False)
    params_file = open(output + "/rarefaction_parameters" + ".csv", "w")
    params_file.write("partition,kappa,gamma,kappa_std_error,gamma_std_error,IQR_area\n")

    for partition in ["persistent", "shell", "cloud", "undefined", "exact_core", "exact_accessory", "soft_core",
                      "soft_accessory", "pangenome"]:
        percentiles_75 = Series({i: numpy.nanpercentile(data_raref[data_raref["nb_org"] == i][partition], 75)
                                 for i in range(1, maxSampling + 1)}).dropna()
        percentiles_25 = Series({i: numpy.nanpercentile(data_raref[data_raref["nb_org"] == i][partition], 25)
                                 for i in range(1, maxSampling + 1)}).dropna()
        mins = Series({i: numpy.min(data_raref[data_raref["nb_org"] == i][partition])
                       for i in range(1, maxSampling + 1)}).dropna()
        maxs = Series({i: numpy.max(data_raref[data_raref["nb_org"] == i][partition])
                       for i in range(1, maxSampling + 1)}).dropna()
        medians = Series({i: numpy.median(data_raref[data_raref["nb_org"] == i][partition])
                          for i in range(1, maxSampling + 1)}).dropna()
        means = Series({i: numpy.mean(data_raref[data_raref["nb_org"] == i][partition])
                        for i in range(1, maxSampling + 1)}).dropna()
        initial_kappa_gamma = numpy.array([0.0, 0.0])
        x = percentiles_25.index.tolist()
        x += list(reversed(percentiles_25.index.tolist()))
        area_IQR = PolyArea(x, percentiles_25.tolist() + percentiles_75.tolist())
        nb_org_min_fitting = 15
        COLORS = {"pangenome": "black", "exact_accessory": "#EB37ED", "exact_core": "#FF2828",
                  "soft_core": "#c7c938", "soft_accessory": "#996633", "shell": "#00D860",
                  "persistent": "#F7A507", "cloud": "#79DEFF", "undefined": "#828282"}
        try:
            all_values = data_raref[data_raref["nb_org"] > nb_org_min_fitting][partition].dropna()
            res = optimization.curve_fit(heap_law, data_raref.loc[all_values.index]["nb_org"], all_values,
                                         initial_kappa_gamma)
            kappa, gamma = res[0]
            # The variances of the fitted parameters are the diagonal elements of the variance-covariance
            # matrix, and their standard errors are the square roots of those variances.
            # source: https://stackoverflow.com/questions/25234996/getting-standard-error-associated-with-parameter-estimates-from-scipy-optimize-c
            error_k, error_g = numpy.sqrt(numpy.diag(res[1]))
            if numpy.isinf(error_k) and numpy.isinf(error_g):
                params_file.write(",".join([partition, "NA", "NA", "NA", "NA", str(area_IQR)]) + "\n")
            else:
                params_file.write(",".join([partition, str(kappa), str(gamma), str(error_k), str(error_g),
                                            str(area_IQR)]) + "\n")
                regression = numpy.apply_along_axis(heap_law, 0, range(nb_org_min_fitting + 1, maxSampling + 1),
                                                    kappa, gamma)
                regression_sd_top = numpy.apply_along_axis(heap_law, 0,
                                                           range(nb_org_min_fitting + 1, maxSampling + 1),
                                                           kappa - error_k, gamma + error_g)
                regression_sd_bottom = numpy.apply_along_axis(heap_law, 0,
                                                              range(nb_org_min_fitting + 1, maxSampling + 1),
                                                              kappa + error_k, gamma - error_g)
                traces.append(go.Scatter(x=list(range(nb_org_min_fitting + 1, maxSampling + 1)),
                                         y=regression,
                                         name=partition + ": Heaps' law",
                                         line=dict(color=COLORS[partition], width=4, dash='dash'),
                                         visible="legendonly" if partition == "undefined" else True))
                traces.append(go.Scatter(x=list(range(nb_org_min_fitting + 1, maxSampling + 1)),
                                         y=regression_sd_top,
                                         name=partition + ": Heaps' law error +",
                                         line=dict(color=COLORS[partition], width=1, dash='dash'),
                                         visible="legendonly" if partition == "undefined" else True))
                traces.append(go.Scatter(x=list(range(nb_org_min_fitting + 1, maxSampling + 1)),
                                         y=regression_sd_bottom,
                                         name=partition + ": Heaps' law error -",
                                         line=dict(color=COLORS[partition], width=1, dash='dash'),
                                         visible="legendonly" if partition == "undefined" else True))
                annotations.append(dict(x=maxSampling,
                                        y=heap_law(maxSampling, kappa, gamma),
                                        ay=0, ax=50,
                                        text="F=" + str(round(kappa, 0)) + "N" + "<sup>" + str(round(gamma, 5)) +
                                             "</sup><br>IQRarea=" + str(round(area_IQR, 2)),
                                        showarrow=True, arrowhead=7,
                                        font=dict(size=10, color='white'),
                                        align='center',
                                        arrowcolor=COLORS[partition],
                                        bordercolor='#c7c7c7', borderwidth=2, borderpad=4,
                                        bgcolor=COLORS[partition], opacity=0.8))
        except (TypeError, RuntimeError, ValueError):  # if fitting doesn't work
            params_file.write(",".join([partition, "NA", "NA", "NA", "NA", str(area_IQR)]) + "\n")

        traces.append(go.Scatter(x=medians.index,
                                 y=medians,
                                 name=partition + " : medians",
                                 mode="lines+markers",
                                 error_y=dict(type='data', symmetric=False,
                                              array=maxs.subtract(medians),
                                              arrayminus=medians.subtract(mins),
                                              visible=True, color=COLORS[partition], thickness=0.5),
                                 line=dict(color=COLORS[partition], width=1),
                                 marker=dict(color=COLORS[partition], symbol=3, size=8, opacity=0.5),
                                 visible="legendonly" if partition == "undefined" else True))
        traces.append(go.Scatter(x=means.index,
                                 y=means,
                                 name=partition + " : means",
                                 mode="markers",
                                 marker=dict(color=COLORS[partition], symbol=4, size=8, opacity=0.5),
                                 visible="legendonly" if partition == "undefined" else True))
        traces.append(go.Scatter(x=percentiles_75.index,
                                 y=percentiles_75,
                                 name=partition + " : 3rd quartile",
                                 mode="lines",
                                 hoveron="points",
                                 line=dict(color=COLORS[partition]),
                                 marker=dict(color=COLORS[partition]),
                                 visible="legendonly" if partition == "undefined" else True))
        traces.append(go.Scatter(x=percentiles_25.index,
                                 y=percentiles_25,
                                 name=partition + " : 1st quartile",
                                 fill='tonexty',
                                 mode="lines",
                                 hoveron="points",
                                 line=dict(color=COLORS[partition]),
                                 marker=dict(color=COLORS[partition]),
                                 visible="legendonly" if partition == "undefined" else True))

    layout = go.Layout(title="Rarefaction curve",
                       titlefont=dict(size=20),
                       xaxis=dict(title='size of genome subsets (N)'),
                       yaxis=dict(title='# of gene families (F)'),
                       annotations=annotations,
                       plot_bgcolor='#ffffff')
    fig = go.Figure(data=traces, layout=layout)
    out_plotly.plot(fig, filename=output + "/rarefaction_curve.html", auto_open=False)
    params_file.close()
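# Illustrative sketch (not part of the original module): how Heaps' law, F = kappa * N**gamma, is fitted
# with scipy.optimize.curve_fit as done per partition above. The data points here are synthetic, and the
# starting point p0 is an arbitrary choice for this example only.
#
#   import numpy
#   import scipy.optimize as optimization
#
#   def heap_law(N, kappa, gamma):
#       return kappa * N ** gamma
#
#   N = numpy.arange(16, 101, dtype=float)
#   F = 800.0 * N ** 0.3                                  # synthetic counts following a known law
#   (kappa, gamma), cov = optimization.curve_fit(heap_law, N, F, p0=[1.0, 0.5])
#   kappa_err, gamma_err = numpy.sqrt(numpy.diag(cov))    # standard errors of the fitted parameters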
def makeRarefactionCurve(pangenome, output, tmpdir, beta=2.5, depth=30, minSampling=1, maxSampling=100, sm_degree=10,
                         free_dispersion=False, chunk_size=500, K=-1, cpu=1, seed=42, kestimate=False,
                         krange=[3, -1], soft_core=0.95):
    ppp.pan = pangenome  # use the global from partition to store the pangenome, so that it is usable
    try:
        krange[0] = ppp.pan.parameters["partition"]["K"] if krange[0] < 0 else krange[0]
        krange[1] = ppp.pan.parameters["partition"]["K"] if krange[1] < 0 else krange[1]
    except KeyError:
        krange = [3, 20]
    checkPangenomeInfo(pangenome, needAnnotations=True, needFamilies=True, needGraph=True)

    tmpdirObj = tempfile.TemporaryDirectory(dir=tmpdir)
    tmpdir = tmpdirObj.name

    if float(len(pangenome.organisms)) < maxSampling:
        maxSampling = len(pangenome.organisms)
    else:
        maxSampling = int(maxSampling)

    if K < 3 and kestimate is False:  # estimate K once and for all.
        try:
            K = ppp.pan.parameters["partition"]["K"]
            logging.getLogger().info(f"Reuse the number of partitions {K}")
        except KeyError:
            logging.getLogger().info("Estimating the number of partitions...")
            K = ppp.evaluate_nb_partitions(pangenome.organisms, sm_degree, free_dispersion, chunk_size, krange, 0.05,
                                           False, cpu, tmpdir, seed, None)
            logging.getLogger().info(f"The number of partitions has been evaluated at {K}")

    logging.getLogger().info("Extracting samples ...")
    AllSamples = []
    for i in range(minSampling, maxSampling):  # each sampling point
        for _ in range(depth):  # number of samples per point
            AllSamples.append(set(random.sample(set(pangenome.organisms), i + 1)))
    logging.getLogger().info(f"Done sampling organisms in the pangenome, there are {len(AllSamples)} samples")

    SampNbPerPart = []
    logging.getLogger().info("Computing bitarrays for each family...")
    index_org = pangenome.computeFamilyBitarrays()
    logging.getLogger().info(
        f"Done computing bitarrays. Comparing them to get exact and soft core stats for {len(AllSamples)} samples...")
    bar = tqdm(range(len(AllSamples) * len(pangenome.geneFamilies)), unit="gene family")
    for samp in AllSamples:
        # make the sample's organism bitarray.
        sampBitarray = gmpy2.xmpz(0)
        for org in samp:
            sampBitarray[index_org[org]] = 1

        part = Counter()
        part["soft_core"] = 0
        part["exact_core"] = 0
        part["exact_accessory"] = 0
        part["soft_accessory"] = 0
        for fam in pangenome.geneFamilies:
            nbCommonOrg = gmpy2.popcount(fam.bitarray & sampBitarray)
            part["nborgs"] = len(samp)
            if nbCommonOrg != 0:  # otherwise the family is absent from the sample
                if nbCommonOrg == len(samp):
                    part["exact_core"] += 1
                else:
                    part["exact_accessory"] += 1
                if float(nbCommonOrg) >= len(samp) * soft_core:
                    part["soft_core"] += 1
                else:
                    part["soft_accessory"] += 1
            bar.update()
        SampNbPerPart.append(part)
    bar.close()
    # done with the frequency of each family for each sample.

    global samples
    samples = AllSamples

    args = []
    for index, samp in enumerate(samples):
        args.append((index, tmpdir, beta, sm_degree, free_dispersion, chunk_size, K, krange, seed))

    with Pool(processes=cpu) as p:
        # launch the partitionings
        logging.getLogger().info("Partitionning all samples...")
        bar = tqdm(range(len(args)), unit="samples partitionned")
        random.shuffle(args)  # shuffling the processing so that the progress bar is closer to reality.
        for result in p.imap_unordered(launch_raref_nem, args):
            SampNbPerPart[result[1]] = {**result[0], **SampNbPerPart[result[1]]}
            bar.update()
        bar.close()

    logging.getLogger().info("Done partitionning everything")
    warnings.filterwarnings("ignore")
    drawCurve(output, maxSampling, SampNbPerPart)
    warnings.resetwarnings()
    tmpdirObj.cleanup()
    logging.getLogger().info("Done making the rarefaction curves")
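# Illustrative sketch (hypothetical organism names; not part of the original module): how gmpy2
# bitarrays are compared above to count, for one gene family, how many organisms of a sample carry it.
# index_org stands in for the result of pangenome.computeFamilyBitarrays() and fam_bitarray for
# fam.bitarray.
#
#   import gmpy2
#   index_org = {"orgA": 0, "orgB": 1, "orgC": 2}
#   fam_bitarray = gmpy2.xmpz(0)
#   for org in ("orgA", "orgC"):                  # organisms carrying the family
#       fam_bitarray[index_org[org]] = 1
#   samp_bitarray = gmpy2.xmpz(0)
#   for org in ("orgA", "orgB"):                  # organisms in the sample
#       samp_bitarray[index_org[org]] = 1
#   shared = gmpy2.popcount(fam_bitarray & samp_bitarray)   # -> 1 (only orgA is shared)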
def launch(args):
    """
    Main code when launching rarefaction from the command line.
    """
    mkOutdir(args.output, args.force)
    pangenome = Pangenome()
    pangenome.addFile(args.pangenome)
    makeRarefactionCurve(pangenome=pangenome,
                         output=args.output,
                         tmpdir=args.tmpdir,
                         beta=args.beta,
                         depth=args.depth,
                         minSampling=args.min,
                         maxSampling=args.max,
                         sm_degree=args.max_degree_smoothing,
                         free_dispersion=args.free_dispersion,
                         chunk_size=args.chunk_size,
                         K=args.nb_of_partitions,
                         cpu=args.cpu,
                         seed=args.seed,
                         kestimate=args.reestimate_K,
                         krange=args.krange,
                         soft_core=args.soft_core)
def rarefactionSubparser(subparser):
    parser = subparser.add_parser("rarefaction", formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    required = parser.add_argument_group(title="Required arguments",
                                         description="One of the following arguments is required :")
    required.add_argument('-p', '--pangenome', required=True, type=str, help="The pangenome .h5 file")

    optional = parser.add_argument_group(title="Optional arguments")
    optional.add_argument("-b", "--beta", required=False, default=2.5, type=float,
                          help="beta is the strength of the smoothing using the graph topology during partitionning. "
                               "0 will deactivate spatial smoothing.")
    optional.add_argument("--depth", required=False, default=30, type=int,
                          help="Number of samplings at each sampling point")
    optional.add_argument("--min", required=False, default=1, type=int,
                          help="Minimum number of organisms in a sample")
    optional.add_argument("--max", required=False, type=float, default=100,
                          help="Maximum number of organisms in a sample (if above the number of provided organisms, "
                               "the provided organisms will be the maximum)")
    optional.add_argument("-ms", "--max_degree_smoothing", required=False, default=10, type=float,
                          help="Maximum degree of the nodes to be included in the smoothing process.")
    optional.add_argument('-o', '--output', required=False, type=str,
                          default="ppanggolin_output" + time.strftime("_DATE%Y-%m-%d_HOUR%H.%M.%S",
                                                                      time.localtime()) + "_PID" + str(os.getpid()),
                          help="Output directory")
    optional.add_argument("-fd", "--free_dispersion", required=False, default=False, action="store_true",
                          help="Use if the dispersion around the centroid vector of each partition must be free. "
                               "It is the same for all organisms by default.")
    optional.add_argument("-ck", "--chunk_size", required=False, default=500, type=int,
                          help="Size of the chunks when performing partitionning using chunks of organisms. "
                               "Chunk partitionning will be used automatically if the number of genomes is above this number.")
    optional.add_argument("-K", "--nb_of_partitions", required=False, default=-1, type=int,
                          help="Number of partitions to use. Must be at least 3. "
                               "By default, reuse K if it exists, else compute it.")
    optional.add_argument("--reestimate_K", required=False, action="store_true",
                          help="Will recompute the number of partitions for each sample "
                               "(between the values provided by --krange) (VERY intensive. Can take a long time.)")
    optional.add_argument("-Kmm", "--krange", nargs=2, required=False, type=int, default=[3, -1],
                          help="Range of K values to test when detecting K automatically. "
                               "Default between 3 and the K previously computed if there is one, or 20 if there are none.")
    optional.add_argument("--soft_core", required=False, type=float, default=0.95,
                          help="Soft core threshold")
    optional.add_argument("-se", "--seed", type=int, default=42,
                          help="seed used to generate random numbers")
    return parser
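# Example invocation (illustrative; the subcommand name and flags are those defined above, the pangenome
# file name and output directory are hypothetical):
#
#   ppanggolin rarefaction -p pangenome.h5 -o rarefaction_output --depth 30 --min 1 --max 100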