From 2631a1d683033bec88833de4cdc3094bbb1a9aa3 Mon Sep 17 00:00:00 2001 From: claudevdm <33973061+claudevdm@users.noreply.github.com> Date: Wed, 29 Jan 2025 08:32:49 -0500 Subject: [PATCH 1/4] Support avro arrays for postgres insertion. (#2154) Co-authored-by: Claude --- .../teleport/v2/utils/DatastreamToDML.java | 1 - .../v2/utils/DatastreamToPostgresDML.java | 55 +++++++++++++ .../v2/utils/DatastreamToDMLTest.java | 82 +++++++++++++++++-- 3 files changed, 132 insertions(+), 6 deletions(-) diff --git a/v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToDML.java b/v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToDML.java index 6a208e3579..a626c9237d 100644 --- a/v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToDML.java +++ b/v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/utils/DatastreamToDML.java @@ -295,7 +295,6 @@ public String getValueSql(JsonNode rowObj, String columnName, Map elements = new ArrayList<>(); + if (arrayNode.isArray()) { + for (JsonNode element : arrayNode) { + if (element.has("elementValue")) { + JsonNode elementValue = element.get("elementValue"); + if (!elementValue.isNull()) { + elements.add(formatArrayElement(elementValue)); + } else { + elements.add(getNullValueSql()); + } + } else if (!element.isNull()) { + elements.add(formatArrayElement(element)); + } + } + } + return "ARRAY[" + String.join(",", elements) + "]"; + } catch (JsonProcessingException e) { + LOG.error("Error parsing JSON array: {}", jsonValue); + return getNullValueSql(); + } + } + + private String formatArrayElement(JsonNode element) { + if (element.isTextual()) { + return "\'" + cleanSql(element.textValue()) + "\'"; + } + return element.toString(); + } } diff --git a/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/utils/DatastreamToDMLTest.java b/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/utils/DatastreamToDMLTest.java index 585efd2fbe..79fb365155 100644 --- a/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/utils/DatastreamToDMLTest.java +++ b/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/utils/DatastreamToDMLTest.java @@ -33,7 +33,7 @@ public class DatastreamToDMLTest { private static final Logger LOG = LoggerFactory.getLogger(DatastreamToDMLTest.class); - private String jsonString = + private static final String JSON_STRING = "{" + "\"text_column\":\"value\"," + "\"quoted_text_column\":\"Test Values: '!@#$%^\"," @@ -42,7 +42,7 @@ public class DatastreamToDMLTest { + "\"_metadata_table\":\"MY_TABLE$NAME\"" + "}"; - private JsonNode getRowObj() { + private JsonNode getRowObj(String jsonString) { ObjectMapper mapper = new ObjectMapper(); JsonNode rowObj; try { @@ -59,7 +59,7 @@ private JsonNode getRowObj() { */ @Test public void testGetValueSql() { - JsonNode rowObj = this.getRowObj(); + JsonNode rowObj = this.getRowObj(JSON_STRING); String expectedTextContent = "'value'"; String testSqlContent = @@ -82,6 +82,78 @@ public void testGetValueSql() { assertEquals(expectedNullByteTextContent, testNullByteSqlContent); } + /** + * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array + * data into correct integer array syntax. 
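+   * Null elements in the source array are expected to surface as the SQL {@code NULL} keyword
+   * inside the generated {@code ARRAY[...]} literal.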
+   */
+  @Test
+  public void testIntArrayWithNullTypeCoercion() {
+    String arrayJson =
+        "{\"number_array\": {"
+            + "\"nestedArray\": ["
+            + "  {\"nestedArray\": null, \"elementValue\": null},"
+            + "  {\"nestedArray\": null, \"elementValue\": 456}"
+            + "], \"elementValue\": null}}";
+    JsonNode rowObj = this.getRowObj(arrayJson);
+    Map<String, String> tableSchema = new HashMap<>();
+    tableSchema.put("number_array", "_int4");
+    DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
+    String expectedInt = "ARRAY[NULL,456]";
+
+    String actualInt = dml.getValueSql(rowObj, "number_array", tableSchema);
+
+    assertEquals(expectedInt, actualInt);
+  }
+
+  /**
+   * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
+   * data into correct integer array syntax.
+   */
+  @Test
+  public void testIntArrayTypeCoercion() {
+    String arrayJson =
+        "{\"number_array\": {"
+            + "\"nestedArray\": ["
+            + "  {\"nestedArray\": null, \"elementValue\": 123},"
+            + "  {\"nestedArray\": null, \"elementValue\": 456}"
+            + "], \"elementValue\": null}}";
+    JsonNode rowObj = this.getRowObj(arrayJson);
+    Map<String, String> tableSchema = new HashMap<>();
+    tableSchema.put("number_array", "_int4");
+    DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
+    String expectedInt = "ARRAY[123,456]";
+
+    String actualInt = dml.getValueSql(rowObj, "number_array", tableSchema);
+
+    assertEquals(expectedInt, actualInt);
+  }
+
+  /**
+   * Test whether {@link DatastreamToPostgresDML#getValueSql(JsonNode, String, Map)} converts array
+   * data into correct text array syntax.
+   */
+  @Test
+  public void testTextArrayTypeCoercion() {
+    String arrayJson =
+        "{\"text_array\": {"
+            + "\"nestedArray\": ["
+            + "  {\"nestedArray\": null, \"elementValue\": \"apple\"},"
+            + "  {\"nestedArray\": null, \"elementValue\": \"cherry\"}"
+            + "], \"elementValue\": null}}";
+    JsonNode rowObj = this.getRowObj(arrayJson);
+    Map<String, String> tableSchema = new HashMap<>();
+    tableSchema.put("text_array", "_text");
+    DatastreamToPostgresDML dml = DatastreamToPostgresDML.of(null);
+    String expectedText = "ARRAY['apple','cherry']";
+
+    String actualText = dml.getValueSql(rowObj, "text_array", tableSchema);
+
+    assertEquals(expectedText, actualText);
+  }
+
   /**
    * Test whether {@link DatastreamToDML#getTargetSchemaName} converts the Oracle schema into the
    * correct Postgres schema. 
@@ -89,7 +161,7 @@ public void testGetValueSql() { @Test public void testGetPostgresSchemaName() { DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null); - JsonNode rowObj = this.getRowObj(); + JsonNode rowObj = this.getRowObj(JSON_STRING); DatastreamRow row = DatastreamRow.of(rowObj); String expectedSchemaName = "my_schema"; @@ -104,7 +176,7 @@ public void testGetPostgresSchemaName() { @Test public void testGetPostgresTableName() { DatastreamToDML datastreamToDML = DatastreamToPostgresDML.of(null); - JsonNode rowObj = this.getRowObj(); + JsonNode rowObj = this.getRowObj(JSON_STRING); DatastreamRow row = DatastreamRow.of(rowObj); String expectedTableName = "my_table$name"; From 4e82f2bb56176d616e0d78ca4603c7eaddad801b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 31 Jan 2025 15:22:44 -0500 Subject: [PATCH 2/4] Upgrade upload-artifact version (#2164) * Upgrade upload-artifact version * Fix naming * naming * naming * naming --- .github/actions/publish-site-report/action.yml | 2 +- .github/workflows/java-pr.yml | 12 ++++++------ .github/workflows/kafka-pr.yml | 12 ++++++------ .github/workflows/load-tests.yml | 2 +- .github/workflows/scorecards-analysis.yml | 2 +- .github/workflows/spanner-load-tests.yml | 10 +++++----- .github/workflows/spanner-pr.yml | 12 ++++++------ 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/.github/actions/publish-site-report/action.yml b/.github/actions/publish-site-report/action.yml index 2262f385db..67dcd298be 100644 --- a/.github/actions/publish-site-report/action.yml +++ b/.github/actions/publish-site-report/action.yml @@ -32,7 +32,7 @@ runs: shell: bash run: mvn -B surefire-report:report-only -f pom.xml -Daggregate=true -Denforcer.skip=true - name: Publish Site Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 with: name: ${{ inputs.output-zip-file }} path: 'target/site/' diff --git a/.github/workflows/java-pr.yml b/.github/workflows/java-pr.yml index a777e95712..f1765997e8 100644 --- a/.github/workflows/java-pr.yml +++ b/.github/workflows/java-pr.yml @@ -104,10 +104,10 @@ jobs: - name: Run Unit Tests run: ./cicd/run-unit-tests - name: Upload Unit Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-unit-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Upload coverage reports to Codecov @@ -142,10 +142,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Smoke Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-smoke-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment @@ -172,10 +172,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Integration Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: 
surefire-integration-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment diff --git a/.github/workflows/kafka-pr.yml b/.github/workflows/kafka-pr.yml index e293699bd8..2b699e0e21 100644 --- a/.github/workflows/kafka-pr.yml +++ b/.github/workflows/kafka-pr.yml @@ -107,10 +107,10 @@ jobs: ./cicd/run-unit-tests \ --modules-to-build="KAFKA" - name: Upload Unit Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-unit-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Upload coverage reports to Codecov @@ -144,10 +144,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Smoke Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-smoke-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment @@ -173,10 +173,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Integration Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-integration-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment diff --git a/.github/workflows/load-tests.yml b/.github/workflows/load-tests.yml index c1151079e4..2170209e02 100644 --- a/.github/workflows/load-tests.yml +++ b/.github/workflows/load-tests.yml @@ -38,7 +38,7 @@ jobs: env: HOST_IP: ${{ steps.variables.outputs.hostIP }} - name: Upload Load Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: name: surefire-test-results diff --git a/.github/workflows/scorecards-analysis.yml b/.github/workflows/scorecards-analysis.yml index 4836ed1215..6ef3f54a6a 100644 --- a/.github/workflows/scorecards-analysis.yml +++ b/.github/workflows/scorecards-analysis.yml @@ -42,7 +42,7 @@ jobs: # Upload the results as artifacts (optional). 
- name: "Upload artifact" - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 with: name: SARIF file path: results.sarif diff --git a/.github/workflows/spanner-load-tests.yml b/.github/workflows/spanner-load-tests.yml index 1a71637b41..c0d6f19517 100644 --- a/.github/workflows/spanner-load-tests.yml +++ b/.github/workflows/spanner-load-tests.yml @@ -62,10 +62,10 @@ jobs: with: filename: .github/ISSUE_TEMPLATE/spanner-load-test-failure-issue-template.md - name: Upload Load Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-load-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 30 - name: Load Test report on GitHub @@ -117,11 +117,11 @@ jobs: with: filename: .github/ISSUE_TEMPLATE/spanner-load-test-failure-issue-template.md - name: Upload Load Test Observer Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-observer-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment - uses: ./.github/actions/cleanup-java-env \ No newline at end of file + uses: ./.github/actions/cleanup-java-env diff --git a/.github/workflows/spanner-pr.yml b/.github/workflows/spanner-pr.yml index fe104d5764..b9845effab 100644 --- a/.github/workflows/spanner-pr.yml +++ b/.github/workflows/spanner-pr.yml @@ -111,10 +111,10 @@ jobs: ./cicd/run-unit-tests \ --modules-to-build="SPANNER" - name: Upload Unit Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-unit-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Upload coverage reports to Codecov @@ -148,10 +148,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Smoke Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-smoke-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 1 - name: Cleanup Java Environment @@ -177,10 +177,10 @@ jobs: --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ --it-private-connectivity="datastream-private-connect-us-central1" - name: Upload Integration Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + uses: actions/upload-artifact@v4 if: always() # always run even if the previous step fails with: - name: surefire-test-results + name: surefire-integration-test-results path: '**/surefire-reports/TEST-*.xml' retention-days: 10 - name: Cleanup Java Environment From 79c9fe9d9c3f5d856dfd689cdbf8109d46d50fc8 Mon Sep 17 00:00:00 2001 From: n-d-joshi Date: Mon, 3 Feb 2025 00:09:15 -0800 Subject: [PATCH 3/4] Search indexes (#2152) * Add support for Search Indexes * Reverted order of where clause * Add tests and fix formatting * Fixing tests and addressing comments * Addressing comments and 
simplifying code * Fixed formatting --- .../cloud/teleport/spanner/ddl/Index.java | 33 ++++++++++++- .../spanner/ddl/InformationSchemaScanner.java | 47 ++++++++++--------- .../spanner/DdlToAvroSchemaConverterTest.java | 11 ++++- .../teleport/spanner/ExportPipelineIT.java | 11 +++++ .../cloud/teleport/spanner/ddl/DdlTest.java | 34 ++++++++++++++ .../ddl/InformationSchemaScannerIT.java | 32 +++++++++++++ .../ddl/InformationSchemaScannerTest.java | 8 ++-- 7 files changed, 149 insertions(+), 27 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java index b4c35f6e72..9d1719780e 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java @@ -85,7 +85,9 @@ public void prettyPrint(Appendable appendable) throws IOException { private void prettyPrintPg(Appendable appendable) throws IOException { appendable.append("CREATE"); - if (unique()) { + if (type() != null && (type().equals("SEARCH"))) { + appendable.append(" " + type()); + } else if (unique()) { appendable.append(" UNIQUE"); } appendable @@ -111,6 +113,28 @@ private void prettyPrintPg(Appendable appendable) throws IOException { appendable.append(" INCLUDE (").append(storingString).append(")"); } + if (partitionBy() != null) { + String partitionByString = + partitionBy().stream() + .map(c -> quoteIdentifier(c, dialect())) + .collect(Collectors.joining(",")); + + if (!partitionByString.isEmpty()) { + appendable.append(" PARTITION BY ").append(partitionByString); + } + } + + if (orderBy() != null) { + String orderByString = + orderBy().stream() + .map(c -> quoteIdentifier(c, dialect())) + .collect(Collectors.joining(",")); + + if (!orderByString.isEmpty()) { + appendable.append(" ORDER BY ").append(orderByString); + } + } + if (interleaveIn() != null) { appendable.append(" INTERLEAVE IN ").append(quoteIdentifier(interleaveIn(), dialect())); } @@ -118,6 +142,13 @@ private void prettyPrintPg(Appendable appendable) throws IOException { if (filter() != null && !filter().isEmpty()) { appendable.append(" WHERE ").append(filter()); } + + if (options() != null) { + String optionsString = String.join(",", options()); + if (!optionsString.isEmpty()) { + appendable.append(" WITH (").append(optionsString).append(")"); + } + } } private void prettyPrintGsql(Appendable appendable) throws IOException { diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java index 3fa757a27b..f8a791b970 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java @@ -118,9 +118,7 @@ public Ddl scan() { Map> indexes = Maps.newHashMap(); listIndexes(indexes); listIndexColumns(builder, indexes); - if (dialect == Dialect.GOOGLE_STANDARD_SQL) { - listIndexOptions(builder, indexes); - } + listIndexOptions(builder, indexes); for (Map.Entry> tableEntry : indexes.entrySet()) { String tableName = tableEntry.getKey(); @@ -364,10 +362,7 @@ private void listColumns(Ddl.Builder builder) { dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(15) : resultSet.getString(15).equalsIgnoreCase("YES"); - boolean isPlacementKey = - dialect == Dialect.GOOGLE_STANDARD_SQL - ? 
resultSet.getBoolean(16) - : resultSet.getBoolean(16); + boolean isPlacementKey = resultSet.getBoolean(16); builder .createTable(tableName) @@ -463,20 +458,15 @@ private void listIndexes(Map> indexe : resultSet.getString(5).equalsIgnoreCase("YES"); String filter = resultSet.isNull(6) ? null : resultSet.getString(6); - // Note that 'type' is only queried from GoogleSQL and is not from Postgres and - // the number of columns will be different. - String type = - (dialect == Dialect.GOOGLE_STANDARD_SQL && !resultSet.isNull(7)) - ? resultSet.getString(7) - : null; + String type = !resultSet.isNull(7) ? resultSet.getString(7) : null; ImmutableList searchPartitionBy = - (dialect == Dialect.GOOGLE_STANDARD_SQL && !resultSet.isNull(8)) + !resultSet.isNull(8) ? ImmutableList.builder().addAll(resultSet.getStringList(8)).build() : null; ImmutableList searchOrderBy = - (dialect == Dialect.GOOGLE_STANDARD_SQL && !resultSet.isNull(9)) + !resultSet.isNull(9) ? ImmutableList.builder().addAll(resultSet.getStringList(9)).build() : null; @@ -513,10 +503,11 @@ Statement listIndexesSQL() { case POSTGRESQL: return Statement.of( "SELECT t.table_schema, t.table_name, t.index_name, t.parent_table_name, t.is_unique," - + " t.is_null_filtered, t.filter FROM information_schema.indexes AS t " + + " t.is_null_filtered, t.filter, t.index_type, t.search_partition_by, t.search_order_by" + + " FROM information_schema.indexes AS t " + " WHERE t.table_schema NOT IN " + " ('information_schema', 'spanner_sys', 'pg_catalog')" - + " AND t.index_type='INDEX' AND t.spanner_is_managed = 'NO' " + + " AND (t.index_type='INDEX' OR t.index_type='SEARCH') AND t.spanner_is_managed = 'NO' " + " ORDER BY t.table_name, t.index_name"); default: throw new IllegalArgumentException("Unrecognized dialect: " + dialect); @@ -533,8 +524,8 @@ private void listIndexColumns( String columnName = resultSet.getString(2); String ordering = resultSet.isNull(3) ? null : resultSet.getString(3); String indexLocalName = resultSet.getString(4); - String indexType = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getString(5) : null; - String spannerType = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getString(6) : null; + String indexType = resultSet.getString(5); + String spannerType = resultSet.getString(6); if (indexLocalName.equals("PRIMARY_KEY")) { IndexColumn.IndexColumnsBuilder pkBuilder = @@ -546,8 +537,10 @@ private void listIndexColumns( } pkBuilder.end().endTable(); } else { + String tokenlistType = dialect == Dialect.POSTGRESQL ? "spanner.tokenlist" : "TOKENLIST"; if (indexType != null && ordering != null) { - if ((indexType.equals("SEARCH") && !spannerType.equals("TOKENLIST")) + // Non-tokenlist columns should not be included in the key for Search Indexes. + if ((indexType.equals("SEARCH") && !spannerType.contains(tokenlistType)) || (indexType.equals("VECTOR") && !spannerType.startsWith("ARRAY"))) { continue; } @@ -567,8 +560,9 @@ private void listIndexColumns( } IndexColumn.IndexColumnsBuilder indexColumnsBuilder = indexBuilder.columns().create().name(columnName); + // Tokenlist columns do not have ordering. 
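+          // The same applies to ARRAY columns, e.g. the embedding columns of a vector index.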
if (spannerType != null - && (spannerType.equals("TOKENLIST") || spannerType.startsWith("ARRAY"))) { + && (spannerType.equals(tokenlistType) || spannerType.startsWith("ARRAY"))) { indexColumnsBuilder.none(); } else if (ordering == null) { indexColumnsBuilder.storing(); @@ -605,7 +599,8 @@ Statement listIndexColumnsSQL() { + "ORDER BY t.table_name, t.index_name, t.ordinal_position"); case POSTGRESQL: return Statement.of( - "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name " + "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name," + + " t.index_type, t.spanner_type " + "FROM information_schema.index_columns AS t " + "WHERE t.table_schema NOT IN " + "('information_schema', 'spanner_sys', 'pg_catalog') " @@ -674,6 +669,14 @@ Statement listIndexOptionsSQL() { + " WHERE t.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" + " ORDER BY t.table_name, t.index_name, t.option_name"); + case POSTGRESQL: + return Statement.of( + "SELECT t.table_schema, t.table_name, t.index_name, t.index_type," + + " t.option_name, t.option_type, t.option_value" + + " FROM information_schema.index_options AS t" + + " WHERE t.table_schema NOT IN" + + " ('information_schema', 'spanner_sys', 'pg_catalog') " + + " ORDER BY t.table_name, t.index_name, t.option_name"); default: throw new IllegalArgumentException("Unrecognized dialect: " + dialect); } diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java index a421555a5b..4149d4be7b 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java @@ -388,7 +388,10 @@ public void pgSimple() { .asc("last_name") .end() .indexes( - ImmutableList.of("CREATE INDEX \"UsersByFirstName\" ON \"Users\" (\"first_name\")")) + ImmutableList.of( + "CREATE INDEX \"UsersByFirstName\" ON \"Users\" (\"first_name\")", + "CREATE SEARCH INDEX \"SearchIndex\" ON \"Users\" (\"tokens\")" + + " WITH (sort_order_sharding=TRUE)")) .foreignKeys( ImmutableList.of( "ALTER TABLE \"Users\" ADD CONSTRAINT \"fk\" FOREIGN KEY (\"first_name\")" @@ -496,6 +499,12 @@ public void pgSimple() { assertThat( avroSchema.getProp(SPANNER_INDEX + "0"), equalTo("CREATE INDEX \"UsersByFirstName\" ON \"Users\" (\"first_name\")")); + + assertThat( + avroSchema.getProp(SPANNER_INDEX + "1"), + equalTo( + "CREATE SEARCH INDEX \"SearchIndex\" ON \"Users\" (\"tokens\") WITH (sort_order_sharding=TRUE)")); + assertThat( avroSchema.getProp(SPANNER_FOREIGN_KEY + "0"), equalTo( diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java index 49a2f068b2..3566a260f4 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java @@ -278,9 +278,16 @@ private void testPostgresSpannerToGCSAvroBase( + " \"NameTokens\" spanner.tokenlist generated always as (spanner.tokenize_fulltext(\"FirstName\")) stored hidden,\n" + "PRIMARY KEY(\"Id\"))", testName); + String createSearchIndexStatement = + String.format( + "CREATE SEARCH INDEX \"%s_SearchIndex\"\n" + + " ON \"%s_Singers\"(\"NameTokens\") ORDER BY \"Id\" WHERE \"Id\" IS NOT NULL\n" + + " WITH (sort_order_sharding=TRUE, disable_automatic_uid_column=TRUE)", + 
testName, testName);
 
     spannerResourceManager.executeDdlStatement(createEmptyTableStatement);
     spannerResourceManager.executeDdlStatement(createSingersTableStatement);
+    spannerResourceManager.executeDdlStatement(createSearchIndexStatement);
     List<Mutation> expectedData = generateTableRows(String.format("%s_Singers", testName));
     spannerResourceManager.write(expectedData);
     PipelineLauncher.LaunchConfig.Builder options =
@@ -305,6 +312,10 @@ private void testPostgresSpannerToGCSAvroBase(
     List<Artifact> emptyArtifacts =
         gcsClient.listArtifacts(
             "output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "Empty")));
+    List<Artifact> searchIndexArtifacts =
+        gcsClient.listArtifacts(
+            "output/",
+            Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "SearchIndex")));
 
     assertThat(singersArtifacts).isNotEmpty();
     assertThat(emptyArtifacts).isNotEmpty();
diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java
index fc46dd17c3..497db4d48e 100644
--- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java
+++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java
@@ -679,6 +679,40 @@ public void testSearchIndex() {
             + " STORING (`Data`) PARTITION BY `UserId`, INTERLEAVE IN `Users` OPTIONS (sort_order_sharding=TRUE)"));
   }
 
+  @Test
+  public void testPgSearchIndex() {
+    Index.Builder builder =
+        Index.builder(Dialect.POSTGRESQL)
+            .name("SearchIndex")
+            .type("SEARCH")
+            .table("Messages")
+            .interleaveIn("Users")
+            .partitionBy(ImmutableList.of("userid"))
+            .orderBy(ImmutableList.of("orderid"))
+            .options(ImmutableList.of("sort_order_sharding=TRUE"));
+    builder
+        .columns()
+        .create()
+        .name("subject_tokens")
+        .none()
+        .endIndexColumn()
+        .create()
+        .name("body_tokens")
+        .none()
+        .endIndexColumn()
+        .create()
+        .name("data")
+        .storing()
+        .endIndexColumn()
+        .end();
+    Index index = builder.build();
+    assertThat(
+        index.prettyPrint(),
+        equalToCompressingWhiteSpace(
+            "CREATE SEARCH INDEX \"SearchIndex\" ON \"Messages\"(\"subject_tokens\" , \"body_tokens\" )"
+                + " INCLUDE (\"data\") PARTITION BY \"userid\" ORDER BY \"orderid\" INTERLEAVE IN \"Users\" WITH (sort_order_sharding=TRUE)"));
+  }
+
   @Test
   public void testVectorIndex() {
     Index.Builder builder =
diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java
index 7bd87aaba8..779a5706dd 100644
--- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java
+++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java
@@ -735,6 +735,38 @@ public void searchIndexes() throws Exception {
     assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements)));
   }
 
+  @Test
+  public void pgSearchIndexes() throws Exception {
+    // Prefix indexes to ensure ordering.
+    List<String> statements =
+        Arrays.asList(
+            "CREATE TABLE \"Users\" ("
+                + " \"userid\" bigint NOT NULL,"
+                + " PRIMARY KEY (\"userid\")"
+                + " )",
+            " CREATE TABLE \"Messages\" ("
+                + " \"userid\" bigint NOT NULL,"
+                + " \"messageid\" bigint NOT NULL,"
+                + " \"orderid\" bigint NOT NULL,"
+                + " \"subject\" character varying,"
+                + " \"subject_tokens\" spanner.tokenlist GENERATED ALWAYS AS (spanner.tokenize_fulltext(subject)) STORED HIDDEN,"
+                + " \"body\" character varying,"
+                + " \"body_tokens\" spanner.tokenlist GENERATED ALWAYS AS (spanner.tokenize_fulltext(body)) STORED HIDDEN,"
+                + " \"data\" character varying,"
+                + " PRIMARY KEY (\"userid\", \"messageid\")"
+                + " ) INTERLEAVE IN PARENT \"Users\"",
+            " CREATE SEARCH INDEX \"SearchIndex\" ON \"Messages\"(\"subject_tokens\" , \"body_tokens\" )"
+                + " INCLUDE (\"data\")"
+                + " PARTITION BY \"userid\""
+                + " ORDER BY \"orderid\""
+                + " INTERLEAVE IN \"Users\""
+                + " WITH (sort_order_sharding=TRUE)");
+
+    SPANNER_SERVER.createPgDatabase(dbId, statements);
+    Ddl ddl = getPgDatabaseDdl();
+    assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements)));
+  }
+
   @Test
   public void vectorIndexes() throws Exception {
     List<String> statements =
diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java
index 6b57387b74..94dc1b8d46 100644
--- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java
+++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java
@@ -100,10 +100,11 @@ public void testListIndexesSQL() {
         postgresSQLInfoScanner.listIndexesSQL().getSql(),
         equalToCompressingWhiteSpace(
             "SELECT t.table_schema, t.table_name, t.index_name, t.parent_table_name, t.is_unique,"
-                + " t.is_null_filtered, t.filter FROM information_schema.indexes AS t "
+                + " t.is_null_filtered, t.filter, t.index_type, t.search_partition_by, t.search_order_by"
+                + " FROM information_schema.indexes AS t "
                 + " WHERE t.table_schema NOT IN "
                 + " ('information_schema', 'spanner_sys', 'pg_catalog')"
-                + " AND t.index_type='INDEX' AND t.spanner_is_managed = 'NO' "
+                + " AND (t.index_type='INDEX' OR t.index_type='SEARCH') AND t.spanner_is_managed = 'NO' "
                 + " ORDER BY t.table_name, t.index_name"));
   }
 
@@ -122,7 +123,8 @@ public void testListIndexColumnsSQL() {
     assertThat(
         postgresSQLInfoScanner.listIndexColumnsSQL().getSql(),
         equalToCompressingWhiteSpace(
-            "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name "
+            "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name, "
+                + "t.index_type, t.spanner_type "
                 + "FROM information_schema.index_columns AS t "
                 + "WHERE t.table_schema NOT IN "
                 + "('information_schema', 'spanner_sys', 'pg_catalog') "

From 2adaab67ea143e1bef64d3648fe216063b36bcc5 Mon Sep 17 00:00:00 2001
From: Vardhan Vinay Thigle <39047439+VardhanThigle@users.noreply.github.com>
Date: Thu, 6 Feb 2025 15:56:14 +0530
Subject: [PATCH 4/4] Cassandra Bulk - List and Set - Avro Mapping. 
(#2165) * Adding List and Map tables in UT * Support for non-nested Cassandra List and Set --- .../cassandra/mappings/CassandraMappings.java | 56 ++++++- .../mappings/CassandraMappingsProvider.java | 137 +++++++++++++++--- .../rowmapper/CassandraRowMapper.java | 3 + .../CassandraRowValueArrayMapper.java | 48 ++++++ .../rowmapper/CassandraRowValueExtractor.java | 2 +- .../schema/CassandraSchemaDiscovery.java | 12 +- .../schema/typemapping/UnifiedTypeMapper.java | 3 + .../typemapping/UnifiedTypeMapping.java | 3 +- .../typemapping/provider/unified/Array.java | 40 +++++ .../unified/UnifiedMappingProvider.java | 25 ++++ .../CassandraIOWrapperHelperTest.java | 3 +- .../CassandraSourceRowMapperTest.java | 40 +++-- .../schema/CassandraSchemaDiscoveryTest.java | 24 ++- .../cassandra/testutils/BasicTestSchema.java | 125 +++++++++++++++- .../unified/UnifiedMappingProviderTest.java | 11 ++ .../test/resources/CassandraUT/basicTest.cql | 104 +++++++++++++ 16 files changed, 591 insertions(+), 45 deletions(-) create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueArrayMapper.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/Array.java diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java index 7385d04c3a..b0d8bada71 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java @@ -17,11 +17,13 @@ import com.google.auto.value.AutoValue; import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueArrayMapper; import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor; import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider; import com.google.common.collect.ImmutableMap; +import com.google.common.reflect.TypeToken; /** Represent Unified type mapping, value extractor and value mappings for Cassandra. */ @AutoValue @@ -40,20 +42,72 @@ public abstract static class Builder { abstract ImmutableMap.Builder> fieldMappingBuilder(); + /** + * Maintain mappings for a given type, as primitive as well as part of collections. + * + * @param cassandraType - name of the cassandra type, as discovered by the schema discovery. + * @param type - Unified mapping type. + * @param rowValueExtractor - {@link CassandraRowValueExtractor} to extract value from {@link + * com.datastax.driver.core.Row Cassandra Row} + * @param rowValueMapper - {@link CassandraRowValueMapper} to map value to {@link + * com.google.cloud.teleport.v2.source.reader.io.row.SourceRow} + * @param typeClass - Class of the extracted value. Generally return type of the + * rowValueExtractor. 
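+   *     For example, the BLOB mapping extracts with {@code Row::getBytes} and therefore registers
+   *     {@code ByteBuffer.class} here.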
+   * @return Builder
+   */
    public <T> Builder put(
        String cassandraType,
        UnifiedMappingProvider.Type type,
        CassandraRowValueExtractor<T> rowValueExtractor,
-        CassandraRowValueMapper<T> rowValueMapper) {
+        CassandraRowValueMapper<T> rowValueMapper,
+        Class<T> typeClass) {
      this.typeMappingBuilder()
          .put(cassandraType.toUpperCase(), UnifiedMappingProvider.getMapping(type));
      this.fieldMappingBuilder()
          .put(
              cassandraType.toUpperCase(),
              CassandraFieldMapper.create(rowValueExtractor, rowValueMapper));
+      if (!type.equals(UnifiedMappingProvider.Type.UNSUPPORTED)) {
+        putList(cassandraType, type, rowValueExtractor, rowValueMapper, typeClass);
+        putSet(cassandraType, type, rowValueExtractor, rowValueMapper, typeClass);
+      }
      return this;
    }

+    private <T> void putList(
+        String cassandraType,
+        UnifiedMappingProvider.Type type,
+        CassandraRowValueExtractor<T> rowValueExtractor,
+        CassandraRowValueMapper<T> rowValueMapper,
+        Class<T> typeClass) {
+      String listType = "LIST<" + cassandraType.toUpperCase() + ">";
+      this.typeMappingBuilder().put(listType, UnifiedMappingProvider.getArrayMapping(type));
+      TypeToken<T> typeToken = TypeToken.of(typeClass);
+      this.fieldMappingBuilder()
+          .put(
+              listType,
+              CassandraFieldMapper.create(
+                  (row, name) -> row.getList(name, typeToken),
+                  CassandraRowValueArrayMapper.create(rowValueMapper)));
+    }
+
+    private <T> void putSet(
+        String cassandraType,
+        UnifiedMappingProvider.Type type,
+        CassandraRowValueExtractor<T> rowValueExtractor,
+        CassandraRowValueMapper<T> rowValueMapper,
+        Class<T> typeClass) {
+      String setType = "SET<" + cassandraType.toUpperCase() + ">";
+      TypeToken<T> typeToken = TypeToken.of(typeClass);
+      this.typeMappingBuilder().put(setType, UnifiedMappingProvider.getArrayMapping(type));
+      this.fieldMappingBuilder()
+          .put(
+              setType,
+              CassandraFieldMapper.create(
+                  (row, name) -> row.getSet(name, typeToken),
+                  CassandraRowValueArrayMapper.create(rowValueMapper)));
+    }
+
    public abstract CassandraMappings build();
  }
}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java
index dd89bf16bf..1bdca622f7 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java
@@ -26,8 +26,12 @@
 import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano;
 import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
 import com.google.common.collect.ImmutableMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Date;
+import java.util.UUID;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.commons.codec.binary.Hex;
@@ -40,7 +44,11 @@ public class CassandraMappingsProvider {
   private static final CassandraRowValueMapper toString = (value, schema) -> value.toString();

   /** Pass the value as an integer to avro. 
*/ - private static final CassandraRowValueMapper toInt = (value, schema) -> value.intValue(); + private static final CassandraRowValueMapper byteToInt = + (value, schema) -> value.intValue(); + + private static final CassandraRowValueMapper shortToInt = + (value, schema) -> value.intValue(); /** Map {@link ByteBuffer} to a Hex encoded String. */ private static final CassandraRowValueMapper ByteBufferToHexString = @@ -84,38 +92,121 @@ public class CassandraMappingsProvider { private static final CassandraMappings CASSANDRA_MAPPINGS = CassandraMappings.builder() - .put("ASCII", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) - .put("BIGINT", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough) - .put("BLOB", UnifiedMappingProvider.Type.STRING, Row::getBytes, ByteBufferToHexString) - .put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN, Row::getBool, valuePassThrough) - .put("COUNTER", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough) - .put("DATE", UnifiedMappingProvider.Type.DATE, Row::getDate, localDateToAvroLogicalDate) + .put( + "ASCII", + UnifiedMappingProvider.Type.STRING, + Row::getString, + valuePassThrough, + String.class) + .put( + "BIGINT", + UnifiedMappingProvider.Type.LONG, + Row::getLong, + valuePassThrough, + Long.class) + .put( + "BLOB", + UnifiedMappingProvider.Type.STRING, + Row::getBytes, + ByteBufferToHexString, + ByteBuffer.class) + .put( + "BOOLEAN", + UnifiedMappingProvider.Type.BOOLEAN, + Row::getBool, + valuePassThrough, + Boolean.class) + .put( + "COUNTER", + UnifiedMappingProvider.Type.LONG, + Row::getLong, + valuePassThrough, + Long.class) + .put( + "DATE", + UnifiedMappingProvider.Type.DATE, + Row::getDate, + localDateToAvroLogicalDate, + LocalDate.class) // The Cassandra decimal does not have precision and scale fixed in the // schema which would be needed if we want to map it to Avro Decimal. 
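          // For example, the Cassandra decimal 12.340 is carried into Avro verbatim as the
          // string "12.340".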
- .put("DECIMAL", UnifiedMappingProvider.Type.STRING, Row::getDecimal, toString) - .put("DOUBLE", UnifiedMappingProvider.Type.DOUBLE, Row::getDouble, valuePassThrough) - .put("DURATION", UnifiedMappingProvider.Type.INTERVAL_NANO, getDuration, durationToAvro) - .put("FLOAT", UnifiedMappingProvider.Type.FLOAT, Row::getFloat, valuePassThrough) - .put("INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString) - .put("INT", UnifiedMappingProvider.Type.INTEGER, Row::getInt, valuePassThrough) - .put("SMALLINT", UnifiedMappingProvider.Type.INTEGER, Row::getShort, toInt) - .put("TEXT", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) + .put( + "DECIMAL", + UnifiedMappingProvider.Type.STRING, + Row::getDecimal, + toString, + BigDecimal.class) + .put( + "DOUBLE", + UnifiedMappingProvider.Type.DOUBLE, + Row::getDouble, + valuePassThrough, + Double.class) + .put( + "DURATION", + UnifiedMappingProvider.Type.INTERVAL_NANO, + getDuration, + durationToAvro, + Duration.class) + .put( + "FLOAT", + UnifiedMappingProvider.Type.FLOAT, + Row::getFloat, + valuePassThrough, + Float.class) + .put( + "INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString, InetAddress.class) + .put( + "INT", + UnifiedMappingProvider.Type.INTEGER, + Row::getInt, + valuePassThrough, + Integer.class) + .put( + "SMALLINT", + UnifiedMappingProvider.Type.INTEGER, + Row::getShort, + shortToInt, + Short.class) + .put( + "TEXT", + UnifiedMappingProvider.Type.STRING, + Row::getString, + valuePassThrough, + String.class) .put( "TIME", UnifiedMappingProvider.Type.INTERVAL_NANO, Row::getTime, - cassandraTimeToIntervalNano) - .put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP, Row::getTimestamp, dateToAvro) - .put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString) - .put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, toInt) - .put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString) - .put("VARCHAR", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) - .put("VARINT", UnifiedMappingProvider.Type.NUMBER, Row::getVarint, toString) + cassandraTimeToIntervalNano, + Long.class) + .put( + "TIMESTAMP", + UnifiedMappingProvider.Type.TIMESTAMP, + Row::getTimestamp, + dateToAvro, + Date.class) + .put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString, UUID.class) + .put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, byteToInt, Byte.class) + .put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString, UUID.class) + .put( + "VARCHAR", + UnifiedMappingProvider.Type.STRING, + Row::getString, + valuePassThrough, + String.class) + .put( + "VARINT", + UnifiedMappingProvider.Type.NUMBER, + Row::getVarint, + toString, + BigInteger.class) .put( "UNSUPPORTED", UnifiedMappingProvider.Type.UNSUPPORTED, (row, name) -> null, - (value, schema) -> null) + (value, schema) -> null, + null) .build(); private CassandraMappingsProvider() {} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java index 7097fa9295..8117a583fb 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java @@ -36,6 +36,9 @@ 
abstract class CassandraRowMapper implements Transformer, Serial public static final ImmutableMap> MAPPINGS = CassandraMappingsProvider.getFieldMapping(); + /* + * TODO(vardhanvthigle): support nested collections. + */ public static CassandraRowMapper create( SourceSchemaReference sourceSchemaReference, SourceTableSchema sourceTableSchema) { return new AutoValue_CassandraRowMapper(sourceSchemaReference, sourceTableSchema); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueArrayMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueArrayMapper.java new file mode 100644 index 0000000000..51a3700f91 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueArrayMapper.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericRecordBuilder; +import org.checkerframework.checker.nullness.qual.NonNull; + +@AutoValue +public abstract class CassandraRowValueArrayMapper + implements CassandraRowValueMapper> { + + public static CassandraRowValueArrayMapper create( + CassandraRowValueMapper rowValueMapper) { + return new AutoValue_CassandraRowValueArrayMapper<>(rowValueMapper); + } + + abstract CassandraRowValueMapper rowValueMapper(); + + /** + * Map the extracted value to an object accepted by {@link GenericRecordBuilder#set(Field, + * Object)} as per the schema of the field. + * + * @param values extracted value collection. + * @param schema Avro Schema. + * @return mapped object. + */ + @Override + public Object map(@NonNull Iterable values, Schema schema) { + return ImmutableList.builder().addAll(values).build(); + } +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java index 135f79c429..3ce1303f87 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java @@ -20,7 +20,7 @@ import java.io.Serializable; import javax.annotation.Nullable; -public interface CassandraRowValueExtractor extends Serializable { +public interface CassandraRowValueExtractor extends Serializable { /** * Extract the requested field from the result set. 
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java index 5abbfd96a4..674b400430 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java @@ -162,7 +162,17 @@ private ImmutableMap getTableColumns( for (ColumnMetadata columnMetadata : metadata.getTable(table).get().getColumns().values()) { String name = columnMetadata.getName().toString(); SourceColumnType sourceColumnType = - new SourceColumnType(columnMetadata.getType().toString(), new Long[] {}, new Long[] {}); + new SourceColumnType( + /* + * Get the name of the type as represented in CSql Language, using the driver's `asCql` wrapper. + * here we exclude the frozen keyword, as a type being frozen or not does not matter to the read pipeline. + */ + columnMetadata + .getType() + .asCql(false /*includeFrozen*/, true /*prettyPrint*/) + .toUpperCase(), + new Long[] {}, + new Long[] {}); tableSchemaBuilder.put(name, sourceColumnType); } return tableSchemaBuilder.build(); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java index b346ce86fa..e73a0cc3d9 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java @@ -88,6 +88,9 @@ public Schema getSchema(SourceColumnType columnType) { : SchemaBuilder.builder().unionOf().nullType().and().type(schema).endUnion(); } + /* + * TODO(vardhanvthigle): Handle Nested collections. + */ private Schema getBasicSchema(SourceColumnType columnType) { return mappers .get(this.mapperType) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapping.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapping.java index 4bfcd19bca..13e0653483 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapping.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapping.java @@ -15,6 +15,7 @@ */ package com.google.cloud.teleport.v2.source.reader.io.schema.typemapping; +import java.io.Serializable; import javax.annotation.Nullable; import org.apache.avro.Schema; @@ -24,7 +25,7 @@ * com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.MysqlMappingProvider * MysqlMappingProvider}. */ -public interface UnifiedTypeMapping { +public interface UnifiedTypeMapping extends Serializable { /** * Convert the Source Schema. 
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/Array.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/Array.java
new file mode 100644
index 0000000000..d9cbc51005
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/Array.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+
+/**
+ * Generates an Array Avro Type that wraps the schema of its element mapping.
+ */
+@AutoValue
+abstract class Array implements UnifiedTypeMapping, Serializable {
+
+  public static Array create(UnifiedTypeMapping mapping) {
+    return new AutoValue_Array(mapping);
+  }
+
+  abstract UnifiedTypeMapping mapping();
+
+  @Override
+  public Schema getSchema(@Nullable Long[] mods, @Nullable Long[] arrayBounds) {
+    return Schema.createArray(mapping().getSchema(mods, arrayBounds));
+  }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java
index d86c631b3f..30b11c33bf 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java
@@ -116,6 +116,31 @@ public static UnifiedTypeMapping getMapping(Type type) {
     return MAPPING.getOrDefault(type, new Unsupported());
   }
 
+  /**
+   * Returns the {@link UnifiedTypeMapping} for an array of the given unified type.
+   *
+   * @param type the unified type whose array Avro schema mapping is requested.
+   * @return mapping implementation. Default is {@link Unsupported} for an unrecognized type.
+   */
+  public static UnifiedTypeMapping getArrayMapping(Type type) {
+    return getArrayMapping(MAPPING.getOrDefault(type, new Unsupported()));
+  }
+
+  /**
+   * Returns the {@link UnifiedTypeMapping} for an array of the given unified type mapping.
+   *
+   * @param mapping the unified type mapping whose array Avro schema mapping is requested.
+   * @return mapping implementation. An {@link Unsupported} input mapping is returned unchanged. 
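+   *     For example, wrapping the STRING mapping yields the Avro schema
+   *     {@code {"type": "array", "items": "string"}}.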
+ */ + public static UnifiedTypeMapping getArrayMapping(UnifiedTypeMapping mapping) { + if (mapping instanceof Unsupported) { + return mapping; + } + return Array.create(mapping); + } + /** Static final class. * */ private UnifiedMappingProvider() {} } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIOWrapperHelperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIOWrapperHelperTest.java index 9153a083f0..c39fc32ac7 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIOWrapperHelperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraIOWrapperHelperTest.java @@ -35,6 +35,7 @@ import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_TABLES; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.mock; @@ -138,7 +139,7 @@ public void testTablesToRead() { dataSource, CassandraIOWrapperHelper.buildSchemaDiscovery(), cassandraSchemaReference)) - .isEqualTo(List.of(BASIC_TEST_TABLE, PRIMITIVE_TYPES_TABLE)); + .isEqualTo(TEST_TABLES); assertThat( CassandraIOWrapperHelper.getTablesToRead( List.of(BASIC_TEST_TABLE), diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java index f2ad23759d..1df9e2f9c4 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java @@ -15,8 +15,12 @@ */ package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.LIST_TYPES_TABLE; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.LIST_TYPES_TABLE_AVRO_ROWS; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE_AVRO_ROWS; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.SET_TYPES_TABLE; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.SET_TYPES_TABLE_AVRO_ROWS; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE; @@ -44,6 +48,7 @@ 
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
index f2ad23759d..1df9e2f9c4 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java
@@ -15,8 +15,12 @@
  */
 package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper;
 
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.LIST_TYPES_TABLE;
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.LIST_TYPES_TABLE_AVRO_ROWS;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE_AVRO_ROWS;
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.SET_TYPES_TABLE;
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.SET_TYPES_TABLE_AVRO_ROWS;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE;
@@ -44,6 +48,7 @@
 import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.util.List;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,6 +79,21 @@ public static void stopEmbeddedCassandra() throws Exception {
 
   @Test
   public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryException {
+    cassandraSourceRowMapperTestHelper(PRIMITIVE_TYPES_TABLE, PRIMITIVE_TYPES_TABLE_AVRO_ROWS);
+  }
+
+  @Test
+  public void testCassandraSourceRowMapperList() throws RetriableSchemaDiscoveryException {
+    cassandraSourceRowMapperTestHelper(LIST_TYPES_TABLE, LIST_TYPES_TABLE_AVRO_ROWS);
+  }
+
+  @Test
+  public void testCassandraSourceRowMapperSet() throws RetriableSchemaDiscoveryException {
+    cassandraSourceRowMapperTestHelper(SET_TYPES_TABLE, SET_TYPES_TABLE_AVRO_ROWS);
+  }
+
+  private void cassandraSourceRowMapperTestHelper(String tableName, List<String> expectedRows)
+      throws RetriableSchemaDiscoveryException {
     SourceSchemaReference sourceSchemaReference =
         SourceSchemaReference.ofCassandra(
@@ -89,11 +109,10 @@ public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryE
             .build());
 
     SourceTableSchema.Builder sourceTableSchemaBuilder =
-        SourceTableSchema.builder(MapperType.CASSANDRA).setTableName(PRIMITIVE_TYPES_TABLE);
+        SourceTableSchema.builder(MapperType.CASSANDRA).setTableName(tableName);
     new CassandraSchemaDiscovery()
-        .discoverTableSchema(
-            dataSource, sourceSchemaReference, ImmutableList.of(PRIMITIVE_TYPES_TABLE))
-        .get(PRIMITIVE_TYPES_TABLE)
+        .discoverTableSchema(dataSource, sourceSchemaReference, ImmutableList.of(tableName))
+        .get(tableName)
         .forEach(sourceTableSchemaBuilder::addSourceColumnNameToSourceColumnType);
 
     CassandraSourceRowMapper cassandraSourceRowMapper =
@@ -103,7 +122,7 @@ public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryE
             .build();
 
     ResultSet resultSet;
-    String query = "SELECT * FROM " + PRIMITIVE_TYPES_TABLE;
+    String query = "SELECT * FROM " + tableName;
     com.datastax.oss.driver.api.core.cql.SimpleStatement statement =
         SimpleStatement.newInstance(query);
     Cluster cluster =
@@ -123,26 +142,23 @@ public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryE
     cassandraSourceRowMapper.map(resultSet).forEachRemaining(row -> readRowsBuilder.add(row));
     ImmutableList<SourceRow> readRows = readRowsBuilder.build();
 
-    readRows.forEach(r -> assertThat(r.tableName() == PRIMITIVE_TYPES_TABLE));
+    readRows.forEach(r -> assertThat(r.tableName()).isEqualTo(tableName));
     readRows.forEach(r -> assertThat(r.sourceSchemaReference() == sourceSchemaReference));
     assertThat(
             readRows.stream()
                 .map(r -> r.getPayload().toString())
                 .sorted()
                 .collect(ImmutableList.toImmutableList()))
-        .isEqualTo(
-            PRIMITIVE_TYPES_TABLE_AVRO_ROWS.stream()
-                .sorted()
-                .collect(ImmutableList.toImmutableList()));
+        .isEqualTo(expectedRows.stream().sorted().collect(ImmutableList.toImmutableList()));
 
     // Since we will use CassandraIO only for reads, we don't need to support the `deleteAsync`
     // and `saveAsync` functions of the CassandraIO mapper interface.
     assertThrows(
         UnsupportedOperationException.class,
-        () -> cassandraSourceRowMapper.deleteAsync(readRows.get(1)));
+        () -> cassandraSourceRowMapper.deleteAsync(readRows.get(0)));
     assertThrows(
         UnsupportedOperationException.class,
-        () -> cassandraSourceRowMapper.saveAsync(readRows.get(1)));
+        () -> cassandraSourceRowMapper.saveAsync(readRows.get(0)));
   }
 }
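Note the sorted comparison in the helper above: both the mapped rows and the expected fixture strings are sorted before the equality check, because a CQL `SELECT *` without an `ORDER BY` gives no cross-partition ordering guarantee. A minimal sketch of that order-insensitive idiom follows; the `assertSameRows` name is an illustrative stand-in, not part of the patch.

    import static com.google.common.truth.Truth.assertThat;

    import com.google.common.collect.ImmutableList;
    import java.util.List;

    public class SortedComparisonSketch {
      // Order-insensitive equality: sort both sides, then compare as lists.
      static void assertSameRows(List<String> actual, List<String> expected) {
        assertThat(actual.stream().sorted().collect(ImmutableList.toImmutableList()))
            .isEqualTo(expected.stream().sorted().collect(ImmutableList.toImmutableList()));
      }

      public static void main(String[] args) {
        assertSameRows(List.of("row-b", "row-a"), List.of("row-a", "row-b")); // passes
      }
    }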
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
index d263c3b252..7d27feb7aa 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java
@@ -16,6 +16,8 @@
 package com.google.cloud.teleport.v2.source.reader.io.cassandra.schema;
 
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.BASIC_TEST_TABLE_SCHEMA;
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.LIST_TEST_TABLE_SCHEMA;
+import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.SET_TEST_TABLE_SCHEMA;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH;
 import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE;
@@ -126,6 +128,22 @@ public void testDiscoverTablesConfigFile() throws IOException, RetriableSchemaDi
 
   @Test
   public void testDiscoverTableSchemaBasic() throws IOException, RetriableSchemaDiscoveryException {
+    discoverTableSchemaTestHelper(BASIC_TEST_TABLE_SCHEMA);
+  }
+
+  @Test
+  public void testDiscoverTableSchemaList() throws IOException, RetriableSchemaDiscoveryException {
+    discoverTableSchemaTestHelper(LIST_TEST_TABLE_SCHEMA);
+  }
+
+  @Test
+  public void testDiscoverTableSchemaSet() throws IOException, RetriableSchemaDiscoveryException {
+    discoverTableSchemaTestHelper(SET_TEST_TABLE_SCHEMA);
+  }
+
+  private void discoverTableSchemaTestHelper(
+      ImmutableMap<String, ImmutableMap<String, SourceColumnType>> expectedSchema)
+      throws IOException, RetriableSchemaDiscoveryException {
     SourceSchemaReference cassandraSchemaReference =
         SourceSchemaReference.ofCassandra(
@@ -142,10 +160,8 @@ public void testDiscoverTableSchemaBasic() throws IOException, RetriableSchemaDi
     CassandraSchemaDiscovery cassandraSchemaDiscovery = new CassandraSchemaDiscovery();
     ImmutableMap<String, ImmutableMap<String, SourceColumnType>> schema =
         cassandraSchemaDiscovery.discoverTableSchema(
-            cassandraDataSource,
-            cassandraSchemaReference,
-            BASIC_TEST_TABLE_SCHEMA.keySet().asList());
-    assertThat(schema).isEqualTo(BASIC_TEST_TABLE_SCHEMA);
+            cassandraDataSource, cassandraSchemaReference, expectedSchema.keySet().asList());
+    assertThat(schema).isEqualTo(expectedSchema);
   }
 
   @Test
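The helper above derives the discovery request from the fixture itself: the tables passed to `discoverTableSchema` are exactly the key set of the expected schema map, so one helper body serves the basic, list, and set fixtures without extra parameters. A self-contained sketch of that fixture-driven pattern follows; the class and method names are illustrative, not from the patch.

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;

    public class FixtureDrivenDiscoverySketch {
      // The expected schema doubles as the input: its keys name the tables to discover.
      static ImmutableList<String> tablesToDiscover(ImmutableMap<String, ?> expectedSchema) {
        return expectedSchema.keySet().asList();
      }

      public static void main(String[] args) {
        ImmutableMap<String, String> expected =
            ImmutableMap.of("list_types_table", "...", "set_types_table", "...");
        System.out.println(tablesToDiscover(expected)); // [list_types_table, set_types_table]
      }
    }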
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java
index 32d2b8aaef..b66b03c7ea 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java
@@ -31,6 +31,8 @@ public class BasicTestSchema {
   public static final String TEST_CQLSH = TEST_RESOURCE_ROOT + "basicTest.cql";
   public static final String BASIC_TEST_TABLE = "basic_test_table";
   public static final String PRIMITIVE_TYPES_TABLE = "primitive_types_table";
+  public static final String LIST_TYPES_TABLE = "list_types_table";
+  public static final String SET_TYPES_TABLE = "set_types_table";
   public static final Long PRIMITIVE_TYPES_TABLE_ROW_COUNT = 6L;
   public static final ImmutableMap<String, ImmutableMap<String, SourceColumnType>>
       BASIC_TEST_TABLE_SCHEMA =
@@ -39,8 +41,122 @@ public class BasicTestSchema {
           ImmutableMap.of(
               "id", new SourceColumnType("TEXT", new Long[] {}, new Long[] {}),
               "name", new SourceColumnType("TEXT", new Long[] {}, new Long[] {})));
+  public static final ImmutableMap<String, ImmutableMap<String, SourceColumnType>>
+      LIST_TEST_TABLE_SCHEMA =
+          ImmutableMap.of(
+              LIST_TYPES_TABLE,
+              ImmutableMap.<String, SourceColumnType>builder()
+                  .put("primary_key", new SourceColumnType("UUID", new Long[] {}, new Long[] {}))
+                  .put(
+                      "ascii_list",
+                      new SourceColumnType("LIST<ASCII>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "frozen_ascii_list",
+                      new SourceColumnType("LIST<ASCII>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "bigint_list",
+                      new SourceColumnType("LIST<BIGINT>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "blob_list", new SourceColumnType("LIST<BLOB>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "boolean_list",
+                      new SourceColumnType("LIST<BOOLEAN>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "date_list", new SourceColumnType("LIST<DATE>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "decimal_list",
+                      new SourceColumnType("LIST<DECIMAL>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "double_list",
+                      new SourceColumnType("LIST<DOUBLE>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "duration_list",
+                      new SourceColumnType("LIST<DURATION>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "float_list",
+                      new SourceColumnType("LIST<FLOAT>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "inet_list", new SourceColumnType("LIST<INET>", new Long[] {}, new Long[] {}))
+                  .put("int_list", new SourceColumnType("LIST<INT>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "smallint_list",
+                      new SourceColumnType("LIST<SMALLINT>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "text_list", new SourceColumnType("LIST<TEXT>", new Long[] {}, new Long[] {}))
+                  .put(
+                      "time_list", new SourceColumnType("LIST