Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 60 additions & 42 deletions src/hla_algorithm/hla_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from io import TextIOBase
from operator import attrgetter
from pathlib import Path
from typing import Final, Optional, TypedDict, cast
from typing import Final, Optional, TypedDict

import numpy as np
import yaml
Expand All @@ -18,12 +18,12 @@
HLASequence,
HLAStandard,
HLAStandardMatch,
MatchingAllelePair,
)
from .utils import (
BIN2NUC,
HLA_LOCUS,
StoredHLAStandards,
allele_coordinates_sort_key,
count_strict_mismatches,
nuc2bin,
sort_allele_pairs,
Expand Down Expand Up @@ -138,7 +138,9 @@ def load_default_hla_standards() -> LoadedStandards:
:return: List of known HLA standards
:rtype: list[HLAStandard]
"""
with open(HLAAlgorithm.DEFAULT_CONFIG_DIR / "hla_standards.yaml") as standards_file:
with open(
HLAAlgorithm.DEFAULT_CONFIG_DIR / "hla_standards.yaml"
) as standards_file:
return HLAAlgorithm.read_hla_standards(standards_file)

FREQUENCY_LOCUS_COLUMNS: dict[HLA_LOCUS, tuple[str, str]] = {
Expand Down Expand Up @@ -230,16 +232,24 @@ def combine_standards_stepper(
matching_stds: Sequence[HLAStandardMatch],
seq: Sequence[int],
mismatch_threshold: int = 0,
) -> Generator[tuple[tuple[int, ...], int, tuple[str, str]], None, None]:
) -> Generator[MatchingAllelePair, None, None]:
"""
Identifies "good" combined standards for the specified sequence.

Humans have two copies of their HLA genes, so when we use Sanger
sequencing to sequence a person's HLA, we get a single sequence with
potentially many mixtures. That is, at any position that the two genes
don't match, we see a nucleotide mixture consisting of the two
corresponding bases.

In order to find matches, we take allele sequences (reduced to ones that
are already "decent" matches for our sequence, to reduce running time)
and "mush" them together to produce potential matches for our sequence.

On each iteration, it continues checking combined standards until it
finds a "match", and yields a tuple containing the details of that
match:
- the combined standard, as a tuple of integers 0-15;
- the number of mismatches identified; and
- the allele pair (i.e. names of the two alleles in the combination).
finds a "match", and yields a MatchingAllelePair containing its details.

PRECONDITION: matching_stds should contain no duplicates.

A "match" is defined by the number of mismatches between the combined
standard and the sequence:
Expand All @@ -263,15 +273,6 @@ def combine_standards_stepper(
# "Mush" the two standards together to produce something
# that looks like what you get when you sequence HLA.
std_bin = np.array(std_b.sequence) | np.array(std_a.sequence)
allele_pair: tuple[str, str] = cast(
tuple[str, str],
tuple(
sorted(
(std_a.allele, std_b.allele),
key=allele_coordinates_sort_key,
)
),
)

# There could be more than one combined standard with the
# same sequence, so check if this one's already been found.
Expand All @@ -291,28 +292,19 @@ def combine_standards_stepper(
elif mismatches < current_rejection_threshold:
current_rejection_threshold = max(mismatches, mismatch_threshold)

yield (combined_std_bin, mismatches, allele_pair)
yield MatchingAllelePair.create_from_unsorted_alleles(
standard_bin=combined_std_bin,
mismatch_count=mismatches,
allele_names=(std_a.allele, std_b.allele),
)

@staticmethod
def combine_standards(
matching_stds: Sequence[HLAStandardMatch],
seq: Sequence[int],
def collate_matching_allele_pairs(
matching_allele_pairs: Iterable[MatchingAllelePair],
mismatch_threshold: Optional[int] = None,
) -> dict[HLACombinedStandard, int]:
"""
Find the combinations of standards that match the given sequence.

Humans have two copies of their HLA genes, so when we use Sanger
sequencing to sequence a person's HLA, we get a single sequence with
potentially many mixtures. That is, at any position that the two genes
don't match, we see a nucleotide mixture consisting of the two
corresponding bases.

In order to find matches, we take allele sequences (reduced to ones that
are already "decent" matches for our sequence, to reduce running time)
and "mush" them together to produce potential matches for our sequence.

PRECONDITION: matching_stds should contain no duplicates.
Collate the given MatchingAllelePairs into HLACombinedStandards.

Returns a dictionary mapping HLACombinedStandards to their mismatch
counts. If mismatch_threshold is None or 0, then the result contains
Expand All @@ -330,13 +322,13 @@ def combine_standards(
combos: dict[tuple[int, ...], tuple[int, list[tuple[str, str]]]] = {}

fewest_mismatches: int | float = float("inf")
for (
combined_std_bin,
mismatches,
allele_pair,
) in HLAAlgorithm.combine_standards_stepper(
matching_stds, seq, mismatch_threshold
):
for matching_allele_pair in matching_allele_pairs:
combined_std_bin: tuple[int, ...] = matching_allele_pair.standard_bin
mismatches: int = matching_allele_pair.mismatch_count
allele_pair: tuple[str, str] = (
matching_allele_pair.allele_1,
matching_allele_pair.allele_2,
)
if combined_std_bin not in combos:
combos[combined_std_bin] = (mismatches, [])
combos[combined_std_bin][1].append(allele_pair)
Expand All @@ -362,6 +354,32 @@ def combine_standards(

return result

@staticmethod
def combine_standards(
matching_stds: Sequence[HLAStandardMatch],
seq: Sequence[int],
mismatch_threshold: Optional[int] = None,
) -> dict[HLACombinedStandard, int]:
"""
Find the combinations of standards that match the given sequence.

This uses combine_standards_stepper to find any putative matches, and
then uses collate_matching_allele_pairs to compile the information into
a dictionary mapping HLACombinedStandards to their mismatch counts.

The parameters are as for combine_standards_stepper; mismatch_threshold
is also fed directly into collate_matching_allele_pairs and affects the
results accordingly.
"""
return HLAAlgorithm.collate_matching_allele_pairs(
HLAAlgorithm.combine_standards_stepper(
matching_stds,
seq,
mismatch_threshold if mismatch_threshold is not None else 0,
),
mismatch_threshold,
)

@staticmethod
def get_mismatches(
standard_bin: Sequence[int],
Expand Down
43 changes: 41 additions & 2 deletions src/hla_algorithm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from collections.abc import Iterable
from dataclasses import dataclass, field
from operator import itemgetter
from typing import Final, Optional
from typing import Final, Optional, Self

import numpy as np
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, model_validator

from .utils import (
HLA_LOCUS,
Expand Down Expand Up @@ -75,6 +75,45 @@ class HLAStandardMatch(HLAStandard):
mismatch: int


class MatchingAllelePair(BaseModel):
"""
Represents an allele pair that matches an observed sequence.

This contains:
- the combined standard, as a tuple of integers 0-15;
- the number of mismatches identified; and
- the allele pair (i.e. names of the two alleles in the combination).
"""
standard_bin: tuple[int, ...]
mismatch_count: int
allele_1: str
allele_2: str

@model_validator(mode="after")
def check_alleles_ordered(self) -> Self:
if allele_coordinates_sort_key(self.allele_1) > allele_coordinates_sort_key(self.allele_2):
raise ValueError("allele_1 should be less than or equal to allele_2")
return self

@classmethod
def create_from_unsorted_alleles(
cls,
standard_bin: tuple[int, ...],
mismatch_count: int,
allele_names: tuple[str, str],
) -> Self:
sorted_allele_names: list[str] = sorted(
allele_names,
key=allele_coordinates_sort_key,
)
return cls(
standard_bin=standard_bin,
mismatch_count=mismatch_count,
allele_1=sorted_allele_names[0],
allele_2=sorted_allele_names[1],
)


class HLACombinedStandard(BaseModel):
"""
Represents a combined HLA standard and all of its possible combinations.
Expand Down
Loading
Loading