Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allele rle normalization + pin pydantic version #234

Merged
merged 16 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ install_requires =
jsonschema>=4.17.3
numpy
pyyaml
pydantic>=2.0.3
pydantic == 2.1.1
setup_requires =
cython
pytest-runner
Expand Down
221 changes: 174 additions & 47 deletions src/ga4gh/vrs/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,191 @@
See https://vrs.ga4gh.org/en/stable/impl-guide/normalization.html

"""

import logging
from enum import IntEnum
from typing import NamedTuple, Optional, Union

from bioutils.normalize import normalize as _normalize, NormalizationMode
from ga4gh.core import is_pydantic_instance, ga4gh_digest, pydantic_copy

from ._internal import models
from .dataproxy import SequenceProxy

_logger = logging.getLogger(__name__)

class PosType(IntEnum):
"""Define the kind of position on a location"""

INTEGER = 0
RANGE_LT_OR_EQUAL = 1
RANGE_GT_OR_EQUAL = 2


class LocationPos(NamedTuple):
"""Define Allele location pos value and type"""

value: int
pos_type: PosType


def _normalize_allele(input_allele, data_proxy):
def _get_allele_location_pos(
allele_vo: models.Allele, use_start: bool = True
) -> Optional[LocationPos]:
"""Get a representative position for Alleles with Location start or end defined by Range

:param allele_vo: VRS Allele object
:param use_start: `True` if using `allele_vo.location.start`. `False` if using
`allele_vo.location.end`
:return: A `LocationPos` if using integer or indefinite range. Otherwise return
`None`
"""
Converts .location.sequence into an IRI if it is a SequenceReference because it makes the code simpler.
If it started as a sequence reference, put it back as one at the end.
if use_start:
pos = allele_vo.location.start
else:
pos = allele_vo.location.end

if isinstance(pos, int):
val = pos
pos_type = PosType.INTEGER
else:
pos0_is_none = pos.root[0] is None
pos1_is_none = pos.root[1] is None

if not pos0_is_none and not pos1_is_none: # definite range
return None

val = pos.root[0] or pos.root[1]
pos_type = PosType.RANGE_LT_OR_EQUAL if pos0_is_none else PosType.RANGE_GT_OR_EQUAL

return LocationPos(value=val, pos_type=pos_type)


def _get_new_allele_location_pos(
new_pos_val: int, pos_type: PosType
) -> Union[int, models.Range]:
"""Get updated location pos on normalized allele

:param new_pos_val: New position after normalization
:param pos_type: Original position type used in pre-normalized VRS Allele object
:return: Updated position as integer or VRS Range object
"""
if pos_type == PosType.INTEGER:
val = new_pos_val
elif pos_type == PosType.RANGE_LT_OR_EQUAL:
val = models.Range([None, new_pos_val])
else:
val = models.Range([new_pos_val, None])
return val


def _normalize_allele(input_allele, data_proxy, rle_seq_limit=50):
"""Normalize Allele using "fully-justified" normalization adapted from NCBI's
VOCA. Fully-justified normalization expands such ambiguous representation over the
entire region of ambiguity, resulting in an unambiguous representation that may be
readily compared with other alleles.

This function assumes that IRIs are dereferenced, providing a `SequenceReference` as
the `allele.location.sequence`. If a `SequenceReference` is not provided, the allele
will be returned as is with no normalization.

:param input_allele: Input VRS Allele object
:param data_proxy: SeqRepo dataproxy
:param rle_seq_limit: If RLE is set as the new state, set the limit for the length
of the `sequence`.
To exclude `sequence` from the response, set to 0.
For no limit, set to `None`.
korikuzma marked this conversation as resolved.
Show resolved Hide resolved

