Skip to content

Commit

Permalink
Merge pull request #267 from SABS-R3-Epidemiology/secondary-infections
Browse files Browse the repository at this point in the history
Recording the serial interval
  • Loading branch information
mghosh00 authored Dec 5, 2024
2 parents 1cc1fb9 + d0564cd commit aacf8f4
Show file tree
Hide file tree
Showing 15 changed files with 848 additions and 80 deletions.
112 changes: 111 additions & 1 deletion pyEpiabm/pyEpiabm/core/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def __init__(self, microcell, age_group=None):
self.secondary_infections_counts = []
self.time_of_recovery = None
self.num_times_infected = 0
self.latent_period = None
self.exposure_period = None
self.infector_latent_period = None
self.serial_interval_dict = {}
self.generation_time_dict = {}
self.care_home_resident = False
self.key_worker = False
self.date_positive = None
Expand Down Expand Up @@ -283,10 +288,115 @@ def increment_secondary_infections(self):
"""Increments the number of secondary infections the given person has
for this specific infection period (i.e. if the given person has been
infected multiple times, then we only increment the current secondary
infection count)
infection count).
"""
try:
self.secondary_infections_counts[-1] += 1
except IndexError:
raise RuntimeError("Cannot call increment_secondary_infections "
"while secondary_infections_counts is empty")

def set_latent_period(self, latent_period: float):
"""Sets the latent period of the current Person.
Parameters
----------
latent_period : float
The time between the exposure and infection onset of the current
Person.
"""
self.latent_period = latent_period

def set_exposure_period(self, exposure_period: float):
"""Sets the exposure period (we define here as the time between a
primary case infection and a secondary case exposure, with the current
`Person` being the secondary case). We store this to be added to the
latent period of the infection to give a serial interval.
Parameters
----------
exposure_period : float
The time between the infector's time of infection and the time
of exposure to the current Person
"""
self.exposure_period = exposure_period

def set_infector_latent_period(self, latent_period: float):
"""Sets the latent period of the primary infector of this Person. We
store this in order to calculate the generation_time of the interaction
between infector and infectee.
Parameters
----------
latent_period : float
The latency period of the primary infector (the individual who
infected the current Person).
"""
self.infector_latent_period = latent_period

def store_serial_interval(self):
"""Adds the `latent_period` to the current `exposure_period` to give
a `serial_interval`, which will be stored in the
`serial_interval_dict`. The serial interval is the time between a
primary case infection and a secondary case infection. This method
is called immediately after a person becomes exposed.
"""
# This method has been called erroneously if the latent period or
# exposure period is None
if self.exposure_period is None:
raise RuntimeError("Cannot call store_serial_interval while the"
" exposure_period is None")
elif self.latent_period is None:
raise RuntimeError("Cannot call store_serial_interval while the"
" latent_period is None")

serial_interval = self.exposure_period + self.latent_period
# The reference day is the day the primary case was first infected
# This is what we will store in the dictionary
reference_day = self.time_of_status_change - serial_interval
try:
(self.serial_interval_dict[reference_day]
.append(serial_interval))
except KeyError:
self.serial_interval_dict[reference_day] = [serial_interval]

# Reset the exposure period for the next infection
self.exposure_period = None

def store_generation_time(self):
"""Adds the `infector_latent_period` to the current
`exposure_period` to give a `generation_time`, which will be stored
in the `generation_time_dict`. The generation time is the time between
a primary case exposure and a secondary case exposure. This method
is called immediately after the infectee becomes exposed.
"""
# This method has been called erroneously if the exposure period is
# None or if the latent period of primary infector is None
if self.exposure_period is None:
raise RuntimeError("Cannot call store_generation_time while the"
" exposure_period is None")
elif self.latent_period is None:
raise RuntimeError("Cannot call store_generation_time while the"
" latent_period is None")
elif self.infector_latent_period is None:
if self.time_of_status_change - self.latent_period - \
self.exposure_period <= 0.0:
# We do not record the generation time if the infector has
# no latent period (if their time of infection was day 0)
return
raise RuntimeError("Cannot call store_generation_time while the"
" infector_latent_period is None")

generation_time = self.exposure_period + self.infector_latent_period
# The reference day is the day the primary case was first exposed
# This is what we will store in the dictionary
reference_day = (self.time_of_status_change - self.latent_period
- generation_time)
try:
(self.generation_time_dict[reference_day]
.append(generation_time))
except KeyError:
self.generation_time_dict[reference_day] = [generation_time]

