Skip to content

Commit

Permalink
restore task_pair
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 5, 2024
1 parent d2651c8 commit 7da7365
Showing 1 changed file with 48 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,35 @@ impl<'a> ExecuteContext<'a> {
}

pub fn task_pair(&self, task_id1: TaskId, task_id2: TaskId) -> (TaskGuard<'a>, TaskGuard<'a>) {
let (task1, task2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
let (mut task1, mut task2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
let is_restored1 = task1.persistance_state.is_restored();
let is_restored2 = task2.persistance_state.is_restored();
if !is_restored1 || !is_restored2 {
// Avoid holding the lock too long since this can also affect other tasks
drop(task1);
drop(task2);

let items1 =
(!is_restored1).then(|| self.backend.backing_storage.lookup_data(task_id1));
let items2 =
(!is_restored2).then(|| self.backend.backing_storage.lookup_data(task_id2));

let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
task1 = t1;
task2 = t2;
if !task1.persistance_state.is_restored() {
for item in items1.unwrap() {
task1.add(item);
}
task1.persistance_state.set_restored();
}
if !task2.persistance_state.is_restored() {
for item in items2.unwrap() {
task2.add(item);
}
task2.persistance_state.set_restored();
}
}
(
TaskGuard {
task: task1,
Expand Down Expand Up @@ -305,6 +333,25 @@ impl<'a> TaskGuard<'a> {
pub fn iter(&self) -> impl Iterator<Item = (&CachedDataItemKey, &CachedDataItemValue)> {
self.task.iter()
}

pub(crate) fn invalidate_serialization(&mut self) {
let mut count = 0;
let cell_data = self.iter().filter_map(|(key, value)| match (key, value) {
(CachedDataItemKey::CellData { cell }, CachedDataItemValue::CellData { value }) => {
count += 1;
Some(CachedDataUpdate {
task: self.task_id,
key: CachedDataItemKey::CellData { cell: *cell },
value: Some(CachedDataItemValue::CellData {
value: value.clone(),
}),
})
}
_ => None,
});
self.backend.persisted_storage_log.lock().extend(cell_data);
self.task.persistance_state.add_persisting_items(count);
}
}

macro_rules! impl_operation {
Expand Down

0 comments on commit 7da7365

Please sign in to comment.