Skip to content

Commit

Permalink
Update clinical ETL to process sequencing accessioning records from r…
Browse files Browse the repository at this point in the history
…eceiving table

The clinical ETL is being updated to ingest sequencing accessioning data from receiving.clinical. This data is sourced from tracking sheets maintained on GitHub for the Seattle Flu Study.

Running this ETL on receiving.clinical records whose document contains `gisaid_accession` or `genbank_accession` will result in custom processing for this particular type of data. After matching to an existing sample, a minimal `consensus_genome` and `genomic_sequence` record will be generated for each COVID-19, RSV-A, and RSV-B sequence record.
  • Loading branch information
davereinhart committed Aug 6, 2024
1 parent daff921 commit 1688cb5
Showing 1 changed file with 107 additions and 5 deletions.
112 changes: 107 additions & 5 deletions lib/seattleflu/id3c/cli/command/etl/clinical.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from id3c.db.session import DatabaseSession
from id3c.db.datatypes import Json
from id3c.cli.command.etl.redcap_det import insert_fhir_bundle

from id3c.db.types import MinimalSampleRecord, GenomeRecord, OrganismRecord
from id3c.cli.command.etl import (
etl,

Expand Down Expand Up @@ -39,6 +39,7 @@
from . import race, ethnicity
from .fhir import *
from .clinical_retrospectives import *
from id3c.cli.command.etl.consensus_genome import find_organism
from .redcap_map import map_symptom


Expand Down Expand Up @@ -104,17 +105,44 @@ def etl_clinical(*, db: DatabaseSession):
# Most of the time we expect to see existing sites so a
# select-first approach makes the most sense to avoid useless
# updates.
site = find_or_create_site(db,
identifier = site_identifier(record.document["site"]),
details = {"type": "retrospective"})
if record.document.get("site"):
site = find_or_create_site(db,
identifier = site_identifier(record.document["site"]),
details = {"type": "retrospective"})

# Sequencing accession IDs are being loaded into the clinical receiving table, and will
# be processed differently than other records, populating only the warehouse.consensus_genome and
# warehouse.genomic_sequence tables with the relevant data.
if record.document.get('genbank_accession') or record.document.get('gisaid_accession'):
# Find the matching organism within the warehouse for the reference organism
organism_name_map = {
'rsv-a': 'RSV.A',
'rsv-b': 'RSV.B',
'hcov19': 'Human_coronavirus.2019'
}
organism = find_organism(db, organism_name_map[record.document['pathogen']])

assert organism, f"No organism found with name «{record.document['pathogen']}»"

# Most of the time we expect to see new sequences, so an
# insert-first approach makes the most sense to avoid useless
# queries.
genome = upsert_genome(db,
sample = sample,
organism = organism)

genomic_sequence = upsert_genomic_sequence(db,
genome = genome,
details = record.document)



# PHSKC and KP2023 will be handled differently than other clinical records, converted
# to FHIR format and inserted into receiving.fhir table to be processed
# by the FHIR ETL. When time allows, SCH and KP should follow suit.
# Since KP2023 and KP samples both have KaiserPermanente as their site in id3c,
# use the ndjson document's site to distinguish KP vs KP2023 samples
if site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
elif site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
fhir_bundle = generate_fhir_bundle(db, record.document, site.identifier)
insert_fhir_bundle(db, fhir_bundle)

Expand Down Expand Up @@ -159,6 +187,80 @@ def etl_clinical(*, db: DatabaseSession):
LOG.info(f"Finished processing clinical record {record.id}")


def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: OrganismRecord) -> GenomeRecord:
    """
    Insert a minimal consensus genome linking *sample* to *organism*.

    Only ``sample_id`` and ``organism_id`` are written to
    ``warehouse.consensus_genome``; no sequence read set is attached.

    Returns the inserted row, exposing ``id``, ``sample_id`` and
    ``organism_id``.

    Raises :class:`AssertionError` when the insert produced no row.
    """
    LOG.debug(f"""
    Upserting genome with sample_id {sample.id},
    organism {organism.id} «{organism.lineage}»""")

    data = {
        "sample_id": sample.id,
        "organism_id": organism.id,
    }

    # With «on conflict … do nothing», the «returning» clause only yields rows
    # that were actually inserted, so a conflicting insert returns no row.
    genome: GenomeRecord = db.fetch_row("""
        insert into warehouse.consensus_genome (sample_id, organism_id)
          values (%(sample_id)s, %(organism_id)s)

        on conflict (sample_id, organism_id, sequence_read_set_id) do nothing

        returning consensus_genome_id as id, sample_id, organism_id
        """, data)

    # Check the row itself before reading its attributes so a missing row
    # fails with the intended message rather than an AttributeError.
    # Presumably fetch_row returns None when nothing comes back — TODO confirm.
    assert genome is not None and genome.id, "Upsert affected no rows!"

    LOG.info(f"""
    Upserted genome {genome.id} with sample ID «{genome.sample_id}»
    and organism ID «{genome.organism_id}»
    """)

    return genome

def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details: dict) -> Any:
    """
    Upsert a genomic sequence given a *genome* record and *details*.

    *details* must contain ``sequence_identifier``; ``segment`` is optional
    and defaults to the empty string.  Only a fixed subset of the keys in
    *details* is stored in the row's ``details`` column.

    Returns the upserted row, exposing ``id``, ``identifier``, ``segment``,
    ``seq`` and ``consensus_genome_id``.

    Raises :class:`KeyError` when *details* has no ``sequence_identifier`` and
    :class:`AssertionError` when no row comes back or the identifier already
    belongs to a sequence linked to a different consensus genome.
    """
    sequence_identifier = details['sequence_identifier']
    LOG.info(f"Upserting genomic sequence «{sequence_identifier}»")

    # Keys of *details* that are kept in the stored details document.
    retained_keys = (
        'nwgc_id',
        'strain_name',
        'genbank_accession',
        'gisaid_accession',
        '_provenance',
    )

    data = {
        "identifier": sequence_identifier,
        "segment": details.get('segment', ''),
        # No sequence is stored for these records, only accession metadata.
        "seq": "",
        "genome_id": genome.id,
        "additional_details": Json({
            key: value
            for key, value in details.items()
            if key in retained_keys
        }),
    }

    # NOTE(review): on an identifier conflict this overwrites the existing
    # row's seq with the empty string supplied above; consensus_genome_id is
    # deliberately not updated so the ownership check below can detect a clash.
    genomic_sequence = db.fetch_row("""
        insert into warehouse.genomic_sequence (identifier, segment, seq, consensus_genome_id, details)
          values (%(identifier)s, %(segment)s, %(seq)s, %(genome_id)s, %(additional_details)s)

        on conflict (identifier) do update
            set seq = excluded.seq,
                segment = excluded.segment,
                details = excluded.details

        returning genomic_sequence_id as id, identifier, segment, seq, consensus_genome_id
        """, data)

    # Confirm a row came back before inspecting its other attributes.
    assert genomic_sequence is not None and genomic_sequence.id, "Upsert affected no rows!"
    assert genomic_sequence.consensus_genome_id == genome.id, \
        "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"

    LOG.info(f"Upserted genomic sequence {genomic_sequence.id}")

    return genomic_sequence

def create_encounter(db: DatabaseSession,
record: dict,
patient_reference: dict,
Expand Down

0 comments on commit 1688cb5

Please sign in to comment.