From ac05a440f4810915bd2588b20c9111c48e789d3c Mon Sep 17 00:00:00 2001 From: iquasere Date: Wed, 8 Sep 2021 11:58:00 +0100 Subject: [PATCH] Updated dependencies They were not enough in docker Fonts problem of pdftoppm fixed ("Helvetica not found...") Also improved imports --- kegg_charter.py | 152 ++++++++++++++++++++------------------------ kegg_pathway_map.py | 4 +- meta.yaml | 5 +- 3 files changed, 74 insertions(+), 87 deletions(-) diff --git a/kegg_charter.py b/kegg_charter.py index 70d78a8..47c9918 100644 --- a/kegg_charter.py +++ b/kegg_charter.py @@ -1,38 +1,34 @@ #!/usr/bin/env python -import PIL -import argparse +from PIL import Image +from argparse import ArgumentParser import numpy as np import os import pandas as pd -import pathlib -import re -import subprocess +from pathlib import Path +from re import search +from subprocess import run import sys from io import StringIO -from time import gmtime, strftime -import time -import matplotlib.pyplot as plt +from time import time, gmtime, strftime, sleep from Bio.KEGG.REST import kegg_link, kegg_list, kegg_get -from matplotlib import colors, cm -from progressbar import ProgressBar -import glob from Bio.KEGG.KGML import KGML_parser +from matplotlib import pyplot as plt, colors, cm +from tqdm import tqdm +from glob import glob from kegg_pathway_map import KEGGPathwayMap -__version__ = "0.2.4" +__version__ = "0.2.5" def get_arguments(): - parser = argparse.ArgumentParser( + parser = ArgumentParser( description="""KEGGCharter - A tool for representing genomic potential and transcriptomic expression into KEGG pathways""", epilog="Input file must be specified.") parser.add_argument("-o", "--output", type=str, help="Output directory", default='KEGGCharter_results') - parser.add_argument("--tsv", action="store_true", default=False, - help="Results will be outputed in TSV format (and not EXCEL).") parser.add_argument("-rd", "--resources-directory", type=str, default=sys.path[0], help="Directory for storing KGML and CSV files.") parser.add_argument("-mm", "--metabolic-maps", type=str, @@ -67,7 +63,7 @@ def get_arguments(): special_functions = parser.add_argument_group('Special functions') special_functions.add_argument( "--show-available-maps", action="store_true", default=False, - help="""Outputs KEGG maps IDs and descriptions to the console (so you may pick the ones you want!)""") + help="Outputs KEGG maps IDs and descriptions to the console (so you may pick the ones you want!)") args = parser.parse_args() @@ -75,7 +71,7 @@ def get_arguments(): for directory in [args.output]: if not os.path.isdir(directory): - pathlib.Path(directory).mkdir(parents=True, exist_ok=True) + Path(directory).mkdir(parents=True, exist_ok=True) print('Created ' + directory) return args @@ -131,13 +127,13 @@ def taxa_colors(hex_values=None, ncolor=1): return [colors.to_hex(color_scheme(i)) for i in range(ncolor)] for hex_value in hex_values: - if not re.search(r'^#(?:[0-9a-fA-F]{3}){1,2}$', hex_value): + if not search(r'^#(?:[0-9a-fA-F]{3}){1,2}$', hex_value): sys.exit(Exception("Colors aren't valid hex codes")) return hex_values # has validated hex values and returns the original list # Conversion functions -def id2id(input_ids, column, output_column, output_ids_type, step=150): +def id2id(input_ids, column, output_column, output_ids_type, step=150, desc=''): """ Converts KEGG_ID genes to Ortholog KO ID from KEGG :param kegg_ids: (list) - KEGG ID genes @@ -146,14 +142,13 @@ def id2id(input_ids, column, output_column, output_ids_type, step=150): :return: (list) - (list,list) - KEGG ID genes converted and ko IDs """ result = pd.DataFrame(columns=[column, output_column]) - pbar = ProgressBar() - for i in pbar(range(0, len(input_ids), step)): + for i in tqdm(range(0, len(input_ids), step), desc=desc): j = min(i + step, len(input_ids)) try: result = pd.concat([result, pd.read_csv( kegg_link(output_ids_type, input_ids[i:j]), sep='\t', names=[column, output_column])]) except: - print('IDs conversion broke at index: {}'.format(i)) + print(f'IDs conversion broke at index: {i}') if output_ids_type == 'ko': result[output_column] = result[output_column].apply(lambda x: x.strip('ko:')) result[column] = result[column].apply(lambda x: x.strip('ec:')) @@ -165,25 +160,23 @@ def id2id(input_ids, column, output_column, output_ids_type, step=150): def ids_interconversion(data, column, ids_type='kegg'): ids = list(set(data[data[column].notnull()][column])) - message = 'Converting %i %s to %s through the KEGG API.' + base_desc = 'Converting %i %s to %s through the KEGG API' if ids_type == 'kegg': # sometimes Kegg IDs come as mfc:BRM9_0145;mfi:DSM1535_1468; (e.g. from UniProt IDs mapping). # Should be no problem since both IDs likely will always represent same functions trimmed_ids = [ide.split(';')[0] for ide in ids] relational = pd.DataFrame([ids, trimmed_ids]).transpose() - timed_message(message % (len(ids), 'KEGG IDs', 'KOs')) - new_ids = id2id(trimmed_ids, column, 'KO (KEGGCharter)', 'ko') + new_ids = id2id(trimmed_ids, column, 'KO (KEGGCharter)', 'ko', desc=base_desc % (len(ids), 'KEGG IDs', 'KOs')) new_ids = pd.merge(new_ids, relational, left_on=column, right_on=1) # mcj:MCON_3003; mcj:MCON_3003 del new_ids[column] # mcj:MCON_3003 K07486 mcj:MCON_3003; mcj:MCON_3003 del new_ids[1] new_ids.columns = ['KO (KEGGCharter)', column] elif ids_type == 'ko': - timed_message(message % (len(ids), 'KOs', 'EC numbers')) - new_ids = id2id(ids, column, 'EC number (KEGGCharter)', 'enzyme') + new_ids = id2id( + ids, column, 'EC number (KEGGCharter)', 'enzyme', desc=base_desc % (len(ids), 'KOs', 'EC numbers')) elif ids_type == 'ec': - ids = ['ec:{}'.format(ide) for ide in ids] - timed_message(message % (len(ids), 'EC numbers', 'KOs')) - new_ids = id2id(ids, column, 'KO (KEGGCharter)', 'ko') + ids = [f'ec:{ide}' for ide in ids] + new_ids = id2id(ids, column, 'KO (KEGGCharter)', 'ko', desc=base_desc % (len(ids), 'EC numbers', 'KOs')) return pd.merge(data, new_ids, on=column, how='left') @@ -265,8 +258,8 @@ def add_blank_space(image_pil, width, height, image_mode='RGB'): resize_width = round(ratio_h * image_pil.width) resize_height = height image_resize = image_pil.resize((resize_width, resize_height), - PIL.Image.ANTIALIAS) - background = PIL.Image.new('RGB', (width, height), (255, 255, 255, 255)) + Image.ANTIALIAS) + background = Image.new('RGB', (width, height), (255, 255, 255, 255)) offset = (round((width - resize_width) / 2), round((height - resize_height) / 2)) background.paste(image_resize, offset) @@ -283,9 +276,9 @@ def resize_image(image_pil, ratio=None, width=None, height=None): """ if ratio: return image_pil.resize((image_pil.width * ratio, - image_pil.height * ratio), PIL.Image.ANTIALIAS) + image_pil.height * ratio), Image.ANTIALIAS) elif width and height: - return image_pil.resize((width, height), PIL.Image.ANTIALIAS) + return image_pil.resize((width, height), Image.ANTIALIAS) else: return None @@ -295,21 +288,19 @@ def pdf2png(pdf_filename): Converts a pdf file to a png file, RGB format - name changes, .pdf to .png :param pdf_filename: str - filename of PDF file """ - bash_command = 'pdftoppm {} {} -png'.format(pdf_filename, pdf_filename.split('.pdf')[0]) - subprocess.run(bash_command.split()) + bash_command = f'pdftoppm {pdf_filename} {pdf_filename.split(".pdf")[0]} -png' + run(bash_command.split()) os.rename(pdf_filename.replace('.pdf', '-1.png'), pdf_filename.replace('.pdf', '.png')) def add_legend(kegg_map_file, legend_file, output): """ - Merges the two files - KEGG metabolic map and respective legend - into - one file file + Merges the two files - KEGG metabolic map and respective legend - into one file :param kegg_map_file: str - filename of PDF kegg metabolic map :param legend_file: str - filename of PNG legend """ pdf2png(kegg_map_file) - imgs = [PIL.Image.open(file) for file in - [kegg_map_file.replace('.pdf', '.png'), legend_file]] + imgs = [Image.open(file) for file in [kegg_map_file.replace('.pdf', '.png'), legend_file]] imgs[0] = imgs[0].convert( 'RGB') # KEGG Maps are converted to RGB by pdftoppm, dunno if converting to RGBA adds any transparency imgs[1] = resize_image(imgs[1], ratio=5) @@ -317,7 +308,7 @@ def add_legend(kegg_map_file, legend_file, output): imgs_comb = np.hstack([np.asarray(i) for i in imgs]) # save that beautiful picture - imgs_comb = PIL.Image.fromarray(imgs_comb) + imgs_comb = Image.fromarray(imgs_comb) imgs_comb.save(output) for file in [kegg_map_file, kegg_map_file.replace('.pdf', '.png'), legend_file]: os.remove(file) @@ -390,7 +381,7 @@ def genomic_potential_taxa(kegg_pathway_map, data, samples, dic_colors, ko_colum kegg_pathway_map.grey_boxes(grey_boxes) name = kegg_pathway_map.pathway.name.split(':')[-1] - name_pdf = '{}_{}.pdf'.format(output_basename, name) + name_pdf = f'{output_basename}_{name}.pdf' kegg_pathway_map.to_pdf(name_pdf) # a new taxon: "Other taxa" @@ -440,7 +431,7 @@ def differential_expression_sample(kegg_pathway_map, data, samples, ko_column, kegg_pathway_map.pathway_boxes_differential(df, log) name = kegg_pathway_map.pathway.name.split(':')[-1] - name_pdf = '{}_{}.pdf'.format(output_basename, name) + name_pdf = f'{output_basename}_{name}.pdf' kegg_pathway_map.to_pdf(name_pdf) differential_colorbar(df, name_pdf.replace(".pdf", '_legend.png')) @@ -451,76 +442,68 @@ def differential_expression_sample(kegg_pathway_map, data, samples, ko_column, return 0 -def write_results(data, output_dir, output_type='excel'): - if output_type == 'excel': - data.to_excel('{}/KEGGCharter_results.xlsx'.format(output_dir), index=False) - else: - data.to_csv('{}/KEGGCharter_results.tsv'.format(output_dir), sep='\t', index=False) - - def write_kgml(mmap, out_dir): data = kegg_get("ko" + mmap, "kgml").read() - with open('{}/map{}.xml'.format(out_dir, mmap), 'w') as f: + with open(f'{out_dir}/map{mmap}.xml', 'w') as f: if len(data) > 1: f.write(data) def write_kgmls(mmaps, out_dir, max_tries=3): - print('[{}] maps inputted'.format(len(mmaps))) - maps_done = [filename.split('/map')[-1].rstrip('.xml') for filename in glob.glob('{}/*.xml'.format(out_dir))] - print('[{}] KGMLs already obtained'.format(len(maps_done))) + print(f'[{len(mmaps)}] maps inputted') + maps_done = [filename.split('/map')[-1].rstrip('.xml') for filename in glob(f'{out_dir}/*.xml')] + print(f'[{len(maps_done)}] KGMLs already obtained') mmaps = [map for map in mmaps if map not in maps_done] i = 1 for mmap in mmaps: - print('[{}/{}] Obtaining KGML for: {}'.format(i, len(mmaps), mmap)) + print(f'[{i}/{len(mmaps)}] Obtaining KGML for: {mmap}') tries = 0 done = False while tries < max_tries and not done: try: write_kgml(mmap, out_dir) done = True - print('[{}]: success'.format(mmap)) + print(f'[{mmap}]: success') except: - if os.path.isfile('{}/map{}.xml'.format(out_dir, mmap)): - os.remove('{}/map{}.xml'.format(out_dir, mmap)) + if os.path.isfile(f'{out_dir}/map{mmap}.xml'): + os.remove(f'{out_dir}/map{mmap}.xml') tries += 1 - print('[{}]: failure'.format(mmap)) + print(f'[{mmap}]: failure') i += 1 -def set_text_boxes_kgml(kgml_filename): +def set_text_boxes_kgml(kgml_filename, desc=''): handler = KGML_parser.read(open(kgml_filename)) # Set text in boxes to EC numbers - pbar = ProgressBar() with open(kgml_filename.replace('xml', 'csv'), 'w') as f: - for ortholog_rec in pbar(handler.orthologs): + for ortholog_rec in tqdm(handler.orthologs, desc=desc): lines = list() kos = ortholog_rec.name.split() lines += kegg_link("enzyme", kos).read().split('\n') ecs = [line.split('\t')[1] for line in lines if len(line) > 0] - f.write('{}\n'.format(','.join(ecs))) + f.write(f'{",".join(ecs)}\n') def set_text_boxes_kgmls(mmaps, out_dir, max_tries=3): - maps_done = [filename.split('/map')[-1].rstrip('.csv') for filename in glob.glob('{}/*.csv'.format(out_dir))] + maps_done = [filename.split('/map')[-1].rstrip('.csv') for filename in glob(f'{out_dir}/*.csv')] print(f"[{len(maps_done)}] maps already have boxes' text set") mmaps = [map for map in mmaps if map not in maps_done] i = 1 for mmap in mmaps: - print(f'[{i}/{len(mmaps)}] Getting EC numbers for: {mmap}') tries = 0 done = False while tries < max_tries and not done: try: - set_text_boxes_kgml('{}/map{}.xml'.format(out_dir, mmap)) + set_text_boxes_kgml( + f'{out_dir}/map{mmap}.xml', desc=f'[{i}/{len(mmaps)}] Getting EC numbers for: {mmap}') done = True except: - print('\nFailed for map. Attempt: {}'.format(tries + 1)) - if os.path.isfile('{}/map{}.csv'.format(out_dir, mmap)): - os.remove('{}/map{}.csv'.format(out_dir, mmap)) + print(f'\nFailed for map. Attempt: {tries + 1}') + if os.path.isfile(f'{out_dir}/map{mmap}.csv'): + os.remove(f'{out_dir}/map{mmap}.csv') tries += 1 - time.sleep(10) + sleep(10) i += 1 @@ -560,8 +543,8 @@ def main(): args.ko_column if args.ko_column is not None else args.ec_column) data = condense_data(data, main_column) - write_results(data, args.output, output_type=('tsv' if args.tsv else 'excel')) - timed_message('Results saved to {}/KEGGCharter_results.{}'.format(args.output, 'tsv' if args.tsv else 'xlsx')) + data.to_csv(f'{args.output}/KEGGCharter_results.tsv', sep='\t', index=False) + timed_message(f'Results saved to {args.output}/KEGGCharter_results.tsv') ko_column = 'KO (KEGGCharter)' # TODO - set ko_column to user defined value @@ -578,7 +561,7 @@ def main(): # Begin dat chart magic metabolic_maps = args.metabolic_maps.split(',') - timed_message('Creating KEGG Pathway representations for {} metabolic pathways.'.format(len(metabolic_maps))) + timed_message(f'Creating KEGG Pathway representations for {len(metabolic_maps)} metabolic pathways.') write_kgmls(metabolic_maps, args.resources_directory) set_text_boxes_kgmls(metabolic_maps, args.resources_directory) @@ -600,35 +583,36 @@ def main(): args.transcriptomic_columns = args.transcriptomic_columns.split(',') for i in range(len(metabolic_maps)): - pathway = KGML_parser.read(open('{}/map{}.xml'.format(args.resources_directory, metabolic_maps[i]))) + pathway = KGML_parser.read(open(f'{args.resources_directory}/map{metabolic_maps[i]}.xml')) - with open('{}/map{}.csv'.format(args.resources_directory, metabolic_maps[i])) as f: + with open(f'{args.resources_directory}/map{metabolic_maps[i]}.csv') as f: ec_list = f.read().split('\n') if len(pathway.orthologs) != len(ec_list) - 1: # -1 because of newline at the end - print('Number of ECs was different from the number of orthologs for map [{}]. ' - 'Will retrieve the right files now.'.format(metabolic_maps[i])) + print(f'Number of ECs was different from the number of orthologs for map [{metabolic_maps[i]}]. ' + f'Will retrieve the right files now.') write_kgml(metabolic_maps[i], args.resources_directory) - set_text_boxes_kgml('f{args.resources_directory}/map{metabolic_maps[i]}.xml') + set_text_boxes_kgml(f'{args.resources_directory}/map{metabolic_maps[i]}.xml') - with open('{}/map{}.csv'.format(args.resources_directory, metabolic_maps[i])) as f: + with open(f'{args.resources_directory}/map{metabolic_maps[i]}.csv') as f: ec_list = f.read().split('\n') - timed_message('[{}/{}] {}'.format(i + 1, len(metabolic_maps), pathway.title)) + timed_message(f'[{i + 1}/{len(metabolic_maps)}] {pathway.title}') chart_map(f'{args.resources_directory}/map{metabolic_maps[i]}.xml', ec_list, data, output=args.output, ko_column=ko_column, taxa_column=args.taxa_column, dic_colors=dic_colors, genomic_columns=args.genomic_columns, transcriptomic_columns=args.transcriptomic_columns) + + # TODO - implement multiprocessing for map generation? ''' - with multiprocessing.Pool() as p: p.starmap(chart_map, [(handler, data, args.output, ko_column, ec_column, args.taxa_column, dic_colors, args.genomic_columns, args.transcriptomic_columns, - '{}/failed_maps.txt'.format(args.output)) for handler in pathways]) + f'{args.output}/failed_maps.txt') for handler in pathways]) ''' if __name__ == '__main__': - start_time = time.time() + start_time = time() main() - print('KEGGCharter analysis finished in {}'.format(strftime("%Hh%Mm%Ss", gmtime(time.time() - start_time)))) + print(f'KEGGCharter analysis finished in {strftime("%Hh%Mm%Ss", gmtime(time() - start_time))}') diff --git a/kegg_pathway_map.py b/kegg_pathway_map.py index 5f0699c..2dc78f8 100644 --- a/kegg_pathway_map.py +++ b/kegg_pathway_map.py @@ -3,7 +3,7 @@ from Bio.KEGG.KGML import KGML_parser, KGML_pathway from Bio.Graphics.KGML_vis import KGMLCanvas from matplotlib import colors, cm -import time +from time import sleep def set_bgcolor(pathway_element, color): @@ -156,7 +156,7 @@ def to_pdf(self, filename, imagemap=True, orthologs=True, compounds=True, maps=T drawn = True except: print('Draw failed! Waiting 10 secs...') - time.sleep(10) + sleep(10) def pathway_box_list(self, taxa_in_box, dic_colors, maxshared=10): """ diff --git a/meta.yaml b/meta.yaml index 0186cbd..ec8dd41 100644 --- a/meta.yaml +++ b/meta.yaml @@ -24,10 +24,13 @@ build: requirements: run: - matplotlib-base - - progressbar + - tqdm - pandas - biopython - reportlab + - openpyxl + - poppler + - mscorefonts test: commands: