import json
import sys
import tarfile
import tempfile
import time
import warnings
from multiprocessing import Pool
from multiprocessing import cpu_count
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
from padmet.classes import PadmetRef
from padmet.utils.sbmlPlugin import convert_from_coded_id as cfci
from scipy import stats
from scipy.spatial.distance import pdist
from scipy.spatial.distance import squareform
from skbio.stats.ordination import pcoa
from statsmodels.stats.multitest import multipletests
try:
import polars as pl
except Exception as e:
print(e)
print("CALLING subprocess to uninstall polars and polars-lts-cpu. Then reinstall polars-lts-cpu.")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "-y", "polars"]) # noqa: S603
subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "-y", "polars-lts-cpu"]) # noqa: S603
subprocess.check_call([sys.executable, "-m", "pip", "install", "polars-lts-cpu"]) # noqa: S603
import polars as pl
[docs]
def file_exist(filename: str, directory_path: Path) -> bool:
for file in directory_path.iterdir():
if filename == file.name:
return True
return False
[docs]
def get_size(obj, seen=None):
"""Recursively finds size of objects"""
size = sys.getsizeof(obj)
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
# Important mark as seen *before* entering recursion to gracefully handle
# self-referential objects
seen.add(obj_id)
if isinstance(obj, dict):
size += sum([get_size(v, seen) for v in obj.values()])
size += sum([get_size(k, seen) for k in obj.keys()])
elif hasattr(obj, "__dict__"):
size += get_size(obj.__dict__, seen)
elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
size += sum([get_size(i, seen) for i in obj])
return size
[docs]
def is_valid_dir(dirpath: Path):
"""Return True if directory exists or not
Args:
dirpath (str): path of directory
Returns:
bool: True if dir exists, False otherwise
"""
if dirpath.is_dir:
return True
else:
return False
[docs]
def has_only_unique_value(dataframe , input1, input2: str = "None"):
"""
Return True if the dataframe's column(s) only has unique value, False otherwise.
Args:
dataframe (pd.DataFrame): _description_
column_value (_type_): _description_
input1 (_type_): _description_
input2 (str, optional): _description_. Defaults to "None".
"""
nb_row = len(dataframe)
if isinstance(dataframe, pl.DataFrame):
if input2 == "None":
return True if nb_row == len(dataframe.get_column(input1).unique()) else False
else:
return True if nb_row == len(dataframe.get_column(input1).unique()) and nb_row == len(dataframe.get_column(input2).unique()) else False
if isinstance(dataframe, pd.DataFrame):
if input2 == "None":
return True if nb_row == len(dataframe[input1].unique()) else False
else:
return True if nb_row == len(dataframe[input1].unique()) and nb_row == len(dataframe[input2].unique()) else False
[docs]
def relative_abundance(abundance_path: Path, save_path: Path, cscope_dir: Path, scope: str):
"""Generate a second main_dataframe with the production based on weight from the abundance matrix.
Args:
abundance_matrix (Path): Pathlib Path of the abundance file.
sample_cscope (Path): Pathlib Path of the cscope directory.
save_path (Path): Pathlib Path of the output directory.
Raises:
RuntimeError: If more than one column of type other than numeric.
Returns:
Dataframe: production dataframe with sample in rows and compounds in column. Weighted by abundance.
"""
if scope == "cscope":
filename_to_check = "normalised_abundance_dataframe_postaviz.tsv"
if scope == "iscope":
filename_to_check = "normalised_iscope_abundance_dataframe_postaviz.tsv"
if file_exist(filename_to_check, save_path) and file_exist("abundance_file.tsv", save_path):
print("Abundance dataframe already exist in save directory.")
return
# Read csv with pandas to avoid Polars schema lenght limit. Reset index and transform to polars.
abundance_df = pd.read_csv(abundance_path, sep="\t").reset_index(names="pd_index")
abundance_df = pl.DataFrame(abundance_df)
# Check to see if reset index produced a default Int64 index. If yes remove it.
if abundance_df.get_column("pd_index").dtype == pl.Int64:
abundance_df = abundance_df.drop("pd_index")
# The list of columns dtype String MAYBE USE NON NUMERIC ?
non_numeric_col = abundance_df.select(pl.col(pl.String)).columns
# Only one column should be Str dtype. (indentifier column)
if len(non_numeric_col) == 1:
abundance_df = abundance_df.with_columns(pl.col(non_numeric_col[0]).alias("smplID")).drop(non_numeric_col[0])
else:
raise RuntimeError(f"The numbers of non numeric columns ({len(non_numeric_col)}) isn't equal to one.\n Only one STR (non-numeric at least) column must be present in the abundance dataframe to be used as identifier index.")
# Abundance_file.tsv needed in another function. Save with pandas index.
abundance_matrix = abundance_df.to_pandas().set_index("smplID")
abundance_matrix.to_csv(Path(save_path,"abundance_file.tsv"),sep="\t")
# Loop replacing Pandas apply and to normalise dataframe.
abundance_df = abundance_df.with_columns((pl.col(col)/pl.col(col).sum()).round(7) for col in abundance_df.select(pl.exclude("smplID")).columns)
# Abundance_file_normalised needed in another function. Save it with pandas.
abundance_matrix_normalised = abundance_df.to_pandas().set_index("smplID")
abundance_matrix_normalised.to_csv(Path(save_path,"abundance_file_normalised.tsv"),sep="\t")
# List filled with one rows dataframe from each samples.
res = []
for sample_path in cscope_dir.iterdir():
sample_id = sample_path.name.split(".parquet")[0]
sample_data = pl.read_parquet(sample_path)
tmp_abundance_df = abundance_df.select(["smplID",sample_id])
# Transpose allow to work on columns rather than rows which is more efficient.
sample_df = sample_data.transpose(include_header=True, header_name="Compounds", column_names="smplID")
sample_df = sample_df.with_columns(pl.col(col).mul(tmp_abundance_df.filter(pl.col("smplID") == col).get_column(sample_id).item()).name.keep() for col in sample_df.select(pl.exclude("Compounds")).columns)
sample_df = sample_df.transpose(include_header=True, header_name="smplID", column_names="Compounds")
sample_df = sample_df.sum().with_columns(pl.col("smplID").fill_null(sample_id))
res.append(sample_df)
final_df = pl.concat(res, how="diagonal").fill_null(0)
final_df = final_df.to_pandas()
final_df.set_index("smplID", inplace=True)
if scope == "cscope":
final_df.to_csv(Path(save_path,"normalised_abundance_dataframe_postaviz.tsv"),sep="\t")
else:
final_df.to_csv(Path(save_path,"normalised_iscope_abundance_dataframe_postaviz.tsv"),sep="\t")
print("Abundance done.")
[docs]
def open_tsv(file_name: str, convert_cpd_id: bool = False, rename_columns: bool = False, first_col: str = "smplID"):
"""Open tsv file as a pandas dataframe.
Args:
file_name (str): Path of the file
rename_columns (bool, optional): Rename the first column and decode the metabolites names in sbml format into readable format. Defaults to False.
first_col (str, optional): Label of the first col if rename_columns is True. Defaults to "smplID".
Returns:
Dataframe: Pandas dataframe
"""
data = pd.read_csv(file_name, sep="\t")
if rename_columns:
data.columns.values[0] = first_col
if convert_cpd_id:
data.set_index(first_col,inplace=True,drop=True)
data.columns = sbml_to_classic(data.columns.values)
return data
[docs]
def get_scopes(file_name, path: Path) -> pd.DataFrame:
for root, _dirs, files in path.walk():
if file_name in files:
scope_matrix = open_tsv(Path(root, file_name), convert_cpd_id=True, rename_columns=True)
return scope_matrix
[docs]
def is_indexed_by_id(df: pd.DataFrame):
if df.index.name == "smplID":
return True
else:
return False
[docs]
def sbml_to_classic(compounds_list):
uncoded = []
for coded in compounds_list:
id, id_type, compart = cfci(coded)
new_value = str(id)+"["+str(compart)+"]"
uncoded.append(new_value)
return uncoded
[docs]
def retrieve_all_cscope(sample, dir_path: Path, cscope_directoy: Path, cscope_file_format):
"""Retrieve iscope, cscope, added_value and contribution_of_microbes files in the path given using os.listdir().
Args:
path (str): Directory path
Returns:
dict: Return a nested dict object where each key is a dictionnary of a sample. The key of those second layer dict [iscope, cscope, advalue, contribution] give acces to these files.
"""
sample_directory_path = Path(dir_path, sample)
cscope_dataframe = get_scopes("rev_cscope.tsv", sample_directory_path)
if isinstance(cscope_dataframe,pd.DataFrame):
bin_list = cscope_dataframe.index.tolist()
cscope_dataframe.to_parquet(Path(cscope_directoy, sample + cscope_file_format), compression="gzip")
return bin_list, sample
else:
return None, sample
[docs]
def retrieve_all_iscope(sample, dir_path, iscope_directoy, iscope_file_format):
"""Retrieve iscope, cscope, added_value and contribution_of_microbes files in the path given using os.listdir().
Args:
path (str): Directory path
Returns:
dict: Return a nested dict object where each key is a dictionnary of a sample. The key of those second layer dict [iscope, cscope, advalue, contribution] give acces to these files.
"""
sample_directory_path = Path(dir_path, sample)
if sample_directory_path.is_dir():
iscope_dataframe = get_scopes("rev_iscope.tsv", sample_directory_path)
if isinstance(iscope_dataframe,pd.DataFrame):
iscope_dataframe.to_parquet(Path(iscope_directoy, sample+iscope_file_format), compression="gzip")
[docs]
def load_sample_cscope_data(dir_path: Path, cscope_directory: Path, cscope_file_format, save_path: Path): # Need rework
"""Open all directories given in -d path input. Get all cscopes, load and save them a dataframe in parquet.gzip format.
No RAM used during process that way.
Args:
path (str): Path of directory
Returns:
dict: sample_data dictionnary
"""
if Path(save_path,"sample_info.json").is_file():
return
print("Load/Save cscope data as parquet format...")
nb_cpu = cpu_count() - 1
if type(nb_cpu) is not int or nb_cpu < 1:
nb_cpu = 1
pool = Pool(nb_cpu)
pool = Pool(nb_cpu)
results_list = pool.starmap(retrieve_all_cscope,[(sample.name, dir_path, cscope_directory, cscope_file_format) for sample in dir_path.iterdir()])
pool.close()
pool.join()
sample_info = {}
sample_info["bins_list"] = []
sample_info["bins_count"] = {}
sample_info["bins_sample_list"] = {}
for all_bins_in_sample, sample in results_list:
if type(all_bins_in_sample) != list: # noqa: E721
continue
sample_info["bins_list"] = sample_info["bins_list"] + all_bins_in_sample
for bin in all_bins_in_sample:
if bin not in sample_info["bins_count"]:
sample_info["bins_count"][bin] = 0
sample_info["bins_count"][bin] += 1
if bin not in sample_info["bins_sample_list"]:
sample_info["bins_sample_list"][bin] = []
sample_info["bins_sample_list"][bin].append(str(sample))
# Remove duplicate from list
sample_info["bins_list"] = list(dict.fromkeys(sample_info["bins_list"]))
with open(Path(save_path,"sample_info.json"), "w") as f:
json.dump(sample_info, f)
print("Done.")
return sample_info
[docs]
def load_sample_iscope_data(dir_path: Path, iscope_directory: Path, iscope_file_format):
"""Load and save iscope data as dataframe in parquet.gzip format.
Args:
dir_path (Path): Directory path given in cli (-d)
iscope_directory (Path): Path of newly created save directory.
iscope_file_format (bool): Format to save.
"""
print("Load/Save iscope data as parquet format...")
nb_cpu = cpu_count() - 1
if type(nb_cpu) is not int or nb_cpu < 1:
nb_cpu = 1
pool = Pool(nb_cpu)
pool = Pool(nb_cpu)
pool.starmap(retrieve_all_iscope,[(sample.name, dir_path, iscope_directory, iscope_file_format) for sample in dir_path.iterdir()])
pool.close()
pool.join()
[docs]
def build_main_dataframe(save_path: Path, cscope_directory: Path):
"""Create and save the main dataframe. Samples in rows and compounds in columns.
It takes the compounds production in each samples cscope and return a pandas Series with 1 produced or 0 absent for each compounds.
Merge all the series returned into a dataframe.
Args:
sample_data (dict): Samples's cscope.
save_path (_type_): Save path given in CLI.
"""
if file_exist("main_dataframe_postaviz.tsv", save_path):
print("main dataframe already in save directory.")
return
print("Building main dataframe...")
all_series = []
for sample_filename in cscope_directory.iterdir():
sample_id = sample_filename.name.split(".parquet")[0]
current_sample_df = pd.read_parquet(Path(cscope_directory, sample_filename))
# Get all the compounds produced in this cscope.
serie_index = current_sample_df.columns.values
serie_data = []
# Since at least one bin must produce all of those compounds it is considered produced in the entire sample.
for _i in range(len(serie_index)):
serie_data.append(1)
# Make the Pandas Series with compounds as index and 1 as value and append in global for concatenation.
all_series.append(pd.Series(data=serie_data,index=serie_index,name=sample_id))
# Concatenation of all the serie into one dataframe.
results = pd.concat(all_series, axis=1).T
results.fillna(0,inplace=True)
results = results.astype(int)
results.index.name = "smplID"
results.to_csv(Path(save_path, "main_dataframe_postaviz.tsv"),sep="\t")
print("Main dataframe done and saved.")
[docs]
def build_dataframes(dir_path: Path, metadata_path: Path, abundance_path: Optional[Path] = None, taxonomic_path: Optional[Path] = None, save_path: Optional[Path] = None, metacyc: Optional[Path] = None):
"""Main function.
dir_path, metadata_path and save_path are necessary.
Generate most of the core dataframes to avoid calculation on the application side.
Args:
dir_path (Path): Directory path containing M2M output.
metadata_path (Path): Metadata file path.
abundance_path (Optional[Path], optional): Abundance file path. Defaults to None.
taxonomic_path (Optional[Path], optional): Taxonomic file path. Defaults to None.
save_path (Optional[Path], optional): Output path. Defaults to None.
metacyc (Optional[Path], optional): Metacyc DB file path. Defaults to None.
"""
if not is_valid_dir(dir_path):
print(dir_path, "Sample directory path is not a valid directory")
sys.exit(1)
if save_path is None:
sys.exit(1)
if not is_valid_dir(save_path):
save_path.mkdir(parents=True,exist_ok=True)
metadata_processing(metadata_path, save_path)
cscope_directory = Path(save_path,"sample_cscope_directory")
if not cscope_directory.is_dir():
cscope_directory.mkdir()
cscope_file_format = ".parquet.gzip"
load_sample_cscope_data(dir_path, cscope_directory, cscope_file_format, save_path)
producers_dataframe(cscope_directory, save_path, "cscope")
build_main_dataframe(save_path, cscope_directory)
build_compounds_index(save_path)
if abundance_path is not None:
relative_abundance(abundance_path, save_path, cscope_directory, "cscope")
if taxonomic_path is not None:
taxonomy_processing(taxonomic_path, save_path)
total_production_by_sample(save_path, abundance_path)
build_pcoa_dataframe(save_path)
iscope_directory = Path(save_path,"sample_iscope_directory")
if not iscope_directory.is_dir():
iscope_directory.mkdir()
iscope_file_format = ".parquet.gzip"
load_sample_iscope_data(dir_path, iscope_directory, iscope_file_format)
if abundance_path is not None:
relative_abundance(abundance_path, save_path, iscope_directory, "iscope")
producers_dataframe(iscope_directory, save_path, "iscope")
iscope_cscope_fill_difference(save_path)
bin_dataframe_build(cscope_directory, "cscope", abundance_path, taxonomic_path, save_path)
concat_bin_dataframe(save_path, "cscope")
bin_dataframe_build(iscope_directory, "iscope", abundance_path, taxonomic_path, save_path)
concat_bin_dataframe(save_path, "iscope")
# Metacyc database TREE
if metacyc is not None:
padmet_to_tree(save_path, metacyc)
[docs]
def total_production_by_sample(save_path: Path, abundance_path: Optional[Path] = None):
"""Create and save the total production dataframe. This dataframe contain all samples in row and all compounds in columns.
For each samples the compounds produced by each bins is sum up to get the estimated total production of compound by samples
and the number of bins who produced those compounds.
If the abundance is provided, each production (1) of bins is multiplied by their abundance in their sample which gives an
estimated production of compounds weighted by the abundance of the bin producer.
Args:
save_path (_type_): Save path given in CLI
abundance_path (str, optional): Abundance file path fiven in CLI. Defaults to None.
"""
if file_exist("total_production_dataframe_postaviz.tsv", save_path):
print("total_production_dataframe_postaviz already in save directory")
return
print("Building total production dataframe...")
main_dataframe = open_tsv(Path(save_path, "main_dataframe_postaviz.tsv"))
metadata_dataframe = pd.read_parquet(Path(save_path, "metadata_dataframe_postaviz.parquet.gzip"))
if not is_indexed_by_id(main_dataframe):
main_dataframe.set_index("smplID",inplace=True,drop=True)
main_dataframe["Total_production"] = main_dataframe.apply(lambda row: row.to_numpy().sum(), axis=1)
results = pd.DataFrame(main_dataframe["Total_production"])
if abundance_path is not None:
abundance_dataframe = open_tsv(Path(save_path, "normalised_abundance_dataframe_postaviz.tsv"))
if not is_indexed_by_id(abundance_dataframe):
abundance_dataframe.set_index("smplID",inplace=True,drop=True)
abundance_dataframe["Total_abundance_weighted"] = abundance_dataframe.apply(lambda row: row.to_numpy().sum(), axis=1)
abundance_dataframe = abundance_dataframe["Total_abundance_weighted"]
results = pd.concat([results,abundance_dataframe], axis=1)
results.reset_index(inplace=True)
results = results.merge(metadata_dataframe,"inner","smplID")
results.to_csv(Path(save_path, "total_production_dataframe_postaviz.tsv"),sep="\t", index=False)
print("Total production dataframe done and saved.")
return
[docs]
def preprocessing_for_statistical_tests(dataframe: pd.DataFrame, y_value, input1, input2 = None, multipletests: bool = False, multipletests_method: str = "bonferroni"):
"""Create dataframe for each y_value in the list, to separate them and use wilcoxon_man_whitney function.
Concat all results into one dataframe.
Args:
dataframe (pd.DataFrame): Dataframe to test.
y_value (_type_): list of columns labels to separate into several dataframe. Must be at least of lenght 1.
input1 (_type_): First user's input.
input2 (_type_, optional): Second user's input. Defaults to None.
Returns:
Dataframe: Dataframe of statistical test.
"""
all_results = []
for y in y_value:
if input2 is None:
all_results.append(wilcoxon_man_whitney(dataframe[[y, input1]], y, input1, None, multipletests, multipletests_method))
else:
all_results.append(wilcoxon_man_whitney(dataframe[[y, input1, input2]], y , input1, input2, multipletests, multipletests_method))
return pd.concat(all_results)
[docs]
def wilcoxon_man_whitney(dataframe: pd.DataFrame, y, first_factor: str, second_factor: Optional[str] = None, multiple_correction: bool = False, correction_method: str = "hs"):
"""
Takes one dataframe with only one value column y and return a dataframe of statistical tests.
First all sub arrays by the first input then the second input are made and convert to numpy array.
Then Wilcoxon or Mann Whitney test are run on each pair without doublon.
If pairs array have the same lenght -> Wilcoxon, if not -> Mann Whitney
Args:
dataframe (pd.Dataframe): Pandas dataframe
y (str): Column label containing the values to test.
first_factor (str): Column label of the first user's input.
second_factor (str): Column label of the second user's input. Default to None
Returns:
Dataframe: Dataframe of test's results.
"""
# Array sorting by the first input and the second input if NOT None.
sub_dataframes = {}
for first_factor_array in dataframe[first_factor].unique():
if second_factor is None:
sub_dataframes[first_factor_array] = dataframe.loc[dataframe[first_factor] == first_factor_array][y].to_numpy()
continue
sub_dataframes[first_factor_array] = {}
for second_factor_array in dataframe[second_factor].unique():
sub_dataframes[first_factor_array][second_factor_array] = dataframe.loc[(dataframe[first_factor] == first_factor_array) & (dataframe[second_factor] == second_factor_array)][y].to_numpy()
# Dataframe's structure declaration, Axis column added if second input is NOT None.
if second_factor is None:
results = pd.DataFrame(columns=["Compound", "Factor1", "Sample size1", "Factor2", "Sample size2", "Method", "Statistic", "Pvalue", "Significance"])
else:
results = pd.DataFrame(columns=["Compound", "Axis", "Factor1", "Sample size1", "Factor2", "Sample size2", "Method", "Statistic", "Pvalue", "Significance"])
# Test each pairs avoiding useless duplicates and Array of lenght <= 1.
for name in sub_dataframes.keys(): # noqa: PLC0206
if second_factor is None: # One input selected
for name2 in sub_dataframes.keys(): # noqa: PLC0206
if name == name2:
continue
if len(sub_dataframes[name]) < 1 or len(sub_dataframes[name2]) < 1:
continue
if name2 in results["Factor1"].tolist():
continue
if len(sub_dataframes[name]) == len(sub_dataframes[name2]):
test_value, pvalue = stats.wilcoxon(sub_dataframes[name], sub_dataframes[name2])
test_method = "Wilcoxon"
else:
test_value, pvalue = stats.mannwhitneyu(sub_dataframes[name], sub_dataframes[name2])
test_method = "Mann-Whitney"
if pvalue >= 0.05:
symbol = "ns"
elif pvalue >= 0.01:
symbol = "*"
elif pvalue >= 0.001:
symbol = "**"
else:
symbol = "***"
results.loc[len(results)] = {"Compound": y,
"Factor1": name, "Sample size1": len(sub_dataframes[name]),
"Factor2": name2, "Sample size2": len(sub_dataframes[name2]),
"Statistic": test_value, "Pvalue": pvalue, "Significance": symbol, "Method": test_method}
else: # Two inputs selected
for name2 in sub_dataframes[name].keys():
for name3 in sub_dataframes[name].keys():
if name2 == name3:
continue
if len(sub_dataframes[name][name2]) < 1 or len(sub_dataframes[name][name3]) < 1:
continue
if len(results.loc[(results["Factor1"] == str(second_factor+": "+str(name3))) & (results["Axis"] == name)]) > 0: # Avoid duplicate
continue
if len(sub_dataframes[name][name2]) == len(sub_dataframes[name][name3]):
test_value, pvalue = stats.wilcoxon(sub_dataframes[name][name2], sub_dataframes[name][name3])
test_method = "Wilcoxon"
else:
test_value, pvalue = stats.mannwhitneyu(sub_dataframes[name][name2], sub_dataframes[name][name3])
test_method = "Mann-Whitney"
if pvalue >= 0.05:
symbol = "ns"
elif pvalue >= 0.01:
symbol = "*"
elif pvalue >= 0.001:
symbol = "**"
else:
symbol = "***"
results.loc[len(results)] = {"Compound": y, "Axis": name,
"Factor1": str(second_factor+": "+str(name2)), "Sample size1": len(sub_dataframes[name][name2]),
"Factor2": str(second_factor+": "+str(name3)), "Sample size2": len(sub_dataframes[name][name3]),
"Statistic": test_value, "Pvalue": pvalue, "Significance": symbol, "Method": test_method}
if multiple_correction:
pvals_before_correction = results["Pvalue"].to_numpy()
reject, pvals_after_correction, _, __ = multipletests(pvals_before_correction, method = correction_method)
results["Pvalue corrected"] = pvals_after_correction
results["Significance corrected"] = results["Pvalue corrected"].apply(lambda x:get_significance_symbol(x))
results["Correction method"] = correction_method
return results
[docs]
def get_significance_symbol(pval: float) -> str:
"""Return Significance symbol depending on pvalue given.
Args:
pval (float): Pvalue of the test
Returns:
str: Significance's symbol
"""
if pval >= 0.05:
return "ns"
elif pval >= 0.01:
return "*"
elif pval >= 0.001:
return "**"
else:
return "***"
[docs]
def build_pcoa_dataframe(save_path: Path) -> pd.DataFrame:
"""Compute Principal Coordinate Analysis from the main_dataframe given in input. Merge with metadata from the smplID column or index.
Args:
main_dataframe (pd.DataFrame): Dataframe from which the pcoa will be made.
metadata (pd.DataFrame): Metadata dataframe (Must have smplID identifer column for the merge to work)
Returns:
pd.DataFrame: Pcoa dataframe with sample ID as index, PC1 and PC2 results and all metadata.
"""
if file_exist("pcoa_dataframe_postaviz.tsv", save_path):
print("Pcoa dataframe already in save directory.")
return
main_dataframe = open_tsv(Path(save_path, "main_dataframe_postaviz.tsv"))
metadata = pd.read_parquet(Path(save_path, "metadata_dataframe_postaviz.parquet.gzip"))
if not is_indexed_by_id(main_dataframe):
main_dataframe = main_dataframe.set_index("smplID")
if is_indexed_by_id(metadata):
metadata = metadata.reset_index("smplID")
dmatrix = main_dataframe.to_numpy()
dist_m = pdist(dmatrix, "jaccard")
square_df = squareform(dist_m)
pcoa_result = pcoa(square_df,number_of_dimensions=2)
coordinate = pcoa_result.samples
df_pcoa = coordinate[["PC1","PC2"]]
df_pcoa["smplID"] = main_dataframe.index.to_numpy()
df_pcoa = df_pcoa.merge(metadata, "inner", "smplID")
df_pcoa.set_index("smplID",inplace=True)
df_pcoa.to_csv(Path(save_path,"pcoa_dataframe_postaviz.tsv"),sep="\t")
return
[docs]
def correlation_test(value_array, factor_array, factor_name, method:str = "pearson"):
if method == "pearson":
res = stats.pearsonr(value_array, factor_array)
else:
res = stats.spearmanr(value_array, factor_array)
if res.pvalue >= 0.05:
symbol = "ns"
elif res.pvalue >= 0.01:
symbol = "*"
elif res.pvalue >= 0.001:
symbol = "**"
else:
symbol = "***"
return pd.DataFrame([[factor_name, len(value_array), res.statistic, res.pvalue, symbol, method]],columns=["Factor", "Sample size", "Statistic", "Pvalue", "Significance", "Method"])
[docs]
def taxonomy_processing(taxonomy_filepath: Path, save_path: Path):
"""Open and save taxonomy file.
Args:
taxonomy_filepath (str): TSV or TXT format
Raises:
RuntimeError: Wrong file's format
Returns:
pd.DataFrame: Pandas dataframe
"""
if file_exist("taxonomic_dataframe_postaviz.tsv", save_path):
print("Taxonomic dataframe already exist in save directory.")
return
print("Building taxonomic dataframe...")
if taxonomy_filepath.suffix == ".tsv":
df = open_tsv(taxonomy_filepath)
df = df.rename(columns={df.columns[0]: "mgs"})
df.to_csv(Path(save_path,"taxonomic_dataframe_postaviz.tsv"), sep="\t", index=False)
return
if not taxonomy_filepath.suffix == ".txt":
raise RuntimeError("Taxonomy file must be either a txt file or tsv file.")
with open(taxonomy_filepath) as f:
lines = f.readlines()
df = pd.DataFrame(columns=["mgs","kingdom","phylum","class","order","family","genus"])
del lines[0] # Delete header line
for row in lines:
mgs = row.split("\t")[0]
genus = row.split("\t")[1:]
k, p, c, o, f, g = genus[0].strip("\n").split(";")
df.loc[len(df)] = [mgs,k,p,c,o,f,g]
df.to_csv(Path(save_path,"taxonomic_dataframe_postaviz.tsv"), sep="\t", index=False)
print("Taxonomic dataframe done and saved.")
[docs]
def bin_dataframe_build(scope_directory: Path, scope_mode: str = "cscope", abundance_path: Optional[Path] = None, taxonomy_path: Optional[Path] = None, savepath: Optional[Path] = None):
"""Build a large dataframe with all the bins of the different samples as index, the dataframe contain the list of production, the abundance fot he bin in the sample
and the count of production with or without abundance.
Args:
sample_info (dict): _description_
sample_data (dict): _description_
metadata (Dataframe): _description_
abundance_file (Dataframe, optional): _description_. Defaults to None.
taxonomy_path (Dataframe, optional): _description_. Defaults to None.
Returns:
pd.DataFrame: Pandas dataframe
"""
if file_exist("bin_dataframe_chunk_1.parquet.gzip",savepath):
print("Chunk of bin_dataframe already in save directory")
return
print("Building bin dataframe...")
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
start = time.time()
cpd_index = pd.read_csv(Path(savepath,"compounds_index.tsv"), sep="\t").squeeze()
##### Abundance normalisation, give percentage of abundance of bins in samples.
if abundance_path is not None:
abundance_file_normalised = pd.read_csv(Path(savepath,"abundance_file_normalised.tsv"),sep="\t",index_col=0)
if taxonomy_path is not None:
taxonomy_file = open_tsv(Path(savepath,"taxonomic_dataframe_postaviz.tsv"))
##### Checks if taxonomy file has default index which mean it is not indexed. TEMPORARY until found better way to deal with open/save from -t option OR load taxonomic_df option which return non indexed / indexed df
if not pd.Index(np.arange(0, len(taxonomy_file))).equals(taxonomy_file.index):
taxonomy_file = taxonomy_file.reset_index()
mgs_col_taxonomy = taxonomy_file.columns[0]
sample_unique_list = list(scope_directory.iterdir())
##### Create Generator to process data by chunks.
print("Making chunk of sample list...")
chunk_generator = chunks(sample_unique_list, 250)
##### Loop throught generator
chunk_index = 0
for current_chunk in chunk_generator:
chunk_index += 1
list_of_dataframe = []
logs = []
for sample in current_chunk:
sample_id = sample.name.split(".parquet")[0]
try:
df = pd.read_parquet(Path(scope_directory, sample))
df.insert(0 , "smplID", sample_id)
df.index.name = "binID"
list_of_dataframe.append(df)
except Exception as e:
logs.append(f"No dataframe named {sample} in scope_directory.\n{e}")
results = pd.concat(list_of_dataframe)
results.fillna(0,inplace=True)
tmp = results.apply(lambda x: x.index[x == 1].tolist(), axis=1)
results["Production"] = tmp.apply(lambda value: get_cpd_index(cpd_index, value))
results["Count"] = results.apply(lambda x: len(x.index[x == 1].tolist()), axis=1)
results = results[["smplID","Production","Count"]]
del tmp
if abundance_path is not None: # If abundance is provided, multiply each Count column with the relative abundance of the bins in their samples.
abundance = results.apply(lambda row: abundance_file_normalised.at[row.name,row["smplID"]],axis=1)
abundance.name = "Abundance"
final_result = pd.concat([results,abundance], axis=1)
del results
final_result["Count_with_abundance"] = final_result["Count"] * final_result["Abundance"]
else: # Seems quite useless but felt cute might deleted later.
final_result = results
del results
final_result = final_result.reset_index() # .merge(metadata, "inner", "smplID")
if taxonomy_path is not None: # If taxonomy is provided, merge the dataframe with the taxonomic_dataframe.
final_result = final_result.merge(taxonomy_file, "inner", left_on="binID", right_on=mgs_col_taxonomy)
##### Save current chunks into parquet file
filename = "bin_dataframe_chunk_"+str(chunk_index)+"-"+str(scope_mode)+".parquet.gzip"
filepath = Path(savepath,filename)
if len(final_result) == 0:
print(f"Chunks {chunk_index} is empty !")
final_result.to_parquet(filepath, compression="gzip")
del final_result
print("Took: ", time.time() - start, "Before saving")
if len(logs) != 0:
with open(Path(savepath,"bin_dataframe_logs.txt"),"w") as log_file:
for line in logs:
log_file.write(f"{line}\n")
return
[docs]
def chunks(lst, n):
"""Yield successive n-sized chunks from list."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
[docs]
def padmet_to_tree(save_path: Path, metacyc_file_path: Path):
"""Build a tree to be used in the Shiny application.
Allow the user to select directly a compounds or a category of compounds and fill a list
with all the compounds corresponding to that category.
Use the function build_parent_child_dataframe to create a 2 columns (child_id/parent_id) dataframe.
With the relation dataframe, build the tree using build_tree_from_root.
Args:
save_path (str): Path of the save directory.
"""
if file_exist("padmet_compounds_category_tree.json",save_path):
print("Padmet category tree already exist.")
return
print("Building compounds category tree...")
padmet = PadmetRef(str(metacyc_file_path))
cpd_id = [node.id for node in padmet.dicOfNode.values() if node.type == "compound"]
# classes_id = [node.id for node in padmet.dicOfNode.values() if node.type == "class"]
df = pd.DataFrame(columns=["child_id", "parent_id"])
for mid in cpd_id:
build_parent_child_dataframe(padmet, df, mid)
if mid not in df["child_id"].values:
print("ERROR for : ",mid)
root = {}
root["FRAMES"] = {}
build_tree_from_root(root["FRAMES"], "FRAMES", df)
root["All_metabolites"] = root.pop("FRAMES")
with open(Path(save_path, "padmet_compounds_category_tree.json"), "w") as fp:
json.dump(root, fp)
print("Compounds category tree done.")
[docs]
def build_parent_child_dataframe(padmet: PadmetRef, dataframe: pd.DataFrame, current_id, child_column = "child_id", parent_column = "parent_id"):
"""Build a child /parent relation dataframe between compounds category from a metacyc database in a padmet file format.
Args:
padmet (PadmetRef): PadmetRef object from padmet package.
dataframe (pd.DataFrame): Transmission of the dataframe between recursive call.
current_id (_type_): ID of the current category / compound.
child_column (str, optional): Column label of the child column of the dataframe. Defaults to "child_id".
parent_column (str, optional): Column label of the parent column of the dataframe. Defaults to "parent_id".
"""
# current_id can be a Compound id or a class id children of the cpd_id / class_id.
# Check of current_id is already in child column. STOP condition.
if current_id in dataframe[child_column].values:
return
# Get list of relations of current_id
try:
rlt_classes = [rlt.id_out for rlt in padmet.dicOfRelationIn[current_id] if rlt.type == "is_a_class"]
except KeyError as e:
print(e)
return
if len(rlt_classes) == 0:
return
# Loop in classes id children of cpd_id OR one of its classes id.
for rlt_c in rlt_classes:
# If current_id is NOT in child columns. Write current_id in child column with its FIRST child as parent in parent column.
if current_id not in dataframe[child_column].values:
dataframe.loc[len(dataframe)] = {child_column : current_id, parent_column : rlt_c}
# If the current child is already in child column pass to the next.
if rlt_c in dataframe[child_column].values:
continue
# Then continue with the first child of current_id who is not in child column. Until no more child then pass to next CPD_ID.
build_parent_child_dataframe(padmet, dataframe = dataframe, current_id = rlt_c)
[docs]
def build_tree_from_root(node, id, df):
"""Build a tree from a dataframe and a dictionary with the first key as root.
The first key is the first parent node from which the tree will be built starting with its first child.
Any node that is not connected indirectly with the root node won't be in the tree.
Example : root = {}
root["FRAMES"] = {}
build_tree_from_root(root["FRAMES"], "FRAMES", dataframe)
Args:
node (dict): root node
id (str): Root node key id, correspond to the string of the first node in the dataframe.
df (pd.DataFrame): Dataframe with 2 columns: columns names must be child_id and parent_id. child_id column has only unique values.
"""
children = df.loc[df["parent_id"] == id]["child_id"].tolist()
if len(children) == 0:
return
for child in children:
node[child] = {}
build_tree_from_root(node[child], child, df)
[docs]
def build_compounds_index(save_path: Path):
main_dataframe = pd.read_csv(Path(save_path, "main_dataframe_postaviz.tsv"), sep="\t")
cpd_list = main_dataframe.columns.tolist()
if "smplID" in cpd_list:
cpd_list.remove("smplID")
cpd_index = pd.Series(cpd_list, range(len(cpd_list)))
saving_path = Path(save_path, "compounds_index.tsv")
cpd_index.to_csv(saving_path, sep="\t", index=False)
[docs]
def get_cpd_label(cpd_index: pd.Series, cpd_list_index):
return cpd_index[cpd_list_index]
[docs]
def get_cpd_index(cpd_index: pd.Series, cpd_list_label):
return cpd_index.index[cpd_index.isin(cpd_list_label)].tolist()
[docs]
def concat_chunk(chunk_dir: Path, save_path: Path, scope_type: str):
"""Concatenation of all sub_dataframes produced.
Args:
chunk_dir (Path): Directory path where the chunk are.
save_path (Path): Save result path
scope_type (str): Cscope or Iscope
"""
filename = "producers_"+scope_type+"_dataframe.parquet.gzip"
df = None
for chunk in chunk_dir.iterdir():
if df is None:
df = pl.read_parquet(chunk)
else:
tmp_df = pl.read_parquet(chunk)
df = pl.concat([df,tmp_df], how="diagonal")
df = df.fill_null(0)
df.write_parquet(Path(save_path,filename), compression="gzip")
[docs]
def sum_and_concat_by_chunk(directory_path: Path):
"""Produce dataframe from chunk of 250 samples, BETTER memory usage small performance price.
Args:
directory_path (Path): _description_
"""
tmp_dir = tempfile.TemporaryDirectory(directory_path.name)
tmp_dir_path = Path(tmp_dir.name)
sample_unique_list = list(directory_path.iterdir())
chunk_generator = chunks(sample_unique_list, 250)
chunk_index = 0
for current_chunk in chunk_generator:
chunk_index += 1
res = []
for sample in current_chunk:
sample_id = sample.name.split(".parquet")[0]
sample_dataframe = pl.read_parquet(sample)
sample_dataframe = sample_dataframe.sum()
sample_dataframe._replace("smplID", pl.Series("smplID",[sample_id])) # replace_column(-1, pl.Series("smplID",[sample_id])) Could also replace smplID col by index since polar put index from parquet to the last column.
res.append(sample_dataframe)
final_dataframe = pl.concat(res, how="diagonal")
final_dataframe = final_dataframe.drop("smplID").insert_column(0, final_dataframe.get_column("smplID")) # Not necessary at all but i'm used to smplID col index with pandas.
final_dataframe = final_dataframe.fill_null(0)
filename = "producers_dataframe_chunk_"+str(chunk_index)+".parquet.gzip"
final_dataframe.write_parquet(Path(tmp_dir_path,filename), compression = "gzip")
return tmp_dir_path, tmp_dir
[docs]
def producers_dataframe(scope_directory: Path, save_path: Path, scope_type: str):
tmp_chunks_dir_path, tmp_dir = sum_and_concat_by_chunk(scope_directory)
concat_chunk(tmp_chunks_dir_path, save_path, scope_type)
tmp_dir.cleanup()
[docs]
def iscope_cscope_fill_difference(save_path: Path):
iscope_df = pl.read_parquet(Path(save_path, "producers_iscope_dataframe.parquet.gzip"))
cscope_df = pl.read_parquet(Path(save_path, "producers_cscope_dataframe.parquet.gzip"))
ccol = cscope_df.columns
icol = iscope_df.columns
columns_difference = list(set(ccol) - set(icol))
for col in columns_difference:
iscope_df = iscope_df.with_columns(pl.lit(0).alias(col))
iscope_df.write_parquet(Path(save_path, "producers_iscope_dataframe.parquet.gzip"), compression="gzip")
[docs]
def concat_bin_dataframe(save_path: Path, scope_mode: str = "cscope"):
df = None
for file in save_path.iterdir():
if file.stem.startswith("bin_dataframe_chunk") and file.stem.split("-")[1].split(".")[0] == scope_mode: # Check if file is a bin_dataframe_chunk AND check the scope of this file.
if df is None:
df = pl.read_parquet(file)
file.unlink()
else:
tmp_df = pl.read_parquet(file)
df = pl.concat([df, tmp_df], how="diagonal")
file.unlink()
print("Size of the full bin_dataframe: ",df.estimated_size("gb")," Gb")
df.write_parquet(Path(save_path, f"bin_dataframe_{scope_mode}.parquet.gzip"), compression="gzip")