Skip to content

Commit

Permalink
tweaks to ordering and timestamps
Browse files Browse the repository at this point in the history
* in the storage-controller report the correct dependencies for tables
* in the Coordinator register ReadPolicies
* in the storage collections install read holds with the existing collection's read frontier, not the implied capability
* in storage collections set the write frontier of the new collection to the write frontier of the existing collection
  • Loading branch information
ParkMyCar committed Jan 2, 2025
1 parent 8412f3f commit 219f937
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 24 deletions.
21 changes: 16 additions & 5 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4805,16 +4805,15 @@ impl Coordinator {
let CatalogItem::Table(table) = &entry.item else {
panic!("programming error, expected table found {:?}", entry.item);
};
let table = table.clone();

let new_version = table.desc.latest_version();
let new_desc = table
.desc
.at_version(RelationVersionSelector::Specific(new_version));
let forget_ts = coord.get_local_write_ts().await.timestamp;
let register_ts = coord.get_local_write_ts().await.timestamp;

assert!(forget_ts <= register_ts);

// Alter the table description, creating a "new" collection.
coord
.controller
.storage
Expand All @@ -4823,13 +4822,25 @@ impl Coordinator {
new_global_id,
new_desc,
expected_version,
forget_ts,
register_ts,
)
.await
.expect("failed to alter desc of table");

coord.apply_local_write(register_ts).await;

// Initialize the ReadPolicy which ensures we have the correct read holds.
let compaction_window = table
.custom_logical_compaction_window
.unwrap_or(CompactionWindow::Default);
coord
.initialize_read_policies(
&crate::CollectionIdBundle {
storage_ids: btreeset![new_global_id],
compute_ids: BTreeMap::new(),
},
compaction_window,
)
.await;
})
.await?;

Expand Down
1 change: 0 additions & 1 deletion src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,6 @@ pub trait StorageController: Debug {
new_collection: GlobalId,
new_desc: RelationDesc,
expected_version: RelationVersion,
forget_ts: Self::Timestamp,
register_ts: Self::Timestamp,
) -> Result<(), StorageError<Self::Timestamp>>;

Expand Down
66 changes: 58 additions & 8 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,11 +1091,21 @@ where
let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
for (key, (mut changes, frontier)) in collections_net {
if !changes.is_empty() {
// If the table has a "primary" collection, let that collection drive compaction.
let collection = collections.get(&key).expect("must still exist");
let should_emit_persist_compaction = !matches!(
collection.description.data_source,
DataSource::Table { primary: Some(_) }
);

if frontier.is_empty() {
info!(id = %key, "removing collection state because the since advanced to []!");
collections.remove(&key).expect("must still exist");
}
persist_compaction_commands.push((key, frontier));

if should_emit_persist_compaction {
persist_compaction_commands.push((key, frontier));
}
}
}

