Skip to content

Commit 2664248

Browse files
authored
Merge pull request #395 from AdamaJava/qsnp_unfiltered_read_bug
fix(qsnp): OOM caused by large number of failed filter reads at same location
2 parents c730209 + bc3936e commit 2664248

3 files changed

Lines changed: 44 additions & 52 deletions

File tree

qsnp/src/org/qcmg/snp/Pipeline.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.File;
3030
import java.io.IOException;
31-
import java.nio.ByteBuffer;
3231
import java.text.DateFormat;
3332
import java.text.SimpleDateFormat;
3433
import java.util.ArrayList;
@@ -450,7 +449,7 @@ private void checkHeadersSortOrder(List<SAMFileHeader> headers, boolean isNormal
450449
}
451450

452451
List<SAMSequenceDictionary> getSequenceDictionaries(List<SAMFileHeader> headers) {
453-
final List<SAMSequenceDictionary> seqDictionaries = new ArrayList<SAMSequenceDictionary>();
452+
final List<SAMSequenceDictionary> seqDictionaries = new ArrayList<>();
454453
for (final SAMFileHeader header : headers) {
455454
seqDictionaries.add(header.getSequenceDictionary());
456455
}
@@ -485,7 +484,7 @@ private void checkSequenceDictionaries(List<SAMFileHeader> normalHeaders, List<S
485484

486485

487486
// pick the first normal sequence to check against fasta
488-
final List<SAMSequenceRecord> normalSequences = normalSeqDictionaries.get(0).getSequences();
487+
final List<SAMSequenceRecord> normalSequences = normalSeqDictionaries.getFirst().getSequences();
489488

490489
// now check against the supplied reference file
491490
final FastaSequenceFile ref = new FastaSequenceFile(new File(referenceFile), true);
@@ -633,7 +632,7 @@ void walkBams() throws Exception {
633632
/**
634633
* Sets up 2 Producer threads, 2 Consumer threads and a Cleaner thread, along with the concurrent collections, queues, and barriers used by them all
635634
*
636-
* @param ignoreDuplicates indicates whether duplicate records should be discarded out right. Not useful for torrent mode
635+
* @param includeDups indicates whether duplicate records should be discarded outright. Not useful for torrent mode
637636
* @throws Exception
638637
*/
639638
void walkBams(boolean includeDups) throws Exception {
@@ -736,7 +735,10 @@ public class Producer implements Runnable {
736735
private XXHash64 xxhash64;
737736
private final static int seed = 0x9747b28c; // used to initialize the hash value, use whatever value you want, but always the same
738737
private final static int ONE_MILLION = 1_000_000;
739-
738+
private int failedFilterStartPosition = 0;
739+
private int failedFilterCountAtStartPosition = 0;
740+
private static final int MAX_FAILED_RECORDS_PER_POSITION = 1000;
741+
740742
public Producer(final String[] bamFiles, final CountDownLatch latch, final boolean isNormal,
741743
final Queue<SAMRecordFilterWrapper> samQueue, final Thread mainThread, final String query,
742744
final CyclicBarrier barrier, boolean includeDups, Accumulator [] accum) throws Exception {
@@ -874,7 +876,24 @@ private void processRecord(SAMRecord record) throws Exception {
874876
/*
875877
* we now want to keep track of reads that don't pass the filter for test as well as control
876878
*/
877-
addRecordToQueue(record, passesFilter);
879+
if ( ! passesFilter) {
880+
/*
881+
There are instances in the genome where there are a large number of poor quality reads mapping to the same location.
882+
This is causing memory issues when trying to process these reads, and so we will ignore any further reads once we have seen MAX_FAILED_RECORDS_PER_POSITION (1000) reads at the same start position that fail the filter.
883+
*/
884+
int start = record.getAlignmentStart();
885+
if (start == failedFilterStartPosition) {
886+
failedFilterCountAtStartPosition++;
887+
} else {
888+
failedFilterCountAtStartPosition = 1;
889+
failedFilterStartPosition = start;
890+
}
891+
if (failedFilterCountAtStartPosition <= MAX_FAILED_RECORDS_PER_POSITION) {
892+
addRecordToQueue(record, false);
893+
}
894+
} else {
895+
addRecordToQueue(record, true);
896+
}
878897
} else {
879898
// didn't have any filtering defined - add all
880899
addRecordToQueue(record, true);
@@ -986,7 +1005,7 @@ public void processSAMRecord(final SAMRecordFilterWrapper record) {
9861005
* @param length
9871006
* @param referenceOffset
9881007
* @param passesFilter
989-
* @param readStartPosition start position of the read - depends on strand as to whether this is the alignemtnEnd or alignmentStart
1008+
* @param readEndPosition end position of the read - depends on strand as to whether this is the alignmentEnd or alignmentStart
9901009
*/
9911010
public void updateMapWithAccums(int startPosition, final byte[] bases, final byte[] qualities,
9921011
boolean forwardStrand, int offset, int length, int referenceOffset, final boolean passesFilter, final int readEndPosition, long readNameHash) {
@@ -1275,10 +1294,10 @@ private void interrogateAccumulations(Accumulator control, Accumulator test) {
12751294
*/
12761295
if (null != control && null != test) {
12771296
if (control.getPosition() != test.getPosition()) {
1278-
throw new IllegalArgumentException("Control and test accumulator positions do not match!!! control: " + control.toString() + ", and test: " + test.toString());
1297+
throw new IllegalArgumentException("Control and test accumulator positions do not match!!! control: " + control + ", and test: " + test);
12791298
}
12801299
}
1281-
final int position = control != null ? control.getPosition() : test.getPosition();
1300+
final int position = control != null ? control.getPosition() : test != null ? test.getPosition() : Integer.MAX_VALUE;
12821301

12831302
// if we are over the length of this particular sequence - return
12841303
if (position - 1 >= referenceBasesLength) return;
@@ -1377,7 +1396,7 @@ private void interrogateAccumulations(Accumulator control, Accumulator test) {
13771396
/*
13781397
* populate adjacentAccumulators so that compound snp decision can be made
13791398
*/
1380-
adjacentAccumulators.put(v, new Pair<Accumulator, Accumulator>(control, test));
1399+
adjacentAccumulators.put(v, new Pair<>(control, test));
13811400
}
13821401
}
13831402
}

qsnp/test/org/qcmg/snp/PipelineTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package org.qcmg.snp;
22

3-
import static org.junit.Assert.assertEquals;
4-
53
import java.io.IOException;
64
import java.util.*;
5+
import java.util.concurrent.ConcurrentLinkedQueue;
76
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.CyclicBarrier;
88
import java.util.concurrent.atomic.AtomicInteger;
99

10+
import org.junit.Before;
1011
import org.junit.Rule;
1112
import org.junit.Test;
1213
import org.junit.rules.TemporaryFolder;
@@ -21,21 +22,22 @@
2122
import htsjdk.samtools.SAMFileHeader;
2223
import htsjdk.samtools.SAMRecord;
2324

25+
import static org.junit.Assert.*;
26+
2427
public class PipelineTest {
2528

2629
@Rule
2730
public final TemporaryFolder testFolder = new TemporaryFolder();
28-
31+
2932
@Test
3033
public void getHeader() {
3134
VcfHeader h = Pipeline.getHeaderForQSnp("abc", "123", "456", "qsnp", null, null, "xyz");
32-
assertEquals(true, null != h.getFormatRecord(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND));
33-
assertEquals(true, h.getFormatRecord(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND).toString().contains(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND_DESC));
35+
assertNotNull(h.getFormatRecord(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND));
36+
assertTrue(h.getFormatRecord(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND).toString().contains(VcfHeaderUtils.FORMAT_OBSERVED_ALLELES_BY_STRAND_DESC));
3437
}
35-
36-
38+
3739
@Test
38-
public void getAccumulatorsFromReads() throws IOException {
40+
public void getAccumulatorsFromReads() {
3941
final Pipeline pipeline = new TestPipeline();
4042

4143
CountDownLatch consumerLatch = new CountDownLatch(1);

qsnp/test/org/qcmg/snp/StandardPipelineTest.java

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,14 @@
1212
import java.nio.file.Paths;
1313
import java.util.Arrays;
1414
import java.util.List;
15-
import java.util.concurrent.ConcurrentHashMap;
16-
import java.util.concurrent.ConcurrentMap;
1715
import java.util.concurrent.atomic.AtomicInteger;
18-
import java.util.stream.Collectors;
1916
import java.util.stream.IntStream;
2017

2118
import org.junit.Before;
22-
import org.junit.Ignore;
2319
import org.junit.Rule;
2420
import org.junit.Test;
2521
import org.junit.rules.TemporaryFolder;
2622
import org.qcmg.common.commandline.Executor;
27-
import org.qcmg.common.model.Accumulator;
2823

2924
import htsjdk.samtools.SAMFileHeader;
3025
import htsjdk.samtools.SAMFileHeader.SortOrder;
@@ -96,9 +91,9 @@ public void testRunStandardMode() throws Exception{
9691
final String command = "-log " + logFile.getAbsolutePath() + " -i " + ini.getAbsolutePath();
9792
final Executor exec = new Executor(command, "org.qcmg.snp.Main");
9893
assertEquals(0, exec.getErrCode());
99-
List<String> vcfs = Files.lines(Paths.get(vcf.getPath())).filter(s -> ! s.startsWith("#")).collect(Collectors.toList());
94+
List<String> vcfs = Files.lines(Paths.get(vcf.getPath())).filter(s -> ! s.startsWith("#")).toList();
10095

101-
vcfs.stream().forEach(System.out::println);
96+
vcfs.forEach(System.out::println);
10297

10398
assertEquals(60, vcfs.size());
10499
AtomicInteger ai = new AtomicInteger(0);
@@ -147,42 +142,18 @@ public void testNumbers() throws IOException, InterruptedException {
147142
final String command = "-log " + logFile.getAbsolutePath() + " -i " + ini.getAbsolutePath();
148143
final Executor exec = new Executor(command, "org.qcmg.snp.Main");
149144
assertEquals(0, exec.getErrCode());
150-
List<String> vcfs = Files.lines(Paths.get(vcf.getPath())).filter(s -> ! s.startsWith("#")).collect(Collectors.toList());
145+
List<String> vcfs = Files.lines(Paths.get(vcf.getPath())).filter(s -> ! s.startsWith("#")).toList();
151146

152-
vcfs.stream().forEach(System.out::println);
147+
vcfs.forEach(System.out::println);
153148

154149
assertEquals(4, vcfs.size());
155150
// assertEquals("GL000208.1 53 . T G . . FLANK=AAGTGGTTCAA GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 0/1:90,6:96:T1[]5[]:C1;G51;T89:.:.:6:G1[41]5[40.2];T9[35.22]81[39.68] 0/1:90,6:96:T1[]5[]:C1;G51;T89:.:.:6:G1[41]5[40.2];T9[35.22]81[39.68]", vcfs.get(0));
156-
assertEquals("GL000208.1 53 . T G . . FLANK=AAGTGGTTCAA GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 0/1:115,10:125:T1[]7[]:C1;G51;T89:.:.:9:G2[41]8[40.5];T9[35.22]106[39.88] 0/1:115,10:125:T1[]7[]:C1;G51;T89:.:.:9:G2[41]8[40.5];T9[35.22]106[39.88]", vcfs.get(0));
151+
assertEquals("GL000208.1 53 . T G . . FLANK=AAGTGGTTCAA GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 0/1:115,10:125:T1[]7[]:C1;G51;T89:.:.:9:G2[41]8[40.5];T9[35.22]106[39.88] 0/1:115,10:125:T1[]7[]:C1;G51;T89:.:.:9:G2[41]8[40.5];T9[35.22]106[39.88]", vcfs.getFirst());
157152
// assertEquals("GL000208.1 77 . T C . . FLANK=AAAGACGTATT GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 1/1:4,94:98:C0[]3[];T0[]1[]:C171;T11:.:.:64:C9[35.78]85[39.94];T1[41]3[39.67] 1/1:4,94:98:C0[]3[];T0[]1[]:C171;T11:.:.:64:C9[35.78]85[39.94];T1[41]3[39.67]", vcfs.get(1));
158153
// assertEquals("GL000208.1 84 . C A . . FLANK=TATTCAACTCA GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 0/1:22,71:93:A2[]6[]:A178;C6:.:.:50:A8[38.62]63[38.92];C2[36.5]20[40.2] 0/1:22,71:93:A2[]6[]:A178;C6:.:.:50:A8[38.62]63[38.92];C2[36.5]20[40.2]", vcfs.get(2));
159154
// assertEquals("GL000208.1 98 . C A . . FLANK=ACTTTAATGCA GT:AD:DP:EOR:FF:FT:INF:NNS:OABS 1/1:0,83:83:A0[]8[]:A188;G1:.:.:59:A8[37]75[38.63] 1/1:0,83:83:A0[]8[]:A188;G1:.:.:59:A8[37]75[38.63]", vcfs.get(3));
160155
}
161156

162-
@Ignore
163-
public void testContainsAndRemove() {
164-
final ConcurrentMap<Integer, Accumulator> map = new ConcurrentHashMap<Integer, Accumulator>();
165-
final int noOfLoops = 100000;
166-
long time = System.currentTimeMillis();
167-
for (int i = 0 ; i < noOfLoops ; i++) {
168-
map.remove(i);
169-
}
170-
System.out.println("EMPTY: remove time: " + (System.currentTimeMillis() - time));
171-
172-
time = System.currentTimeMillis();
173-
for (int i = 0 ; i < noOfLoops ; i++) {
174-
if (map.containsKey(i))
175-
map.remove(i);
176-
}
177-
System.out.println("EMPTY: contains and remove time: " + (System.currentTimeMillis() - time));
178-
179-
time = System.currentTimeMillis();
180-
for (int i = 0 ; i < noOfLoops ; i++) {
181-
map.remove(i);
182-
}
183-
System.out.println("EMPTY: remove time: " + (System.currentTimeMillis() - time));
184-
}
185-
186157
public void checkBam(File f) {
187158
SamReader reader = SamReaderFactory.makeDefault().open(f);
188159
assertEquals(SortOrder.coordinate, reader.getFileHeader().getSortOrder());

0 commit comments

Comments
 (0)