Skip to content

Commit

Permalink
feat: optimize Spanner changestream metadata table (#32213)
Browse files Browse the repository at this point in the history
* feat: optimize Spanner changestream metadata table

* fix: linting

* tests: fixes admin dao tests
  • Loading branch information
thiagotnunes authored Aug 19, 2024
1 parent bed2e56 commit 6b4a7a5
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -78,6 +79,12 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/** Metadata table index for queries over the watermark column. */
public static final String WATERMARK_INDEX = "WatermarkIndex";

/** Metadata table index for queries over the created at / start timestamp columns. */
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

Expand Down Expand Up @@ -117,10 +124,10 @@ public class PartitionMetadataAdminDao {
* PartitionMetadataAdminDao#TTL_AFTER_PARTITION_FINISHED_DAYS} days.
*/
public void createPartitionMetadataTable() {
String metadataCreateStmt = "";
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
// Literals need be added around literals to preserve casing.
metadataCreateStmt =
ddl.add(
"CREATE TABLE \""
+ tableName
+ "\"(\""
Expand All @@ -146,15 +153,37 @@ public void createPartitionMetadataTable() {
+ "\" SPANNER.COMMIT_TIMESTAMP,\""
+ COLUMN_FINISHED_AT
+ "\" SPANNER.COMMIT_TIMESTAMP,"
+ " PRIMARY KEY (\"PartitionToken\")"
+ " PRIMARY KEY (\""
+ COLUMN_PARTITION_TOKEN
+ "\")"
+ ")"
+ " TTL INTERVAL '"
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " days' ON \""
+ COLUMN_FINISHED_AT
+ "\"";
+ "\"");
ddl.add(
"CREATE INDEX \""
+ WATERMARK_INDEX
+ "\" on \""
+ tableName
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
"CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
+ "\" ON \""
+ tableName
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
+ COLUMN_START_TIMESTAMP
+ "\")");
} else {
metadataCreateStmt =
ddl.add(
"CREATE TABLE "
+ tableName
+ " ("
Expand All @@ -180,16 +209,37 @@ public void createPartitionMetadataTable() {
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ COLUMN_FINISHED_AT
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ ") PRIMARY KEY (PartitionToken),"
+ ") PRIMARY KEY ("
+ COLUMN_PARTITION_TOKEN
+ "),"
+ " ROW DELETION POLICY (OLDER_THAN("
+ COLUMN_FINISHED_AT
+ ", INTERVAL "
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " DAY))";
+ " DAY))");
ddl.add(
"CREATE INDEX "
+ WATERMARK_INDEX
+ " on "
+ tableName
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
"CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
+ " ON "
+ tableName
+ " ("
+ COLUMN_CREATED_AT
+ ","
+ COLUMN_START_TIMESTAMP
+ ")");
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataCreateStmt), null);
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand All @@ -212,15 +262,18 @@ public void createPartitionMetadataTable() {
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
String metadataDropStmt;
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
metadataDropStmt = "DROP TABLE \"" + tableName + "\"";
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
} else {
metadataDropStmt = "DROP TABLE " + tableName;
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataDropStmt), null);
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
Expand Down Expand Up @@ -86,8 +87,11 @@ public void testCreatePartitionMetadataTable() throws Exception {
partitionMetadataAdminDao.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE"));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("CREATE TABLE"));
assertTrue(it.next().contains("CREATE INDEX"));
assertTrue(it.next().contains("CREATE INDEX"));
}

@Test
Expand All @@ -96,8 +100,11 @@ public void testCreatePartitionMetadataTablePostgres() throws Exception {
partitionMetadataAdminDaoPostgres.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE \""));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("CREATE TABLE \""));
assertTrue(it.next().contains("CREATE INDEX \""));
assertTrue(it.next().contains("CREATE INDEX \""));
}

@Test
Expand Down Expand Up @@ -129,8 +136,11 @@ public void testDeletePartitionMetadataTable() throws Exception {
partitionMetadataAdminDao.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("DROP TABLE"));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("DROP INDEX"));
assertTrue(it.next().contains("DROP INDEX"));
assertTrue(it.next().contains("DROP TABLE"));
}

@Test
Expand All @@ -139,8 +149,11 @@ public void testDeletePartitionMetadataTablePostgres() throws Exception {
partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("DROP TABLE \""));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("DROP INDEX \""));
assertTrue(it.next().contains("DROP INDEX \""));
assertTrue(it.next().contains("DROP TABLE \""));
}

@Test
Expand Down

0 comments on commit 6b4a7a5

Please sign in to comment.