From 4d21277630fe192999105b1fbc305200cffe515f Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Wed, 15 Oct 2025 15:23:31 +0000 Subject: [PATCH 1/9] feat: introduce spike removal via mapping adds a step to preprocess workflow that removes any spiked reads before running anything else. onyx percentages will now be percentages excluding spike --- bin/prep_spike_refs.py | 57 +++++++++++++ modules/preprocess.nf | 118 +++++++++++++++++++++++++- tests/modules/preprocess.nf.test | 21 +++++ tests/modules/preprocess.nf.test.snap | 35 ++++++++ 4 files changed, 228 insertions(+), 3 deletions(-) create mode 100755 bin/prep_spike_refs.py diff --git a/bin/prep_spike_refs.py b/bin/prep_spike_refs.py new file mode 100755 index 00000000..fa1c5e8f --- /dev/null +++ b/bin/prep_spike_refs.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +import argparse +import gzip +import json +from pathlib import Path + + +def load_spike_in_dict(ref_file): + with open(ref_file) as handle: + return json.load(handle) + + +def concatenate_references(ref_paths, output_file): + with open(output_file, "w") as f: + for ref_path in ref_paths: + with gzip.open(ref_path, "rt") as gz_file: + for line in gz_file: + f.write(line) + + +def main(): + parser = argparse.ArgumentParser( + description="Combine spike-in reference sequences" + ) + parser.add_argument( + "--spike_ins", + required=True, + help="Comma-separated list of spike-in names", + ) + parser.add_argument( + "--spike_in_dict", + required=True, + help="JSON file mapping spike-in names to reference files", + ) + parser.add_argument( + "--spike_in_ref_dir", + required=True, + help="Directory containing spike-in reference files", + ) + parser.add_argument( + "-o", + "--output", + default="combined_spikes.fa", + help="Output FASTA file", + ) + + args = parser.parse_args() + + spike_names = [name.strip() for name in args.spike_ins.split(",") if name.strip()] + spike_map = load_spike_in_dict(args.spike_in_dict) + base_dir = Path(args.spike_in_ref_dir) + ref_paths = [str(base_dir / spike_map[name]["ref"]) for name in spike_names] + concatenate_references(ref_paths, args.output) + + +if __name__ == "__main__": + main() diff --git a/modules/preprocess.nf b/modules/preprocess.nf index 0fc4a3fe..edcfebc3 100644 --- a/modules/preprocess.nf +++ b/modules/preprocess.nf @@ -1,3 +1,82 @@ +process prepare_spike_references { + label "process_single" + + conda "python=3.10" + container "biocontainers/python:3.10" + + input: + val spike_ins + path spike_in_dict + path spike_in_ref_dir + + output: + path "combined_spikes.fa", emit: combined_ref, optional: true + + script: + """ + prep_spike_refs.py --spike_ins ${spike_ins} --spike_in_dict ${spike_in_dict} --spike_in_ref_dir ${spike_in_ref_dir} -o combined_spikes.fa + """ +} + +process remove_spike_paired { + label "process_medium" + + conda "bioconda::minimap2 bioconda::samtools" + container "community.wave.seqera.io/library/minimap2_samtools:c2863f226d833ac8" + publishDir "${params.outdir}/${unique_id}/classifications", mode: "copy", overwrite: true, pattern: "*spikes*.fastq" + publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*.txt" + + input: + tuple val(unique_id), path(fastq1), path(fastq2) + path combined_reference + + output: + tuple val(unique_id), path("${unique_id}_removed_1.fastq"), path("${unique_id}_removed_2.fastq"), emit: removed_paired + tuple val(unique_id), path("${unique_id}_spikes_1.fastq"), path("${unique_id}_spikes_2.fastq"), emit: spike_reads + tuple val(unique_id), path("read_count_removed.txt"), path("read_count_spikes.txt"), emit: stats + + script: + """ + minimap2 -ax sr $combined_reference ${fastq1} ${fastq2} | samtools view -bS - | samtools sort -o sorted.bam + + samtools view -b -f 4 sorted.bam | samtools fastq -1 ${unique_id}_removed_1.fastq -2 ${unique_id}_removed_2.fastq - + samtools view -b -F 4 sorted.bam | samtools fastq -1 ${unique_id}_spikes_1.fastq -2 ${unique_id}_spikes_2.fastq - + + samtools view -c -f 4 sorted.bam > read_count_removed.txt + samtools view -c -F 4 sorted.bam > read_count_spikes.txt + """ +} + +process remove_spike_single { + label "process_medium" + + conda "bioconda::minimap2 bioconda::samtools" + container "community.wave.seqera.io/library/minimap2_samtools:c2863f226d833ac8" + + publishDir "${params.outdir}/${unique_id}/classifications", mode: "copy", overwrite: true, pattern: "*spikes*.fastq" + publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*.txt" + + input: + tuple val(unique_id), path(fastq) + path combined_reference + + output: + tuple val(unique_id), path("${unique_id}_removed.fastq"), emit: removed_single + tuple val(unique_id), path("${unique_id}_spikes.fastq"), emit: spike_reads + tuple val(unique_id), path("read_count_unmapped.txt"), path("read_count_mapped.txt"), emit: stats + + script: + """ + minimap2 -ax map-ont $combined_reference ${fastq} | samtools view -bS - | samtools sort -o sorted.bam + + samtools view -b -f 4 sorted.bam | samtools fastq - > ${unique_id}_removed.fastq + samtools view -b -F 4 sorted.bam | samtools fastq - > ${unique_id}_spikes.fastq + + samtools view -c -f 4 sorted.bam > read_count_unmapped.txt + samtools view -c -F 4 sorted.bam > read_count_mapped.txt + """ +} + process fastp_paired { label "process_medium" @@ -117,7 +196,18 @@ workflow preprocess { fastq2 = file(params.fastq2, type: "file", checkIfExists: true) input_ch = Channel.from([[unique_id, fastq1, fastq2]]) - fastp_paired(input_ch) + if (params.spike_ins) { + spike_in_dict = file(params.spike_in_dict, type: "file", checkIfExists: true) + spike_in_ref_dir = file(params.spike_in_ref_dir, type: "dir", checkIfExists: true) + + prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) + remove_spike_paired(input_ch, prepare_spike_references.out.combined_ref) + spike_removed_ch = remove_spike_paired.out.removed_paired + } else { + spike_removed_ch = input_ch + } + + fastp_paired(spike_removed_ch) fastp_paired.out.fastq.set { processed_fastq_ch } paired_concatenate(fastp_paired.out.fastq) @@ -127,7 +217,18 @@ workflow preprocess { fastq = file(params.fastq, type: "file", checkIfExists: true) input_ch = Channel.from([[unique_id, fastq]]) - fastp_single(input_ch) + if (params.spike_ins) { + spike_in_dict = file(params.spike_in_dict, type: "file", checkIfExists: true) + spike_in_ref_dir = file(params.spike_in_ref_dir, type: "dir", checkIfExists: true) + + prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) + remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) + spike_removed_ch = remove_spike_single.out.removed_single + } else { + spike_removed_ch = input_ch + } + + fastp_single(spike_removed_ch) fastp_single.out.fastq .tap { processed_fastq_ch } @@ -139,7 +240,18 @@ workflow preprocess { .set { fastq_ch } input_ch = fastq_ch.map { fastq -> [unique_id, fastq] } - fastp_single(input_ch) + if (params.spike_ins) { + spike_in_dict = file(params.spike_in_dict, type: "file", checkIfExists: true) + spike_in_ref_dir = file(params.spike_in_ref_dir, type: "dir", checkIfExists: true) + + prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) + remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) + spike_removed_ch = remove_spike_single.out.removed_single + } else { + spike_removed_ch = input_ch + } + + fastp_single(spike_removed_ch) fastp_single.out.fastq .map { unique_id, fastq -> [unique_id + ".fq.gz", fastq] } .collectFile() diff --git a/tests/modules/preprocess.nf.test b/tests/modules/preprocess.nf.test index a676bc2f..b4cb509f 100644 --- a/tests/modules/preprocess.nf.test +++ b/tests/modules/preprocess.nf.test @@ -26,4 +26,25 @@ nextflow_workflow { } + test("preprocess_filtering_with_spike_success") { + + when { + params { + fastq = "${projectDir}/tests/test-data/spike_in/pass/tmv_mock.fastq" + spike_ins = "tobacco_mosaic_virus" + } + workflow { + """ + input[0] = "tmv_mock" + """ + } + } + + then { + assert workflow.success + assert snapshot(workflow.out).match() + } + + } + } diff --git a/tests/modules/preprocess.nf.test.snap b/tests/modules/preprocess.nf.test.snap index cd252199..a4555f94 100644 --- a/tests/modules/preprocess.nf.test.snap +++ b/tests/modules/preprocess.nf.test.snap @@ -33,5 +33,40 @@ "nextflow": "25.04.6" }, "timestamp": "2025-09-15T13:24:49.03481093" + }, + "preprocess_filtering_with_spike_success": { + "content": [ + { + "0": [ + [ + "tmv_mock", + "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" + ] + ], + "1": [ + [ + "tmv_mock", + "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" + ] + ], + "combined_fastq": [ + [ + "tmv_mock", + "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" + ] + ], + "processed_fastq": [ + [ + "tmv_mock", + "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" + ] + ] + } + ], + "meta": { + "nf-test": "0.9.2", + "nextflow": "25.04.6" + }, + "timestamp": "2025-10-15T12:07:27.231809467" } } \ No newline at end of file From 11419f83044d39f27e39157ecc2adb4aa4da0ce9 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Fri, 17 Oct 2025 16:25:18 +0100 Subject: [PATCH 2/9] feat: update check_spike_status with new mapping workflow --- bin/handle_spike_ins.py | 89 +++++++++++++---------------- modules/check_spike_status.nf | 17 ++---- modules/preprocess.nf | 44 +++++++------- subworkflows/classify_and_report.nf | 3 +- subworkflows/ingest.nf | 2 +- 5 files changed, 73 insertions(+), 82 deletions(-) diff --git a/bin/handle_spike_ins.py b/bin/handle_spike_ins.py index 4ab346ec..5bf4b37f 100755 --- a/bin/handle_spike_ins.py +++ b/bin/handle_spike_ins.py @@ -6,7 +6,7 @@ import json import os from collections import defaultdict -import mappy as mp +import gzip def parse_spike_in_refs(ref_file, ref_dir): @@ -50,6 +50,35 @@ def expand_spike_in_input(list_spike_ins, spike_in_dict): return spike_taxids, spike_refs +def parse_idxstats(idxstats_file): + counts = defaultdict(int) + total_unmapped = 0 + with open(idxstats_file, "r") as f: + for line in f: + parts = line.strip().split("\t") + name, _, mapped, unmapped = parts + mapped = int(mapped) + unmapped = int(unmapped) + if name == "*": + total_unmapped = unmapped + continue + counts[name] = mapped + counts["total"] = sum(counts.values()) + total_unmapped + return counts + + +def read_reference_headers(spike_refs): + ref_headers = {} + for ref_path in spike_refs: + headers = [] + with gzip.open(ref_path, "rt") as f: + for line in f: + if line.startswith(">"): + headers.append(line[1:].strip().split()[0]) + ref_headers[ref_path] = headers + return ref_headers + + def parse_depth(name): parse_name = name.split(" ") depth = 0 @@ -162,43 +191,16 @@ def parse_report_file(report_file, spike_in, save_json): return spike_entries -def map_to_refs(query, reference, counts, preset): - a = mp.Aligner(reference, best_n=1, preset=preset) # load or build index - if not a: - raise Exception(f"ERROR: failed to load/build index for {reference}") - - read_count = 0 - for name, seq, qual in mp.fastx_read(query): # read a fasta/q sequence - read_count += 1 - for hit in a.map(seq): # traverse alignments - counts[hit.ctg] += 1 - # print("{}\t{}\t{}\t{}\t{}".format(name, hit.ctg, hit.r_st, hit.r_en, hit.cigar_str)) - break - # if read_count % 1000000 == 0: - # break - counts["total"] = read_count - return a.seq_names - - -def identify_spike_map_counts(query, spike_refs, preset): - map_counts = defaultdict(int) - map_ids = defaultdict(list) - for reference in spike_refs: - map_ids[reference] = map_to_refs(query, reference, map_counts, preset) - return map_counts, map_ids - - def combine_report_and_map_counts( - list_spike_ins, spike_in_dict, report_entries, map_counts, map_ids + list_spike_ins, spike_in_dict, report_entries, map_counts, ref_headers ): spike_summary = defaultdict(lambda: {}) for spike in list_spike_ins: spike_dict = defaultdict(lambda: {}) if spike in spike_in_dict: spike_name = spike - if spike_in_dict[spike]["ref"]: - for long_name in map_ids[spike_in_dict[spike]["ref"]]: + for long_name in ref_headers[spike_in_dict[spike]["ref"]]: name, taxid, taxon_name = long_name.split("|") taxon_name = taxon_name.replace("_", " ") @@ -226,7 +228,7 @@ def combine_report_and_map_counts( spike_dict[spike].update(entry) elif spike.endswith("f*a") or spike.endswith("f*a.gz"): spike_name = spike.split("/")[-1].split(".")[0] - for long_name in map_ids[spike]: + for long_name in ref_headers[spike]: name, taxid, taxon_name = long_name.split("|") taxon_name = taxon_name.replace("_", " ") @@ -325,10 +327,10 @@ def main(): help="Kraken or Bracken file of taxon relationships and quantities", ) parser.add_argument( - "-i", - dest="fastq_file", + "--idxstats", + dest="idxstats_file", required=True, - help="Read file", + help="samtools idxstats output generated during spike removal", ) parser.add_argument( "--spike_ins", @@ -355,22 +357,12 @@ def main(): required=False, help="Save the kraken report in JSON format", ) - parser.add_argument( - "--illumina", - action="store_true", - required=False, - help="Use the short read minimap preset", - ) args = parser.parse_args() spike_ins = [] for spike in args.spike_ins: spike_ins.extend(spike.split(",")) - preset = None - if args.illumina: - preset = "sr" - # Start Program now = datetime.now() time = now.strftime("%m/%d/%Y, %H:%M:%S") @@ -383,13 +375,12 @@ def main(): args.report_file, spike_taxids, args.save_json ) - if len(spike_taxids) > 0 or len(spike_refs) > 0: - map_counts, map_ids = identify_spike_map_counts( - args.fastq_file, spike_refs, preset - ) + map_counts = parse_idxstats(args.idxstats_file) + ref_headers = read_reference_headers(spike_refs) + if spike_ins: spike_summary = combine_report_and_map_counts( - spike_ins, spike_in_dict, spike_kraken_entries, map_counts, map_ids + spike_ins, spike_in_dict, spike_kraken_entries, map_counts, ref_headers ) check_spike_summary(spike_summary) diff --git a/modules/check_spike_status.nf b/modules/check_spike_status.nf index 826ae665..c2ca2ab9 100644 --- a/modules/check_spike_status.nf +++ b/modules/check_spike_status.nf @@ -16,6 +16,7 @@ process check_spike_ins { val spike_ins path spike_in_dict path spike_in_ref_dir + path spike_mapping_stats output: tuple val(unique_id), path("spike_summary.json"), emit: status, optional: true @@ -23,21 +24,14 @@ process check_spike_ins { tuple val(unique_id), path("${kreport.baseName}*.json"), emit: kreport script: - preset = "" - if (params.read_type == "illumina") { - preset = "--illumina" - } - else if (params.paired) { - preset = "--illumina" - } """ handle_spike_ins.py \ -r ${kreport} \ - -i ${reads} \ --spike_ins ${spike_ins} \ --spike_in_dict ${spike_in_dict} \ --spike_in_ref_dir ${spike_in_ref_dir} \ - --save_json ${preset} + --idxstats ${spike_mapping_stats} \ + --save_json """ } @@ -45,7 +39,8 @@ workflow check_spike_status { take: kreport_ch fastq_ch - + spike_mapping_stats_ch + main: spike_ins = "${params.spike_ins}" println(spike_ins) @@ -53,7 +48,7 @@ workflow check_spike_status { spike_in_ref_dir = file("${params.spike_in_ref_dir}", type: "dir", checkIfExists: true) kreport_ch.join(fastq_ch).set { input_ch } - check_spike_ins(input_ch, spike_ins, spike_in_dict, spike_in_ref_dir) + check_spike_ins(input_ch, spike_ins, spike_in_dict, spike_in_ref_dir, spike_mapping_stats_ch) empty_file = file("${baseDir}/resources/empty_file") kreport_ch diff --git a/modules/preprocess.nf b/modules/preprocess.nf index edcfebc3..05e275f8 100644 --- a/modules/preprocess.nf +++ b/modules/preprocess.nf @@ -24,7 +24,7 @@ process remove_spike_paired { conda "bioconda::minimap2 bioconda::samtools" container "community.wave.seqera.io/library/minimap2_samtools:c2863f226d833ac8" publishDir "${params.outdir}/${unique_id}/classifications", mode: "copy", overwrite: true, pattern: "*spikes*.fastq" - publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*.txt" + publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*_spike_mapping_stats.tsv" input: tuple val(unique_id), path(fastq1), path(fastq2) @@ -33,7 +33,7 @@ process remove_spike_paired { output: tuple val(unique_id), path("${unique_id}_removed_1.fastq"), path("${unique_id}_removed_2.fastq"), emit: removed_paired tuple val(unique_id), path("${unique_id}_spikes_1.fastq"), path("${unique_id}_spikes_2.fastq"), emit: spike_reads - tuple val(unique_id), path("read_count_removed.txt"), path("read_count_spikes.txt"), emit: stats + tuple val(unique_id), path("${unique_id}_spike_mapping_stats.tsv"), emit: idxstats script: """ @@ -42,8 +42,7 @@ process remove_spike_paired { samtools view -b -f 4 sorted.bam | samtools fastq -1 ${unique_id}_removed_1.fastq -2 ${unique_id}_removed_2.fastq - samtools view -b -F 4 sorted.bam | samtools fastq -1 ${unique_id}_spikes_1.fastq -2 ${unique_id}_spikes_2.fastq - - samtools view -c -f 4 sorted.bam > read_count_removed.txt - samtools view -c -F 4 sorted.bam > read_count_spikes.txt + samtools idxstats sorted.bam > ${unique_id}_spike_mapping_stats.tsv """ } @@ -54,7 +53,7 @@ process remove_spike_single { container "community.wave.seqera.io/library/minimap2_samtools:c2863f226d833ac8" publishDir "${params.outdir}/${unique_id}/classifications", mode: "copy", overwrite: true, pattern: "*spikes*.fastq" - publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*.txt" + publishDir "${params.outdir}/${unique_id}/qc", mode: "copy", overwrite: true, pattern: "*_spike_mapping_stats.tsv" input: tuple val(unique_id), path(fastq) @@ -63,7 +62,7 @@ process remove_spike_single { output: tuple val(unique_id), path("${unique_id}_removed.fastq"), emit: removed_single tuple val(unique_id), path("${unique_id}_spikes.fastq"), emit: spike_reads - tuple val(unique_id), path("read_count_unmapped.txt"), path("read_count_mapped.txt"), emit: stats + tuple val(unique_id), path("${unique_id}_spike_mapping_stats.tsv"), emit: idxstats script: """ @@ -72,8 +71,7 @@ process remove_spike_single { samtools view -b -f 4 sorted.bam | samtools fastq - > ${unique_id}_removed.fastq samtools view -b -F 4 sorted.bam | samtools fastq - > ${unique_id}_spikes.fastq - samtools view -c -f 4 sorted.bam > read_count_unmapped.txt - samtools view -c -F 4 sorted.bam > read_count_mapped.txt + samtools idxstats sorted.bam > ${unique_id}_spike_mapping_stats.tsv """ } @@ -191,6 +189,8 @@ workflow preprocess { unique_id main: + def spike_mapping_stats_ch = Channel.empty() + if (params.paired) { fastq1 = file(params.fastq1, type: "file", checkIfExists: true) fastq2 = file(params.fastq2, type: "file", checkIfExists: true) @@ -203,6 +203,7 @@ workflow preprocess { prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) remove_spike_paired(input_ch, prepare_spike_references.out.combined_ref) spike_removed_ch = remove_spike_paired.out.removed_paired + spike_mapping_stats_ch = remove_spike_paired.out.idxstats } else { spike_removed_ch = input_ch } @@ -221,12 +222,13 @@ workflow preprocess { spike_in_dict = file(params.spike_in_dict, type: "file", checkIfExists: true) spike_in_ref_dir = file(params.spike_in_ref_dir, type: "dir", checkIfExists: true) - prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) - remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) - spike_removed_ch = remove_spike_single.out.removed_single - } else { - spike_removed_ch = input_ch - } + prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) + remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) + spike_removed_ch = remove_spike_single.out.removed_single + spike_mapping_stats_ch = remove_spike_single.out.idxstats + } else { + spike_removed_ch = input_ch + } fastp_single(spike_removed_ch) @@ -244,12 +246,13 @@ workflow preprocess { spike_in_dict = file(params.spike_in_dict, type: "file", checkIfExists: true) spike_in_ref_dir = file(params.spike_in_ref_dir, type: "dir", checkIfExists: true) - prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) - remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) - spike_removed_ch = remove_spike_single.out.removed_single - } else { - spike_removed_ch = input_ch - } + prepare_spike_references(params.spike_ins, spike_in_dict, spike_in_ref_dir) + remove_spike_single(input_ch, prepare_spike_references.out.combined_ref) + spike_removed_ch = remove_spike_single.out.removed_single + spike_mapping_stats_ch = remove_spike_single.out.idxstats + } else { + spike_removed_ch = input_ch + } fastp_single(spike_removed_ch) fastp_single.out.fastq @@ -263,4 +266,5 @@ workflow preprocess { emit: processed_fastq = processed_fastq_ch combined_fastq = combined_fastq_ch + spike_mapping_stats = spike_mapping_stats_ch } diff --git a/subworkflows/classify_and_report.nf b/subworkflows/classify_and_report.nf index 1a7af2dd..fc62c9c0 100644 --- a/subworkflows/classify_and_report.nf +++ b/subworkflows/classify_and_report.nf @@ -12,6 +12,7 @@ workflow classify_and_report { fastq_ch concat_fastq_ch raise_server + spike_mapping_stats_ch main: qc_checks(fastq_ch) @@ -22,7 +23,7 @@ workflow classify_and_report { check_hcid_status(classify.out.kreport, concat_fastq_ch, setup_taxonomy.out.taxonomy) if (params.spike_ins) { - check_spike_status(classify.out.kreport, concat_fastq_ch) + check_spike_status(classify.out.kreport, concat_fastq_ch, spike_mapping_stats_ch) } kraken_to_json(classify.out.kreport, setup_taxonomy.out.taxonomy) diff --git a/subworkflows/ingest.nf b/subworkflows/ingest.nf index 3096b9b7..8109191b 100644 --- a/subworkflows/ingest.nf +++ b/subworkflows/ingest.nf @@ -13,7 +13,7 @@ workflow ingest { preprocess(unique_id) - classify_and_report(preprocess.out.processed_fastq, preprocess.out.combined_fastq, params.raise_server) + classify_and_report(preprocess.out.processed_fastq, preprocess.out.combined_fastq, params.raise_server, preprocess.out.spike_mapping_stats) extract_all(preprocess.out.processed_fastq, classify_and_report.out.assignments, classify_and_report.out.kreport, classify_and_report.out.taxonomy) emit: From e5056090d6a40feb13c216cbebb7b3a53372afe3 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Fri, 17 Oct 2025 16:39:49 +0100 Subject: [PATCH 3/9] fix: adjust input channels in check_spike_ins to properly merge spike_mapping_stats tuple --- modules/check_spike_status.nf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/check_spike_status.nf b/modules/check_spike_status.nf index c2ca2ab9..d0d1f2b2 100644 --- a/modules/check_spike_status.nf +++ b/modules/check_spike_status.nf @@ -12,11 +12,10 @@ process check_spike_ins { publishDir "${params.outdir}/${unique_id}/classifications", mode: "copy", overwrite: true, pattern: "*.json" input: - tuple val(unique_id), val(database_name), path(kreport), path(reads) + tuple val(unique_id), val(database_name), path(kreport), path(reads), path(spike_mapping_stats) val spike_ins path spike_in_dict path spike_in_ref_dir - path spike_mapping_stats output: tuple val(unique_id), path("spike_summary.json"), emit: status, optional: true @@ -47,8 +46,9 @@ workflow check_spike_status { spike_in_dict = file("${params.spike_in_dict}", type: "file", checkIfExists: true) spike_in_ref_dir = file("${params.spike_in_ref_dir}", type: "dir", checkIfExists: true) - kreport_ch.join(fastq_ch).set { input_ch } - check_spike_ins(input_ch, spike_ins, spike_in_dict, spike_in_ref_dir, spike_mapping_stats_ch) + kreport_ch.join(fastq_ch).join(spike_mapping_stats_ch) .set { input_ch } + + check_spike_ins(input_ch, spike_ins, spike_in_dict, spike_in_ref_dir) empty_file = file("${baseDir}/resources/empty_file") kreport_ch From 6b28cc0a91afccebe75ece071e4b6f9d9424d342 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Fri, 17 Oct 2025 20:02:52 +0100 Subject: [PATCH 4/9] refactor: rename pass/fail to detected/absent no longer semantically a "pass" or "fail" since reads should be removed --- bin/handle_spike_ins.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/handle_spike_ins.py b/bin/handle_spike_ins.py index 5bf4b37f..e07454bd 100755 --- a/bin/handle_spike_ins.py +++ b/bin/handle_spike_ins.py @@ -291,25 +291,25 @@ def check_spike_summary(spike_summary): found_mapped_all = False if found_all: - check[spike] = "pass" + check[spike] = "detected" elif found_any: check[spike] = "partial" else: - check[spike] = "fail" + check[spike] = "absent" if found_classified_all: print(f"Spike {spike} found all refs by classification") elif found_classified_any: print(f"Spike {spike} found some refs by classification") else: - print(f"Spike {spike} failed by classification") + print(f"Spike {spike} absent by classification") if found_mapped_all: print(f"Spike {spike} found all refs by mapping") elif found_mapped_any: print(f"Spike {spike} found some refs by mapping") else: - print(f"Spike {spike} failed by mapping") + print(f"Spike {spike} absent by mapping") with open("spike_summary.json", "w") as outfile: json.dump(check, outfile, indent=4, sort_keys=False) From 86ffd2d31a3c1abeb0b20f3936884ec9311c1a07 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Fri, 17 Oct 2025 20:03:33 +0100 Subject: [PATCH 5/9] tests(spike): update tests for new process --- tests/modules/check_spike_status.nf.test | 16 ++++++++++++++++ tests/test-data/spike_in/fail/tmv_mock.idxstats | 2 ++ tests/test-data/spike_in/pass/tmv_mock.idxstats | 2 ++ 3 files changed, 20 insertions(+) create mode 100644 tests/test-data/spike_in/fail/tmv_mock.idxstats create mode 100644 tests/test-data/spike_in/pass/tmv_mock.idxstats diff --git a/tests/modules/check_spike_status.nf.test b/tests/modules/check_spike_status.nf.test index f1e7ad54..c0f9f145 100644 --- a/tests/modules/check_spike_status.nf.test +++ b/tests/modules/check_spike_status.nf.test @@ -27,6 +27,12 @@ nextflow_workflow { file("${projectDir}/tests/test-data/spike_in/pass/tmv_mock.fastq", checkIfExists: true) ) ) + input[2] = Channel.of( + tuple( + "spikein_test", + file("${projectDir}/tests/test-data/spike_in/pass/tmv_mock.idxstats", checkIfExists: true) + ) + ) """ } } @@ -34,6 +40,7 @@ nextflow_workflow { then { def spike_summary_json = path("${outputDir}/spikein_test/classifications/spike_count_summary.json").json def data = spike_summary_json["tobacco_mosaic_virus"]["NC_001367.1"] + def spike_status = path("${outputDir}/spikein_test/classifications/spike_summary.json").json assertAll ( { assert workflow.success }, { assert path("${outputDir}/spikein_test/classifications/spike_count_summary.json").exists() }, @@ -44,6 +51,7 @@ nextflow_workflow { { assert data.classified_percentage == 50.0 }, { assert data.mapped_count == 10 }, { assert data.mapped_percentage == 50.0 }, + { assert spike_status["tobacco_mosaic_virus"] == "detected" }, ) } @@ -70,6 +78,12 @@ nextflow_workflow { file("${projectDir}/tests/test-data/spike_in/fail/tmv_mock.fastq", checkIfExists: true) ) ) + input[2] = Channel.of( + tuple( + "spikein_test", + file("${projectDir}/tests/test-data/spike_in/fail/tmv_mock.idxstats", checkIfExists: true) + ) + ) """ } } @@ -77,6 +91,7 @@ nextflow_workflow { then { def spike_summary_json = path("${outputDir}/spikein_test/classifications/spike_count_summary.json").json def data = spike_summary_json["tobacco_mosaic_virus"]["NC_001367.1"] + def spike_status = path("${outputDir}/spikein_test/classifications/spike_summary.json").json assertAll( { assert workflow.success }, { assert path("${outputDir}/spikein_test/classifications/spike_count_summary.json").exists() }, @@ -86,6 +101,7 @@ nextflow_workflow { { assert data.classified_percentage == 0.0 }, { assert data.mapped_count == 0 }, { assert data.mapped_percentage == 0.0 }, + { assert spike_status["tobacco_mosaic_virus"] == "absent" }, ) } diff --git a/tests/test-data/spike_in/fail/tmv_mock.idxstats b/tests/test-data/spike_in/fail/tmv_mock.idxstats new file mode 100644 index 00000000..ec736108 --- /dev/null +++ b/tests/test-data/spike_in/fail/tmv_mock.idxstats @@ -0,0 +1,2 @@ +NC_001367.1|12242|Tobacco_mosaic_virus 6395 0 0 +* 0 0 20 diff --git a/tests/test-data/spike_in/pass/tmv_mock.idxstats b/tests/test-data/spike_in/pass/tmv_mock.idxstats new file mode 100644 index 00000000..65f83fc1 --- /dev/null +++ b/tests/test-data/spike_in/pass/tmv_mock.idxstats @@ -0,0 +1,2 @@ +NC_001367.1|12242|Tobacco_mosaic_virus 6395 10 0 +* 0 0 10 From 04c0e34acdd23bda336da411b00fd90a63ad4041 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Mon, 20 Oct 2025 11:02:16 +0000 Subject: [PATCH 6/9] tests: update test cases when mapping and have both kraken or no kraken classifications --- tests/modules/check_spike_status.nf.test | 55 +++++++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/tests/modules/check_spike_status.nf.test b/tests/modules/check_spike_status.nf.test index c0f9f145..be10af2a 100644 --- a/tests/modules/check_spike_status.nf.test +++ b/tests/modules/check_spike_status.nf.test @@ -6,7 +6,58 @@ nextflow_workflow { tag "unit" tag "ci" - test("tobacco_mosaic_virus_detection_success") { + test("tobacco_mosaic_virus_detection_map_no_kraken") { + + when { + params { + spike_ins = "tobacco_mosaic_virus" + } + workflow { + """ + input[0] = Channel.of( + tuple( + "spikein_test", + "PlusPF", + file("${projectDir}/tests/test-data/spike_in/fail/tmv_kraken_report.txt", checkIfExists: true) + ) + ) + input[1] = Channel.of( + tuple( + "spikein_test", + file("${projectDir}/tests/test-data/spike_in/pass/tmv_mock.fastq", checkIfExists: true) + ) + ) + input[2] = Channel.of( + tuple( + "spikein_test", + file("${projectDir}/tests/test-data/spike_in/pass/tmv_mock.idxstats", checkIfExists: true) + ) + ) + """ + } + } + + then { + def spike_summary_json = path("${outputDir}/spikein_test/classifications/spike_count_summary.json").json + def data = spike_summary_json["tobacco_mosaic_virus"]["NC_001367.1"] + def spike_status = path("${outputDir}/spikein_test/classifications/spike_summary.json").json + assertAll ( + { assert workflow.success }, + { assert path("${outputDir}/spikein_test/classifications/spike_count_summary.json").exists() }, + { assert path("${outputDir}/spikein_test/classifications/spike_summary.json").exists() }, + { assert snapshot(workflow.out, path("${outputDir}/spikein_test/classifications/spike_count_summary.json")).match() }, + { assert data.human_readable == "Tobacco mosaic virus" }, + { assert data.classified_count == 0 }, + { assert data.classified_percentage == 0.0 }, + { assert data.mapped_count == 10 }, + { assert data.mapped_percentage == 50.0 }, + { assert spike_status["tobacco_mosaic_virus"] == "detected" }, + ) + } + + } + + test("tobacco_mosaic_virus_detection_map_with_kraken") { when { params { @@ -57,7 +108,7 @@ nextflow_workflow { } - test("tobacco_mosaic_virus_detection_fail") { + test("tobacco_mosaic_virus_detection_absent") { when { params { From 0068c3feb759f1aaaf54d8dcc52d27c5975a245d Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Mon, 20 Oct 2025 11:29:44 +0000 Subject: [PATCH 7/9] feat: disallow secondary alignments --- modules/preprocess.nf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/preprocess.nf b/modules/preprocess.nf index 05e275f8..8a20a8e2 100644 --- a/modules/preprocess.nf +++ b/modules/preprocess.nf @@ -37,7 +37,7 @@ process remove_spike_paired { script: """ - minimap2 -ax sr $combined_reference ${fastq1} ${fastq2} | samtools view -bS - | samtools sort -o sorted.bam + minimap2 --secondary=no -ax sr $combined_reference ${fastq1} ${fastq2} | samtools view -bS - | samtools sort -o sorted.bam samtools view -b -f 4 sorted.bam | samtools fastq -1 ${unique_id}_removed_1.fastq -2 ${unique_id}_removed_2.fastq - samtools view -b -F 4 sorted.bam | samtools fastq -1 ${unique_id}_spikes_1.fastq -2 ${unique_id}_spikes_2.fastq - @@ -66,7 +66,7 @@ process remove_spike_single { script: """ - minimap2 -ax map-ont $combined_reference ${fastq} | samtools view -bS - | samtools sort -o sorted.bam + minimap2 --secondary=no -ax map-ont $combined_reference ${fastq} | samtools view -bS - | samtools sort -o sorted.bam samtools view -b -f 4 sorted.bam | samtools fastq - > ${unique_id}_removed.fastq samtools view -b -F 4 sorted.bam | samtools fastq - > ${unique_id}_spikes.fastq From b214f7623ad97036f1b8ff8719fd6c900dab6f95 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Mon, 20 Oct 2025 12:27:19 +0000 Subject: [PATCH 8/9] tests: update snapshots --- tests/modules/check_spike_status.nf.test.snap | 72 +++++++++++++++++++ tests/modules/preprocess.nf.test.snap | 22 +++++- 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/tests/modules/check_spike_status.nf.test.snap b/tests/modules/check_spike_status.nf.test.snap index 77268ff6..a271aa8f 100644 --- a/tests/modules/check_spike_status.nf.test.snap +++ b/tests/modules/check_spike_status.nf.test.snap @@ -46,5 +46,77 @@ "nextflow": "25.04.6" }, "timestamp": "2025-09-15T13:14:49.795239021" + }, + "tobacco_mosaic_virus_detection_absent": { + "content": [ + { + "0": [ + [ + "spikein_test", + "spikein_test:md5,4918dccd57cf10339ad8db4f6568fd1c" + ] + ], + "status_ch": [ + [ + "spikein_test", + "spikein_test:md5,4918dccd57cf10339ad8db4f6568fd1c" + ] + ] + }, + "spike_count_summary.json:md5,efaafe722640048e0f02988d3a60d162" + ], + "meta": { + "nf-test": "0.9.2", + "nextflow": "25.04.6" + }, + "timestamp": "2025-10-20T12:25:03.936482188" + }, + "tobacco_mosaic_virus_detection_map_with_kraken": { + "content": [ + { + "0": [ + [ + "spikein_test", + "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + ] + ], + "status_ch": [ + [ + "spikein_test", + "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + ] + ] + }, + "spike_count_summary.json:md5,1daa49bcc4f01fae08656c836db8d109" + ], + "meta": { + "nf-test": "0.9.2", + "nextflow": "25.04.6" + }, + "timestamp": "2025-10-20T12:24:44.438455784" + }, + "tobacco_mosaic_virus_detection_map_no_kraken": { + "content": [ + { + "0": [ + [ + "spikein_test", + "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + ] + ], + "status_ch": [ + [ + "spikein_test", + "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + ] + ] + }, + "spike_count_summary.json:md5,10b2b8e2813059baae5a33b38d23f239" + ], + "meta": { + "nf-test": "0.9.2", + "nextflow": "25.04.6" + }, + "timestamp": "2025-10-20T12:24:25.453682103" } } \ No newline at end of file diff --git a/tests/modules/preprocess.nf.test.snap b/tests/modules/preprocess.nf.test.snap index a4555f94..1d1f1ade 100644 --- a/tests/modules/preprocess.nf.test.snap +++ b/tests/modules/preprocess.nf.test.snap @@ -13,6 +13,9 @@ "barcode01", "barcode01.fastp.fastq.gz:md5,2c00a99a41249ed138cbf4d5cf67e595" ] + ], + "2": [ + ], "combined_fastq": [ [ @@ -25,6 +28,9 @@ "barcode01", "barcode01.fastp.fastq.gz:md5,2c00a99a41249ed138cbf4d5cf67e595" ] + ], + "spike_mapping_stats": [ + ] } ], @@ -32,7 +38,7 @@ "nf-test": "0.9.2", "nextflow": "25.04.6" }, - "timestamp": "2025-09-15T13:24:49.03481093" + "timestamp": "2025-10-20T12:19:34.111930399" }, "preprocess_filtering_with_spike_success": { "content": [ @@ -49,6 +55,12 @@ "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" ] ], + "2": [ + [ + "tmv_mock", + "tmv_mock_spike_mapping_stats.tsv:md5,5ae10cf230a9f985bf0e2bced9fdf7f4" + ] + ], "combined_fastq": [ [ "tmv_mock", @@ -60,6 +72,12 @@ "tmv_mock", "tmv_mock.fastp.fastq.gz:md5,5b6778684bac2ad70fa3c47f19881b68" ] + ], + "spike_mapping_stats": [ + [ + "tmv_mock", + "tmv_mock_spike_mapping_stats.tsv:md5,5ae10cf230a9f985bf0e2bced9fdf7f4" + ] ] } ], @@ -67,6 +85,6 @@ "nf-test": "0.9.2", "nextflow": "25.04.6" }, - "timestamp": "2025-10-15T12:07:27.231809467" + "timestamp": "2025-10-20T12:20:06.734421901" } } \ No newline at end of file From bb23c7b6baacec87694567a71ff8c5f7dbf99bd1 Mon Sep 17 00:00:00 2001 From: Sam Sims Date: Mon, 20 Oct 2025 12:28:19 +0000 Subject: [PATCH 9/9] revert: restore pass/fail terminology in spike report --- bin/handle_spike_ins.py | 8 ++++---- tests/modules/check_spike_status.nf.test | 6 +++--- tests/modules/check_spike_status.nf.test.snap | 18 +++++++++--------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/bin/handle_spike_ins.py b/bin/handle_spike_ins.py index e07454bd..5bf4b37f 100755 --- a/bin/handle_spike_ins.py +++ b/bin/handle_spike_ins.py @@ -291,25 +291,25 @@ def check_spike_summary(spike_summary): found_mapped_all = False if found_all: - check[spike] = "detected" + check[spike] = "pass" elif found_any: check[spike] = "partial" else: - check[spike] = "absent" + check[spike] = "fail" if found_classified_all: print(f"Spike {spike} found all refs by classification") elif found_classified_any: print(f"Spike {spike} found some refs by classification") else: - print(f"Spike {spike} absent by classification") + print(f"Spike {spike} failed by classification") if found_mapped_all: print(f"Spike {spike} found all refs by mapping") elif found_mapped_any: print(f"Spike {spike} found some refs by mapping") else: - print(f"Spike {spike} absent by mapping") + print(f"Spike {spike} failed by mapping") with open("spike_summary.json", "w") as outfile: json.dump(check, outfile, indent=4, sort_keys=False) diff --git a/tests/modules/check_spike_status.nf.test b/tests/modules/check_spike_status.nf.test index be10af2a..378241e2 100644 --- a/tests/modules/check_spike_status.nf.test +++ b/tests/modules/check_spike_status.nf.test @@ -51,7 +51,7 @@ nextflow_workflow { { assert data.classified_percentage == 0.0 }, { assert data.mapped_count == 10 }, { assert data.mapped_percentage == 50.0 }, - { assert spike_status["tobacco_mosaic_virus"] == "detected" }, + { assert spike_status["tobacco_mosaic_virus"] == "pass" }, ) } @@ -102,7 +102,7 @@ nextflow_workflow { { assert data.classified_percentage == 50.0 }, { assert data.mapped_count == 10 }, { assert data.mapped_percentage == 50.0 }, - { assert spike_status["tobacco_mosaic_virus"] == "detected" }, + { assert spike_status["tobacco_mosaic_virus"] == "pass" }, ) } @@ -152,7 +152,7 @@ nextflow_workflow { { assert data.classified_percentage == 0.0 }, { assert data.mapped_count == 0 }, { assert data.mapped_percentage == 0.0 }, - { assert spike_status["tobacco_mosaic_virus"] == "absent" }, + { assert spike_status["tobacco_mosaic_virus"] == "fail" }, ) } diff --git a/tests/modules/check_spike_status.nf.test.snap b/tests/modules/check_spike_status.nf.test.snap index a271aa8f..62db1287 100644 --- a/tests/modules/check_spike_status.nf.test.snap +++ b/tests/modules/check_spike_status.nf.test.snap @@ -53,13 +53,13 @@ "0": [ [ "spikein_test", - "spikein_test:md5,4918dccd57cf10339ad8db4f6568fd1c" + "spikein_test:md5,b68e7df287a4950ca87c2f9781766df2" ] ], "status_ch": [ [ "spikein_test", - "spikein_test:md5,4918dccd57cf10339ad8db4f6568fd1c" + "spikein_test:md5,b68e7df287a4950ca87c2f9781766df2" ] ] }, @@ -69,7 +69,7 @@ "nf-test": "0.9.2", "nextflow": "25.04.6" }, - "timestamp": "2025-10-20T12:25:03.936482188" + "timestamp": "2025-10-20T12:27:58.716462463" }, "tobacco_mosaic_virus_detection_map_with_kraken": { "content": [ @@ -77,13 +77,13 @@ "0": [ [ "spikein_test", - "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + "spikein_test:md5,936f0f9158f6e8bad2f9a957e7694b0f" ] ], "status_ch": [ [ "spikein_test", - "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + "spikein_test:md5,936f0f9158f6e8bad2f9a957e7694b0f" ] ] }, @@ -93,7 +93,7 @@ "nf-test": "0.9.2", "nextflow": "25.04.6" }, - "timestamp": "2025-10-20T12:24:44.438455784" + "timestamp": "2025-10-20T12:27:37.257290452" }, "tobacco_mosaic_virus_detection_map_no_kraken": { "content": [ @@ -101,13 +101,13 @@ "0": [ [ "spikein_test", - "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + "spikein_test:md5,936f0f9158f6e8bad2f9a957e7694b0f" ] ], "status_ch": [ [ "spikein_test", - "spikein_test:md5,04b9444c3ec8b1ea773ba8ff80b9d9cf" + "spikein_test:md5,936f0f9158f6e8bad2f9a957e7694b0f" ] ] }, @@ -117,6 +117,6 @@ "nf-test": "0.9.2", "nextflow": "25.04.6" }, - "timestamp": "2025-10-20T12:24:25.453682103" + "timestamp": "2025-10-20T12:27:17.41332216" } } \ No newline at end of file