Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)

* Fix Cassandra read bug when user query has no where clause

* update changelog

* fix format violation

* fix remaining format violation

* add unit test for read with unfiltered query

* fix unfiltered query unit test count validation

* use constant for num rows assertion on unfiltered query unit test

Co-authored-by: Yi Hu <[email protected]>

* fix cassandra query bugfix description

Co-authored-by: Yi Hu <[email protected]>

Co-authored-by: Lucas Marques <[email protected]>
Co-authored-by: Yi Hu <[email protected]>
3 people authored Dec 30, 2022
1 parent 889fd8b commit ad72bac
Showing 3 changed files with 44 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -80,7 +80,7 @@

## Bugfixes

-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).

## Known Issues

@@ -141,6 +141,9 @@ private static String buildInitialQuery(Read<?> spec, Boolean hasRingRange) {
     return (spec.query() == null)
         ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
             + " WHERE "
-        : spec.query().get() + (hasRingRange ? " AND " : "");
+        : spec.query().get()
+            + (hasRingRange
+                ? spec.query().get().toUpperCase().contains("WHERE") ? " AND " : " WHERE "
+                : "");
   }
 }
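The connector choice in the hunk above can be exercised in isolation. The following is a minimal sketch, not the Beam implementation: the class name `InitialQuerySketch` and the plain `String`/`boolean` parameters are assumptions made for illustration (the real method takes a `Read<?> spec`), and only the branching is taken from the diff.

```java
public class InitialQuerySketch {

  // Hypothetical stand-in for buildInitialQuery: plain values replace the Read<?> spec.
  static String buildInitialQuery(
      String keyspace, String table, String userQuery, boolean hasRingRange) {
    if (userQuery == null) {
      // No user query: a default SELECT that always ends in " WHERE ", as in the diff.
      return String.format("SELECT * FROM %s.%s", keyspace, table) + " WHERE ";
    }
    if (!hasRingRange) {
      // Nothing will be appended, so the user query is returned unchanged.
      return userQuery;
    }
    // Ring-range predicates follow: continue an existing WHERE with AND, otherwise open one.
    String connector = userQuery.toUpperCase().contains("WHERE") ? " AND " : " WHERE ";
    return userQuery + connector;
  }

  public static void main(String[] args) {
    System.out.println(buildInitialQuery("ks", "t", null, true));
    System.out.println(buildInitialQuery("ks", "t", "select a from ks.t where a=1", true));
    System.out.println(buildInitialQuery("ks", "t", "select a from ks.t", true));
    System.out.println(buildInitialQuery("ks", "t", "select a from ks.t", false));
  }
}
```

Because the check is a plain substring match on the upper-cased query, any occurrence of `WHERE` counts, including one inside an identifier: in this sketch, a query such as `select a from ks.nowhere` would get ` AND ` appended even though it has no where clause.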
@@ -458,6 +458,11 @@ public KV<String, Integer> apply(Scientist scientist) {

   @Test
   public void testReadWithQuery() throws Exception {
+    String query =
+        String.format(
+            "select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'",
+            CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
     PCollection<Scientist> output =
         pipeline.apply(
             CassandraIO.<Scientist>read()
@@ -466,8 +471,7 @@ public void testReadWithQuery() throws Exception {
                 .withKeyspace(CASSANDRA_KEYSPACE)
                 .withTable(CASSANDRA_TABLE)
                 .withMinNumberOfSplits(20)
-                .withQuery(
-                    "select person_id, writetime(person_name) from beam_ks.scientist where person_id=10 AND person_department='logic'")
+                .withQuery(query)
                 .withCoder(SerializableCoder.of(Scientist.class))
                 .withEntity(Scientist.class));

@@ -485,6 +489,39 @@ public void testReadWithQuery() throws Exception {
     pipeline.run();
   }

+  @Test
+  public void testReadWithUnfilteredQuery() throws Exception {
+    String query =
+        String.format(
+            "select person_id, writetime(person_name) from %s.%s",
+            CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
+    PCollection<Scientist> output =
+        pipeline.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(CASSANDRA_TABLE)
+                .withMinNumberOfSplits(20)
+                .withQuery(query)
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
+    PAssert.that(output)
+        .satisfies(
+            input -> {
+              for (Scientist sci : input) {
+                assertNull(sci.name);
+                assertTrue(sci.nameTs != null && sci.nameTs > 0);
+              }
+              return null;
+            });
+
+    pipeline.run();
+  }
+
   @Test
   public void testWrite() {
     ArrayList<ScientistWrite> data = new ArrayList<>();