Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions rounds/1_histogram/solution.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
"""Your Round 1 solution — byte-pair histogram.
"""Round 1 solution — byte-pair histogram."""

**Edit this file.** It currently delegates to ``baseline.py`` so everything
passes out of the box. Replace the body of ``compute_histogram`` with your
own faster implementation.
"""
import numpy as np


def compute_histogram(path: str) -> dict[bytes, int]:
    """Frequency of every 2-byte bigram in the file at ``path``.

    The file is read as raw bytes and every pair of adjacent bytes
    (overlapping, so ``b"abc"`` yields ``b"ab"`` and ``b"bc"``) is counted.

    Args:
        path: Path of the file to read.

    Returns:
        A mapping from each bigram that occurs at least once to its number
        of occurrences.  Files shorter than two bytes produce ``{}``.
    """
    data = np.fromfile(path, dtype=np.uint8)
    if len(data) < 2:
        return {}

    # Pack each adjacent byte pair into one 16-bit code (first byte in the
    # high half) so a single bincount pass tallies all 65536 possible pairs.
    bigrams = (data[:-1].astype(np.uint16) << 8) | data[1:]
    counts = np.bincount(bigrams, minlength=65536)

    # Only bigrams that actually occur are reported.
    present = np.nonzero(counts)[0]

    # ``tolist`` converts to plain Python ints in bulk instead of one NumPy
    # scalar at a time inside the comprehension.
    return {
        code.to_bytes(2, "big"): count
        for code, count in zip(present.tolist(), counts[present].tolist())
    }
81 changes: 70 additions & 11 deletions rounds/3_dna/solution.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,76 @@
"""Your Round 3 solutionDNA sequence matcher.
"""Fast Round 3 solution: DNA sequence matcher."""

**Edit this file.** It currently delegates to ``baseline.py`` so everything
passes out of the box. Replace the body of ``find_matches`` with your
own faster implementation.
"""
from __future__ import annotations

from .baseline import find_matches as _baseline
import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np

_NEWLINE = b"\n"


def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]:
"""Find every FASTA record whose sequence contains ``pattern``.
"""Find every FASTA record whose sequence contains ``pattern``."""
if not pattern:
return []

pattern_len = len(pattern)
pattern_value = np.frombuffer(pattern, dtype=np.uint64)[0]

with open(fasta_path, "rb") as file:
data = file.read()

records = data.split(b">")[1:]
worker_count = os.cpu_count()

chunk_size = (len(records) + worker_count - 1) // worker_count
chunks = [
records[start : start + chunk_size]
for start in range(0, len(records), chunk_size)
]
with ThreadPoolExecutor(max_workers=worker_count) as executor:
groups = executor.map(
_scan_records,
chunks,
[pattern_value] * len(chunks),
[pattern_len] * len(chunks),
)

return [match for group in groups for match in group]


def _scan_records(
records: list[bytes],
pattern_value: np.uint64,
pattern_len: int,
) -> list[tuple[str, list[int]]]:
matches: list[tuple[str, list[int]]] = []
for record in records:
match = _scan_record(record, pattern_value, pattern_len)
if match is not None:
matches.append(match)
return matches


def _scan_record(
record: bytes,
pattern_value: np.uint64,
pattern_len: int,
) -> tuple[str, list[int]] | None:
record_id, _, wrapped_sequence = record.partition(_NEWLINE)
sequence = wrapped_sequence.replace(_NEWLINE, b"")
sequence_len = len(sequence)
if sequence_len < pattern_len:
return None

Returns ``[(record_id, [positions...]), ...]`` in file order.
"""
# TODO: remove this delegation and write your own implementation here.
return _baseline(fasta_path, pattern)
windows = np.ndarray(
shape=(sequence_len - pattern_len + 1,),
dtype=np.uint64,
buffer=sequence,
strides=(1,),
)
positions = np.nonzero(windows == pattern_value)[0]
if positions.size:
return record_id.decode("ascii"), positions.tolist()
return None
Loading