Expand Down Expand Up @@ -1856,7 +1866,12 @@ where
new_desc: RelationDesc,
expected_version: RelationVersion,
) -> Result<(), StorageError<Self::Timestamp>> {
let (data_shard, existing_since) = {
let (
data_shard,
existing_write_frontier,
existing_read_policy,
mut existing_read_capabilities,
) = {
let self_collections = self.collections.lock().expect("lock poisoned");
let existing = self_collections
.get(&existing_collection)
Expand All @@ -1869,7 +1884,9 @@ where

(
existing.collection_metadata.data_shard,
existing.description.since.clone(),
existing.write_frontier.clone(),
existing.read_policy.clone(),
existing.read_capabilities.clone(),
)
};

Expand Down Expand Up @@ -1923,7 +1940,7 @@ where
.open_data_handles(
&new_collection,
data_shard,
existing_since.as_ref(),
None,
new_desc.clone(),
&persist_client,
)
Expand All @@ -1932,6 +1949,16 @@ where
// TODO(alter_table): Do we need to advance the since of the table to match the time this
// new version was registered with txn-wal?

// Because this is a Table, we know it's managed by txn_wal, and thus its logical write
// frontier is possibly in advance of the write_handle's upper. So we fast forward the
// write frontier to match that of the existing collection.
let write_frontier =
if PartialOrder::less_than(write_handle.upper(), &existing_write_frontier) {
existing_write_frontier
} else {
write_handle.upper().clone()
};

// Note: The new collection is now the "primary collection" so we specify `None` here.
let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
let collection_meta = CollectionMetadata {
Expand All @@ -1945,7 +1972,7 @@ where
let collection_state = CollectionState::new(
collection_desc,
since_handle.since().clone(),
write_handle.upper().clone(),
write_frontier,
Vec::new(),
collection_meta,
);
Expand Down Expand Up @@ -1973,10 +2000,33 @@ where
primary: Some(new_collection),
};
existing.storage_dependencies.push(new_collection);
self.install_collection_dependency_read_holds_inner(

// Install the relevant read capabilities on the new collection.
//
// Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
// here, but that only installed a ReadHold on the new collection for the implied
// capability of the existing collection. This would cause runtime panics because it
// would eventually result in negative read capabilities.
let mut changes = ChangeBatch::new();
for (time, diff) in existing_read_capabilities.updates() {
changes.update(time.clone(), *diff);
}
let mut updates = BTreeMap::from([(new_collection, changes)]);
StorageCollectionsImpl::update_read_capabilities_inner(
&self.cmd_tx,
&mut *self_collections,
existing_collection,
)?;
&mut updates,
);

// Note: The Coordinator will also set the ReadPolicy, but start by
// initializing it to the same policy as the existing collection to
// prevent some other action, e.g. the write frontier of the
// txn_wal shard advancing, from getting interleaved between this
// call and the Coordinator's call to set the ReadPolicy.
self.set_read_policies_inner(
&mut *self_collections,
vec![(new_collection, existing_read_policy)],
);
};

// TODO(alter_table): Support changes to sources.
Expand Down
26 changes: 16 additions & 10 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,11 @@ where
let mut new_webhook_statistic_entries = BTreeSet::new();

for (id, description, write, metadata) in to_register {
let is_in_txns = |id, metadata: &CollectionMetadata| {
metadata.txns_shard.is_some()
&& !(self.read_only && migrated_storage_collections.contains(&id))
};

let mut data_source = description.data_source;

to_execute.insert(id);
Expand Down Expand Up @@ -758,7 +763,9 @@ where
}

// Assert some invariants.
if !dependency_read_holds.is_empty() {
//
// TODO(parkmycar): Include Tables (is_in_txns) in this check.
if !dependency_read_holds.is_empty() && !is_in_txns(id, &metadata) {
// The dependency since cannot be beyond the dependent (our)
// upper unless the collection is new. In practice, the
dependency is the remap shard of a source (export), and if
Expand Down Expand Up @@ -1192,7 +1199,6 @@ where
new_collection: GlobalId,
new_desc: RelationDesc,
expected_version: RelationVersion,
forget_ts: Self::Timestamp,
register_ts: Self::Timestamp,
) -> Result<(), StorageError<Self::Timestamp>> {
let data_shard = {
Expand Down Expand Up @@ -1271,13 +1277,10 @@ where
primary: Some(new_collection),
};

self.persist_table_worker.update(
existing_collection,
new_collection,
forget_ts,
register_ts,
write_handle,
);
self.persist_table_worker
.register(register_ts, vec![(new_collection, write_handle)])
.await
.expect("table worker unexpectedly shut down");

Ok(())
}
Expand Down Expand Up @@ -3136,9 +3139,12 @@ where
let dependency = match &data_source {
DataSource::Introspection(_)
| DataSource::Webhook
| DataSource::Table { .. }
| DataSource::Table { primary: None }
| DataSource::Progress
| DataSource::Other => vec![],
DataSource::Table {
primary: Some(primary),
} => vec![*primary],
DataSource::IngestionExport { ingestion_id, .. } => {
// Ingestion exports depend on their primary source's remap
// collection.
Expand Down

0 comments on commit 219f937

Please sign in to comment.