# Reset the latency period of the infector for the next infection
self.infector_latent_period = None
151 changes: 138 additions & 13 deletions pyEpiabm/pyEpiabm/routine/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def configure(self,
a csv file containing infectiousness (viral load) values
* `secondary_infections_output`: Boolean to determine whether we \
need a csv file containing secondary infections and R_t values
* `serial_interval_output`: Boolean to determine whether we \
need a csv file containing serial interval data
* `generation_time_output`: Boolean to determine whether we \
need a csv file containing generation time data
* `compress`: Boolean to determine whether we compress \
the infection history csv files
Expand All @@ -87,16 +91,16 @@ def configure(self,
inf_history_params : dict
This is short for 'infection history file parameters' and we will
use the abbreviation 'ih' to refer to infection history throughout
this class. If `status_output`, `infectiousness_output` and
`secondary_infections_output` are all False, then no infection
history csv files are produced (or if the dictionary is None).
These files contain the infection status, infectiousness and
secondary infection counts of each person every time step
respectively. The EpiOS tool
(https://github.com/SABS-R3-Epidemiology/EpiOS) samples data from
these files to mimic real life epidemic sampling techniques. These
files can be compressed when 'compress' is True, reducing the size
of these files.
this class. If `status_output`, `infectiousness_output`,
`secondary_infections_output`, `serial_interval_output` and
`generation_time_output` are all False, then no infection history
csv files are produced (or if the dictionary is None). These files
contain the infection status, infectiousness and secondary
infection counts of each person every time step respectively. The
EpiOS tool (https://github.com/SABS-R3-Epidemiology/EpiOS) samples
data from these files to mimic real life epidemic sampling
techniques. These files can be compressed when 'compress' is True,
reducing the size of these files.
"""
self.sim_params = sim_params
self.population = population
Expand Down Expand Up @@ -154,9 +158,13 @@ def configure(self,
self.status_output = False
self.infectiousness_output = False
self.secondary_infections_output = False
self.serial_interval_output = False
self.generation_time_output = False
self.ih_status_writer = None
self.ih_infectiousness_writer = None
self.secondary_infections_writer = None
self.serial_interval_writer = None
self.generation_time_writer = None
self.compress = False

if inf_history_params:
Expand All @@ -168,19 +176,32 @@ def configure(self,
.get("infectiousness_output")
self.secondary_infections_output = inf_history_params\
.get("secondary_infections_output")
self.serial_interval_output = inf_history_params \
.get("serial_interval_output")
self.generation_time_output = inf_history_params \
.get("generation_time_output")
self.compress = inf_history_params.get("compress", False)
person_ids = []
person_ids += [person.id for cell in population.cells for person
in cell.persons]
self.ih_output_titles = ["time"] + person_ids
self.Rt_output_titles = ["time"] + person_ids + ["R_t"]
ts = 1 / Parameters.instance().time_steps_per_day
times = np.arange(self.sim_params["simulation_start_time"],
self.sim_params["simulation_end_time"] + ts,
ts).tolist()
self.si_output_titles = times
ih_folder = os.path.join(os.getcwd(),
inf_history_params["output_dir"])

if not (self.status_output or self.infectiousness_output
or self.secondary_infections_output):
logging.warning("status_output, infectiousness_output and "
+ "secondary_infections_output are False. "
or self.secondary_infections_output
or self.serial_interval_output
or self.generation_time_output):
logging.warning("status_output, infectiousness_output, "
+ "secondary_infections_output, "
+ "serial_interval_output and "
+ "generation_time_output are False. "
+ "No infection history csvs will be created.")

if self.status_output:
Expand Down Expand Up @@ -219,6 +240,30 @@ def configure(self,
self.Rt_output_titles
)

if self.serial_interval_output:

file_name = "serial_intervals.csv"
logging.info(
f"Set serial interval location to "
f"{os.path.join(ih_folder, file_name)}")

self.serial_interval_writer = _CsvDictWriter(
ih_folder, file_name,
self.si_output_titles
)

if self.generation_time_output:

file_name = "generation_times.csv"
logging.info(
f"Set generation time location to "
f"{os.path.join(ih_folder, file_name)}")

self.generation_time_writer = _CsvDictWriter(
ih_folder, file_name,
self.si_output_titles
)

@log_exceptions()
def run_sweeps(self):
"""Iteration step of the simulation. First the initialisation sweeps
Expand Down Expand Up @@ -264,6 +309,10 @@ def run_sweeps(self):
logging.info(f"Final time {t} days reached")
if self.secondary_infections_writer:
self.write_to_Rt_file(times)
if self.serial_interval_writer:
self.write_to_serial_interval_file(times)
if self.generation_time_writer:
self.write_to_generation_time_file(times)

def write_to_file(self, time):
"""Records the count number of a given list of infection statuses
Expand Down Expand Up @@ -404,6 +453,82 @@ def write_to_Rt_file(self, times: np.array):
# Write each time step in dictionary form
self.secondary_infections_writer.write(dict_row)

def write_to_serial_interval_file(self, times: np.array):
"""Records the intervals between an infector and an infectee getting
infected to provide an overall serial interval for each time-step of
the epidemic. This can be used as a histogram of values for each
time step.
Parameters
----------
times : np.array
An array of all time steps of the simulation
"""
# Initialise the dataframe
all_times = np.hstack((np.array(self
.sim_params["simulation_start_time"]),
times))
data_dict = {time: [] for time in all_times}
for cell in self.population.cells:
for person in cell.persons:
# For every time the person was infected, add their list of
# serial intervals to the timepoint at which their infector
# became infected
for t_inf, intervals in person.serial_interval_dict.items():
data_dict[t_inf] += intervals

# Here we will fill out the rest of the dataframe with NaN values,
# as all lists will have different lengths
max_list_length = max([len(intervals)
for intervals in data_dict.values()])
for t in data_dict.keys():
data_dict[t] += ([np.nan] * (max_list_length - len(data_dict[t])))

# Change to dataframe to get the data in a list of dicts format
df = pd.DataFrame(data_dict)

list_of_dicts = df.to_dict(orient='records')
for row_t in list_of_dicts:
self.serial_interval_writer.write(row_t)

def write_to_generation_time_file(self, times: np.array):
"""Records the intervals between an infector and an infectee getting
exposed to provide an overall generation time for each time-step of
the epidemic. This can be used as a histogram of values for each
time step.
Parameters
----------
times : np.array
An array of all time steps of the simulation
"""
# Initialise the dataframe
all_times = np.hstack((np.array(self
.sim_params["simulation_start_time"]),
times))
data_dict = {time: [] for time in all_times}
for cell in self.population.cells:
for person in cell.persons:
# For every time the person was infected, add their list of
# generation times to the timepoint at which their infector
# became exposed
for t_inf, intervals in person.generation_time_dict.items():
data_dict[t_inf] += intervals

# Here we will fill out the rest of the dataframe with NaN values,
# as all lists will have different lengths
max_list_length = max([len(intervals)
for intervals in data_dict.values()])
for t in data_dict.keys():
data_dict[t] += ([np.nan] * (max_list_length - len(data_dict[t])))

# Change to dataframe to get the data in a list of dicts format
df = pd.DataFrame(data_dict)

list_of_dicts = df.to_dict(orient='records')
for row_t in list_of_dicts:
self.generation_time_writer.write(row_t)

def add_writer(self, writer: AbstractReporter):
self.writers.append(writer)

Expand Down
22 changes: 22 additions & 0 deletions pyEpiabm/pyEpiabm/sweep/abstract_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,25 @@ def __call__(self, time: float):
"""
raise NotImplementedError

@staticmethod
def store_infection_periods(infector, infectee, time):
"""Sets the exposure_period of the infectee (defined as the time
between the infector having status I and the infectee having status
E. Also sets stores the infector's latent period within the infectee
(to be used in calculating the generation time). This is called during
the daily sweeps.
Parameters
----------
infector : Person
Current primary case
infectee : Person
Current secondary case
time : float
Current simulation time
"""
inf_to_exposed = (time -
infector.infection_start_times[-1])
infectee.set_exposure_period(inf_to_exposed)
infectee.set_infector_latent_period(infector.latent_period)
9 changes: 9 additions & 0 deletions pyEpiabm/pyEpiabm/sweep/host_progression_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ def update_time_status_change(self, person: Person, time: float):

person.time_of_status_change = time + transition_time

# Finally, if the person is Exposed, we can store their latency period
# as the transition_time. This can be used for calculating the serial
# interval. We can also store their generation time in this step.
if person.infection_status == InfectionStatus.Exposed:
latent_period = transition_time
person.set_latent_period(latent_period)
person.store_generation_time()
person.store_serial_interval()

def _updates_infectiousness(self, person: Person, time: float):
"""Updates infectiousness. Scales using the initial infectiousness
if the person is in an infectious state. Updates the infectiousness to
Expand Down
3 changes: 3 additions & 0 deletions pyEpiabm/pyEpiabm/sweep/household_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ def __call__(self, time: float):
# Increment the infector's
# secondary_infections_count
infector.increment_secondary_infections()
# Stores the exposure period and infector's latent
# period within attributes of the infectee
self.store_infection_periods(infector, infectee, time)
Loading

0 comments on commit aacf8f4

Please sign in to comment.