diff --git a/CHANGES.md b/CHANGES.md index c8d3eb97e889..e87cd3baf213 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,7 +80,7 @@ ## Bugfixes -* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)). ## Known Issues diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java index 6bca1cf3d177..3bb536029187 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java @@ -141,6 +141,9 @@ private static String buildInitialQuery(Read spec, Boolean hasRingRange) { return (spec.query() == null) ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get()) + " WHERE " - : spec.query().get() + (hasRingRange ? " AND " : ""); + : spec.query().get() + + (hasRingRange + ? spec.query().get().toUpperCase().contains("WHERE") ? " AND " : " WHERE " + : ""); } } diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 7196556abc7c..a472b9ee1c3a 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -458,6 +458,11 @@ public KV apply(Scientist scientist) { @Test public void testReadWithQuery() throws Exception { + String query = + String.format( + "select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'", + CASSANDRA_KEYSPACE, CASSANDRA_TABLE); + PCollection output = pipeline.apply( CassandraIO.read() @@ -466,8 +471,7 @@ public void testReadWithQuery() throws Exception { .withKeyspace(CASSANDRA_KEYSPACE) .withTable(CASSANDRA_TABLE) .withMinNumberOfSplits(20) - .withQuery( - "select person_id, writetime(person_name) from beam_ks.scientist where person_id=10 AND person_department='logic'") + .withQuery(query) .withCoder(SerializableCoder.of(Scientist.class)) .withEntity(Scientist.class)); @@ -485,6 +489,39 @@ public void testReadWithQuery() throws Exception { pipeline.run(); } + @Test + public void testReadWithUnfilteredQuery() throws Exception { + String query = + String.format( + "select person_id, writetime(person_name) from %s.%s", + CASSANDRA_KEYSPACE, CASSANDRA_TABLE); + + PCollection output = + pipeline.apply( + CassandraIO.read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(CASSANDRA_TABLE) + .withMinNumberOfSplits(20) + .withQuery(query) + .withCoder(SerializableCoder.of(Scientist.class)) + .withEntity(Scientist.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS); + PAssert.that(output) + .satisfies( + input -> { + for (Scientist sci : input) { + assertNull(sci.name); + assertTrue(sci.nameTs != null && sci.nameTs > 0); + } + return null; + }); + + pipeline.run(); + } + @Test public void testWrite() { ArrayList data = new ArrayList<>();