Commit 74b8048: collect outputs (#34)

dfornika authored Jun 19, 2024
1 parent 257a00e commit 74b8048
Showing 8 changed files with 62 additions and 59 deletions.
9 changes: 7 additions & 2 deletions bin/fastp_json_to_csv.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python

import argparse
import csv
import json
import sys

def main(args):
with open(args.fastp_json, 'r') as f:
@@ -57,7 +59,6 @@ def main(args):
output_fields = ['sample_id'] + output_fields
output_data = [args.sample_id]

print(",".join(output_fields))
output_data = output_data + [
total_reads_before_filtering,
total_reads_after_filtering,
@@ -80,7 +81,11 @@
adapter_trimmed_reads,
adapter_trimmed_bases,
]
print(",".join(map(str, output_data)))
output_row = dict(zip(output_fields, output_data))
writer = csv.DictWriter(sys.stdout, fieldnames=output_fields, dialect='unix', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
writer.writeheader()
writer.writerow(output_row)



if __name__ == "__main__":
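
For context, a minimal sketch of why this change matters (not from the commit; sample values are invented): csv.DictWriter quotes fields correctly where a bare print(",".join(...)) cannot, and the explicit quoting=csv.QUOTE_MINIMAL overrides the 'unix' dialect's default of QUOTE_ALL:

```python
# A minimal sketch, not from the repo: what DictWriter buys over print(",".join(...)).
# The 'unix' dialect defaults to QUOTE_ALL; passing quoting=csv.QUOTE_MINIMAL
# overrides that, so only fields containing delimiters, quotes, or newlines get quoted.
import csv
import sys

output_fields = ['sample_id', 'total_reads_before_filtering']
output_row = {'sample_id': 'sample,01', 'total_reads_before_filtering': 123456}

writer = csv.DictWriter(sys.stdout, fieldnames=output_fields, dialect='unix',
                        quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
writer.writeheader()
writer.writerow(output_row)
# stdout:
#   sample_id,total_reads_before_filtering
#   "sample,01",123456
# print(",".join(...)) would have emitted sample,01,123456 -- one column too many.
```
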
5 changes: 3 additions & 2 deletions bin/join_mob_typer_and_abricate_reports.py
@@ -190,8 +190,9 @@ def main(args):
"percent_ref_plasmid_coverage_above_depth_threshold",
"num_snps_vs_ref_plasmid",
]
csv.register_dialect('unix-tab', delimiter='\t', doublequote=False, lineterminator='\n', quoting=csv.QUOTE_MINIMAL)
writer = csv.DictWriter(sys.stdout, fieldnames=output_fieldnames, dialect='unix-tab')

writer = csv.DictWriter(sys.stdout, fieldnames=output_fieldnames, dialect='excel-tab', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')

writer.writeheader()


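
A note on the dialect swap above: 'excel-tab' is built into the csv module, so the custom register_dialect() call becomes unnecessary. One behavioral difference: 'excel-tab' terminates rows with \r\n where the replaced 'unix-tab' dialect used \n, which is likely the reason the concatenation step in modules/join_reports.nf (further down) now strips carriage returns. A short sketch, not from the commit:

```python
# A sketch, not from the repo: 'excel-tab' is a built-in tab-delimited dialect,
# so the csv.register_dialect('unix-tab', ...) call is no longer needed.
# Caveat: 'excel-tab' rows end in '\r\n' (the 'excel' family default), unlike
# the '\n' terminator of the custom dialect it replaces.
import csv
import sys

fieldnames = ['sample_id', 'num_snps_vs_ref_plasmid']
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames, dialect='excel-tab',
                        quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
writer.writeheader()
# extrasaction='ignore' silently drops keys not listed in fieldnames:
writer.writerow({'sample_id': 'S01', 'num_snps_vs_ref_plasmid': 3,
                 'unlisted_field': 'dropped'})
```
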
11 changes: 10 additions & 1 deletion bin/parse_quast_report.py
@@ -9,6 +9,15 @@

def parse_transposed_quast_report(transposed_quast_report_path):
"""
Parse a transposed QUAST report file and return a list of dictionaries, one for each assembly.
:param transposed_quast_report_path: Path to the transposed QUAST report file.
:return: List of dictionaries, one for each assembly. Keys:
['assembly_id', 'total_length', 'num_contigs', 'largest_contig', 'assembly_N50', 'assembly_N75',
'assembly_L50', 'assembly_L75', 'num_N_per_100_kb', 'num_contigs_gt_0_bp', 'num_contigs_gt_1000_bp', 'num_contigs_gt_5000_bp', 'num_contigs_gt_10000_bp', 'num_contigs_gt_25000_bp', 'num_contigs_gt_50000_bp',
'total_length_gt_0_bp', 'total_length_gt_1000_bp', 'total_length_gt_5000_bp', 'total_length_gt_10000_bp',
'total_length_gt_25000_bp', 'total_length_gt_50000_bp']
:rtype: list[dict]
"""
field_lookup = collections.OrderedDict()
field_lookup['Assembly'] = 'assembly_id'
@@ -118,7 +127,7 @@ def main():
]

report = parse_transposed_quast_report(args.transposed_quast_report)
writer = csv.DictWriter(sys.stdout, fieldnames=output_fieldnames)
writer = csv.DictWriter(sys.stdout, fieldnames=output_fieldnames, dialect='unix', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore')
writer.writeheader()
for record in report:
writer.writerow(record)
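
The new docstring lists the normalized keys; the parsing itself is driven by the field_lookup mapping begun just below it. A hedged sketch of that pattern (the QUAST header strings other than 'Assembly' are assumptions, not copied from the script):

```python
# A sketch of the field_lookup renaming pattern, not the script's actual code.
# Keys are QUAST's transposed-report column headers (assumed); values are the
# normalized names listed in the docstring above.
import collections
import csv

field_lookup = collections.OrderedDict()
field_lookup['Assembly'] = 'assembly_id'
field_lookup['Total length'] = 'total_length'  # assumed QUAST header
field_lookup['# contigs'] = 'num_contigs'      # assumed QUAST header

def parse_transposed_quast_report(path):
    """Return one dict per assembly, with columns renamed via field_lookup."""
    parsed = []
    with open(path, 'r') as f:
        reader = csv.DictReader(f, delimiter='\t')
        for row in reader:
            parsed.append({new_key: row[old_key]
                           for old_key, new_key in field_lookup.items()
                           if old_key in row})
    return parsed
```
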
39 changes: 27 additions & 12 deletions main.nf
@@ -4,12 +4,10 @@ nextflow.enable.dsl = 2

include { hash_files as hash_files_fastq } from './modules/hash_files.nf'
include { hash_files as hash_files_assemblies } from './modules/hash_files.nf'
include { trim_reads } from './modules/fastp.nf'
include { fastp_json_to_csv } from './modules/fastp.nf'
include { fastp } from './modules/fastp.nf'
include { unicycler } from './modules/unicycler.nf'
include { mash_screen } from './modules/mash_screen.nf'
include { quast } from './modules/quast.nf'
include { parse_quast_report } from './modules/quast.nf'
include { mob_recon } from './modules/mob_recon.nf'
include { abricate as abricate_ncbi } from './modules/abricate.nf'
include { abricate as abricate_plasmidfinder } from './modules/abricate.nf'
@@ -53,9 +51,7 @@ workflow {

hash_files_fastq(ch_fastq.map{ it -> [it[0], [it[1], it[2]]] }.combine(Channel.of("fastq_input")))

trim_reads(ch_fastq)

fastp_json_to_csv(trim_reads.out.json)
fastp(ch_fastq)

if (params.pre_assembled) {
if (params.samplesheet_input != 'NO_FILE') {
@@ -65,15 +61,13 @@
}
hash_files_assemblies(ch_assemblies.map{ it -> [it[0], [it[1]]] }.combine(Channel.of("assembly_input")))
} else {
unicycler(trim_reads.out.reads)
unicycler(fastp.out.reads)
ch_assemblies = unicycler.out.assembly
}

quast(ch_assemblies)

parse_quast_report(quast.out.report)

mash_screen(trim_reads.out.reads.combine(ch_mob_db))
mash_screen(fastp.out.reads.combine(ch_mob_db))

ch_mob_recon = mob_recon(ch_assemblies)

@@ -94,13 +88,34 @@

ch_reference_plasmid = get_reference_plasmid(ch_reference_plasmid_id.combine(ch_mob_db))

align_reads_to_reference_plasmid(trim_reads.out.reads.cross(ch_reference_plasmid).map{ it -> it[0] + it[1].drop(1) })
align_reads_to_reference_plasmid(fastp.out.reads.cross(ch_reference_plasmid).map{ it -> it[0] + it[1].drop(1) })

call_snps(align_reads_to_reference_plasmid.out.alignment)

join_resistance_plasmid_and_snp_reports(ch_combined_abricate_mobtyper_report.cross(call_snps.out.num_snps.join(align_reads_to_reference_plasmid.out.coverage, by: [0, 1])).map{ it -> it[0] + it[1].drop(1) })
concatenate_resistance_reports(select_resistance_chromosomes.out.join(join_resistance_plasmid_and_snp_reports.out, remainder: true).groupTuple().map{ it -> [it[0], (it[1] - null) + (it[2] - null)]}.map{ it -> [it[0]] + [it[1].unique{ path -> path.getFileName() }] })

if (params.collect_outputs) {
fastp.out.csv.map{ it -> it[1] }.collectFile(
name: params.collected_outputs_prefix + "_fastp.csv",
storeDir: params.outdir,
keepHeader: true,
sort: true,
)
quast.out.csv.map{ it -> it[1] }.collectFile(
name: params.collected_outputs_prefix + "_quast.csv",
storeDir: params.outdir,
keepHeader: true,
sort: true,
)
concatenate_resistance_reports.out.map{ it -> it[1] }.collectFile(
name: params.collected_outputs_prefix + "_resistance_gene_report.tsv",
storeDir: params.outdir,
keepHeader: true,
sort: true,
)
}
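
These collectFile calls are the heart of the commit: each per-sample CSV/TSV is concatenated into one run-level file in params.outdir. A rough Python analogue of keepHeader: true and sort: true, with invented file and function names (the real operator works on channel contents, not paths):

```python
# A rough analogue, assumptions labeled: mimics what
# collectFile(name: ..., storeDir: ..., keepHeader: true, sort: true) produces.
# The real operator collects channel items and caches chunks; names here are invented.
from pathlib import Path

def collect_file(paths, name, storedir, keep_header=True, sort=True):
    chunks = [Path(p).read_text() for p in paths]
    if sort:
        chunks.sort()  # roughly sort: true (lexicographic ordering of chunks)
    out_path = Path(storedir) / name
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, 'w') as out:
        for i, chunk in enumerate(chunks):
            lines = chunk.splitlines(keepends=True)
            if keep_header:
                if i == 0:
                    out.write(lines[0])   # header taken from the first chunk only
                out.writelines(lines[1:])  # each chunk's own header line is skipped
            else:
                out.writelines(lines)
    return out_path

# Illustrative usage (files are hypothetical):
# collect_file(['sampleA_fastp.csv', 'sampleB_fastp.csv'],
#              name='collected_fastp.csv', storedir='results')
```
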


//
// Provenance collection processes
@@ -114,7 +129,7 @@ workflow {
} else {
ch_provenance = ch_provenance.join(unicycler.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
}
ch_provenance = ch_provenance.join(trim_reads.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
ch_provenance = ch_provenance.join(fastp.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
ch_provenance = ch_provenance.join(quast.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
ch_provenance = ch_provenance.join(mash_screen.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
ch_provenance = ch_provenance.join(mob_recon.out.provenance).map{ it -> [it[0], it[1] << it[2]] }
25 changes: 5 additions & 20 deletions modules/fastp.nf
@@ -1,13 +1,16 @@
process trim_reads {
process fastp {

tag { sample_id }

publishDir "${params.outdir}/${sample_id}", pattern: "${sample_id}_fastp.{json,csv}", mode: 'copy'

input:
tuple val(sample_id), path(reads_1), path(reads_2)

output:
tuple val(sample_id), path("${sample_id}_trimmed_R1.fastq.gz"), path("${sample_id}_trimmed_R2.fastq.gz"), emit: reads
tuple val(sample_id), path("${sample_id}_fastp.json"), emit: json
tuple val(sample_id), path("${sample_id}_fastp.csv"), emit: csv
tuple val(sample_id), path("${sample_id}*_provenance.yml"), emit: provenance

script:
@@ -31,25 +34,7 @@ process trim_reads {
--unpaired2 ${sample_id}_unpaired.fastq.gz
mv fastp.json ${sample_id}_fastp.json
"""
}

process fastp_json_to_csv {

tag { sample_id }

executor 'local'
publishDir "${params.outdir}/${sample_id}", pattern: "${sample_id}_fastp.csv", mode: 'copy'

input:
tuple val(sample_id), path(fastp_json)

output:
tuple val(sample_id), path("${sample_id}_fastp.csv")

script:
"""
fastp_json_to_csv.py -s ${sample_id} ${fastp_json} > ${sample_id}_fastp.csv
fastp_json_to_csv.py -s ${sample_id} ${sample_id}_fastp.json > ${sample_id}_fastp.csv
"""
}
6 changes: 4 additions & 2 deletions modules/join_reports.nf
@@ -72,7 +72,9 @@ process concatenate_resistance_reports {

script:
"""
head -n 1 ${resistance_reports[0]} > ${sample_id}_resistance_gene_report.tsv
tail -qn+2 ${resistance_reports} >> ${sample_id}_resistance_gene_report.tsv
# tr -d '\015' removes carriage return characters, which can cause issues when concatenating files
head -n 1 ${resistance_reports[0]} | tr -d '\015' > ${sample_id}_resistance_gene_report.tsv
tail -qn+2 ${resistance_reports} | tr -d '\015' >> ${sample_id}_resistance_gene_report.tsv
"""
}
24 changes: 4 additions & 20 deletions modules/quast.nf
@@ -2,11 +2,13 @@ process quast {

tag { sample_id }

publishDir "${params.outdir}/${sample_id}", pattern: "${sample_id}_quast.csv", mode: 'copy'

input:
tuple val(sample_id), path(assembly)

output:
tuple val(sample_id), path("${sample_id}_quast.tsv"), emit: report
tuple val(sample_id), path("${sample_id}_quast.csv"), emit: csv
tuple val(sample_id), path("${sample_id}*_provenance.yml"), emit: provenance

script:
@@ -19,25 +21,7 @@
quast --threads ${task.cpus} ${assembly}
cp quast_results/latest/transposed_report.tsv ${sample_id}_quast.tsv
"""
}

process parse_quast_report {

tag { sample_id }

executor 'local'
publishDir "${params.outdir}/${sample_id}", pattern: "${sample_id}_quast.csv", mode: 'copy'

input:
tuple val(sample_id), path(quast_report)

output:
tuple val(sample_id), path("${sample_id}_quast.csv")

script:
"""
parse_quast_report.py ${quast_report} > ${sample_id}_quast.csv
parse_quast_report.py ${sample_id}_quast.tsv > ${sample_id}_quast.csv
"""
}
2 changes: 2 additions & 0 deletions nextflow.config
@@ -24,6 +24,8 @@ params {
mashthreshold = 0.996
min_plasmid_coverage_depth = 10
min_plasmid_coverage_breadth = 95
collect_outputs = false
collected_outputs_prefix = 'collected'
pipeline_short_name = parsePipelineName(manifest.toMap().get('name'))
pipeline_minor_version = parseMinorVersion(manifest.toMap().get('version'))
}
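
Because collect_outputs defaults to false, run-level collection is opt-in. With Nextflow's standard parameter handling, the new settings can be supplied on the command line, e.g. `--collect_outputs --collected_outputs_prefix run01` (illustrative values, not taken from the repo's docs).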
