Skip to content

Commit

Permalink
refactor(multithreading): isolate process data handling
Browse files Browse the repository at this point in the history
Move data handling code from scripts.multithreading.process to
scripts.data_handling.process. This allows for improved test coverage,
by separating easy-to-test data handling from much harder-to-test
concurrency.
  • Loading branch information
rbpatt2019 committed Jun 10, 2021
1 parent 04648e7 commit 16d96e7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
2 changes: 2 additions & 0 deletions scripts/data_handling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
"""Functions for handling data."""
66 changes: 66 additions & 0 deletions scripts/data_handling/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
"""Data handling for *process* step."""
import logging
from os import PathLike

import pandas as pd

logger = logging.getLogger(__name__)


def merge_data(
    gtex_path: PathLike, bm_path: PathLike, mane: pd.DataFrame
) -> pd.DataFrame:
    """Merge the data from previous pipeline queries.

    Parameters
    ----------
    gtex_path : PathLike
        Path to the file containing GTEx query data.
    bm_path : PathLike
        Path to the file containing BioMart query data.
    mane : pd.DataFrame
        A DataFrame containing MANE annotations

    Returns
    -------
    pd.DataFrame
        The merged DataFrame, containing the GTEx query,
        the BioMart query, and the MANE annotations.
    """
    # Keys that uniquely identify a transcript across all three sources.
    transcript_keys = ["geneSymbol", "gencodeId", "transcriptId"]
    gtex = pd.read_csv(gtex_path, header=0, index_col=None)
    bm = pd.read_csv(bm_path, header=0, index_col=None)
    # Outer join keeps transcripts present in either query; the MANE
    # annotation is a left join so unannotated transcripts survive as NaN.
    combined = gtex.merge(bm, on=transcript_keys, how="outer")
    annotated = combined.merge(
        mane, on=transcript_keys + ["refseq"], how="left"
    )
    return annotated.sort_values(["median", "MANE_status"])


def write_data(data: pd.DataFrame, writer: pd.ExcelWriter) -> None:
    """Write a DataFrame to an Excel file.

    Note
    ----
    This function is best used within a ``with`` block, so that:

    #. The ``ExcelWriter`` is already open.
    #. It will be properly closed.

    Parameters
    ----------
    data : pd.DataFrame
        The DataFrame to be written.
    writer : pd.ExcelWriter
        An **open** pandas ExcelWriter.
    """
    # The frame is expected to hold rows for a single gene; that gene's
    # symbol names the sheet. TODO confirm callers only pass one gene.
    gene = data["geneSymbol"].unique()[0]
    data.to_excel(writer, index=False, sheet_name=gene)
    # Lazy %-args avoid building the message when INFO is disabled;
    # also fixes the "add" -> "added" typo in the log text.
    logger.info("%s added to output file.", gene)
19 changes: 3 additions & 16 deletions scripts/multithreading/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from typing import Union

import pandas as pd
from data_handling.process import merge_data, write_data

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -96,19 +97,7 @@ def _producer(self) -> None:
while (gtex_path := self.gtex.pop(0)) is not None and (
bm_path := self.bm.pop(0)
) is not None:
gtex = pd.read_csv(gtex_path, header=0, index_col=None)
bm = pd.read_csv(bm_path, header=0, index_col=None)
data = (
gtex.merge(
bm, on=["geneSymbol", "gencodeId", "transcriptId"], how="outer"
)
.merge(
self.mane,
on=["geneSymbol", "gencodeId", "transcriptId", "refseq"],
how="left",
)
.sort_values(["median", "MANE_status"])
)
data = merge_data(gtex_path, bm_path, self.mane)
self._q.put(data)
logger.info(f"Contents of file {gtex_path} added to queue")
else:
Expand All @@ -123,9 +112,7 @@ def _consumer(self) -> None:
queue is a FIFO queue.
"""
while (data := self._q.get()) is not None:
gene = data["geneSymbol"].unique()[0]
data.to_excel(self.writer, index=False, sheet_name=gene)
logger.info(f"{gene} add to output file.")
write_data(data, self.writer)
self._q.task_done()
else:
logging.info("None received. Queue consumed.")
Expand Down

0 comments on commit 16d96e7

Please sign in to comment.