Does not attempt to normalize Alleles with definite ranges and will instead return the
`input_allele`
"""
allele = pydantic_copy(input_allele)
sequence_reference = None

if isinstance(allele.location.sequence, models.SequenceReference):
sequence_reference = allele.location.sequence
allele.location.sequence = models.IRI(sequence_reference.refgetAccession)
alias = f"ga4gh:{allele.location.sequence.refgetAccession}"
theferrit32 marked this conversation as resolved.
Show resolved Hide resolved
else:
return input_allele

sequence = SequenceProxy(data_proxy, allele.location.sequence.root)
# Get reference sequence and interval
ref_seq = SequenceProxy(data_proxy, alias)
start = _get_allele_location_pos(allele, use_start=True)
if start is None:
return input_allele

ival = (allele.location.start, allele.location.end)
end = _get_allele_location_pos(allele, use_start=False)
if end is None:
return input_allele

_allele_state = allele.state.type
_states_with_sequence = ["ReferenceLengthExpression", "LiteralSequenceExpression"]
if _allele_state in _states_with_sequence:
ival = (start.value, end.value)
if allele.state.sequence:
alleles = (None, allele.state.sequence.root)
elif _allele_state == "RepeatedSequenceExpression" and \
allele.state.seq_expr.type in _states_with_sequence:
alleles = (None, allele.state.seq_expr.sequence.root)
else:
alleles = (None, "")

new_allele = pydantic_copy(allele)

# Trim common flanking sequence from Allele sequences.
try:
new_ival, new_alleles = _normalize(sequence,
ival,
alleles=alleles,
mode=NormalizationMode.EXPAND,
anchor_length=0)
trim_ival, trim_alleles = _normalize(ref_seq, ival, alleles, mode=None, trim=True)
except ValueError:
# Occurs for ref agree Alleles (when alt = ref)
len_ref_seq = len_alt_seq = 0
else:
trim_ref_seq = ref_seq[trim_ival[0]: trim_ival[1]]
trim_alt_seq = trim_alleles[1]
len_ref_seq = len(trim_ref_seq)
len_alt_seq = len(trim_alt_seq)

# Compare the two allele sequences
if not len_ref_seq and not len_alt_seq:
return input_allele

new_allele.location.start = new_ival[0]
new_allele.location.end = new_ival[1]
new_allele = pydantic_copy(allele)

if new_allele.state.type in _states_with_sequence:
new_allele.state.sequence = models.SequenceString(new_alleles[1])
if len_ref_seq and len_alt_seq:
new_allele.location.start = _get_new_allele_location_pos(
trim_ival[0], start.pos_type
)
new_allele.location.end = _get_new_allele_location_pos(
trim_ival[1], end.pos_type
)
new_allele.state.sequence = models.SequenceString(trim_alleles[1])
return new_allele

# Determine bounds of ambiguity
try:
new_ival, new_alleles = _normalize(
ref_seq,
trim_ival,
(None, trim_alleles[1]),
mode=NormalizationMode.EXPAND
)
except ValueError:
# Occurs for ref agree Alleles (when alt = ref)
pass

if sequence_reference:
new_allele.location.sequence = sequence_reference
else:
new_allele.location.start = _get_new_allele_location_pos(
new_ival[0], start.pos_type
)
new_allele.location.end = _get_new_allele_location_pos(
new_ival[1], end.pos_type
)

new_ref_seq = ref_seq[new_ival[0]: new_ival[1]]

if not new_ref_seq:
# If the reference sequence is empty this is an unambiguous insertion.
# Return a new Allele with the trimmed alternate sequence as a Literal
# Sequence Expression
new_allele.state = models.LiteralSequenceExpression(
sequence=models.SequenceString(new_alleles[1])
)
else:
# Otherwise, return a new Allele using a RLE
len_sequence = len(new_alleles[1])

new_allele.state = models.ReferenceLengthExpression(
length=len_sequence,
repeatSubunitLength=len_ref_seq or len_alt_seq
)

if (rle_seq_limit and len_sequence <= rle_seq_limit) or (rle_seq_limit is None):
new_allele.state.sequence = models.SequenceString(new_alleles[1])

return new_allele

Expand All @@ -73,27 +201,26 @@ def _normalize_haplotype(o, data_proxy=None):
return o


def _normalize_variationset(o, data_proxy=None):
o.members = sorted(o.members, key=ga4gh_digest)
return o


handlers = {
"Allele": _normalize_allele,
"Haplotype": _normalize_haplotype,
"VariationSet": _normalize_variationset,
}


def normalize(vo, data_proxy=None):
"""normalize given vrs object, regardless of type"""
def normalize(vo, data_proxy=None, **kwargs):
"""normalize given vrs object, regardless of type

kwargs:
rle_seq_limit: If RLE is set as the new state, set the limit for the length
of the `sequence`. To exclude `state.sequence`, set to 0.
"""

assert is_pydantic_instance(vo)
vo_type = vo.type

if vo_type in handlers:
handler = handlers[vo_type]
return handler(vo, data_proxy)
return handler(vo, data_proxy, **kwargs)

# No handler for vo_type; pass-through unchanged
return vo
Expand Down Expand Up @@ -122,13 +249,13 @@ def normalize(vo, data_proxy=None):
},
"type": "Allele"
}
allele = models.Allele(**allele_dict)
a = models.Allele(**allele_dict)

allele2 = normalize(allele, dp)
allele2 = normalize(a, dp)

allele.state.sequence = "C"
allele3 = normalize(allele, dp)
a.state.sequence.root = "C"
allele3 = normalize(a, dp)

allele.location.interval.end = 44908823
allele.state.sequence = ""
allele4 = normalize(allele, dp)
a.location.end = 44908823
a.state.sequence.root = ""
allele4 = normalize(a, dp)
Loading
Loading