Replace Batch operation in Cassandra Delete() #4054

Merged 3 commits on Mar 23, 2018

20 changes: 14 additions & 6 deletions physical/cassandra/cassandra.go
@@ -294,13 +294,21 @@ func (c *CassandraBackend) Delete(ctx context.Context, key string) error {
 	defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now())
 
 	stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table)
-	batch := gocql.NewBatch(gocql.LoggedBatch)
-	for _, bucket := range c.buckets(key) {
-		batch.Entries = append(batch.Entries, gocql.BatchEntry{
-			Stmt: stmt,
-			Args: []interface{}{bucket, key}})
+	results := make(chan error)
+	buckets := c.buckets(key)
+
+	for _, bucket := range buckets {
+		go func(bucket string) {
+			results <- c.sess.Query(stmt, bucket, key).Exec()
Review thread on this line:

Contributor:

Doing it this way means that the delete operation is not really atomic, but I see that Put() is also done this way.

Contributor (PR author):

Yes. The alternative approach, which also fixes the test, is:

batch := c.sess.NewBatch(gocql.LoggedBatch)
for _, bucket := range c.buckets(key) {
	batch.Query(stmt, bucket, key)
}

This is the Batch creation process shown in: https://github.com/gocql/gocql/blob/master/batch_test.go
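
A self-contained sketch of that alternative, for illustration only (deleteBatched is a hypothetical name; sess, table, key and buckets stand in for the CassandraBackend fields c.sess, c.table and c.buckets(key) used in the diff; this is not the code that was merged):

package cassandra

import (
	"fmt"

	"github.com/gocql/gocql"
)

// deleteBatched issues all bucket deletes for a key as one logged batch.
func deleteBatched(sess *gocql.Session, table, key string, buckets []string) error {
	stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, table)

	// Session.NewBatch picks up the session defaults; Batch.Query appends
	// one statement per bucket to the batch.
	batch := sess.NewBatch(gocql.LoggedBatch)
	for _, bucket := range buckets {
		batch.Query(stmt, bucket, key)
	}

	// A logged batch is applied atomically, unlike the per-bucket
	// goroutines in the merged change.
	return sess.ExecuteBatch(batch)
}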

Contributor:

I would prefer the above. I think batch.Query() appends to the list of queries that will be executed, but we should double-check.

Contributor (PR author):

It is. So that change just leverages the library function, but I believe we were doing the same thing by hand. Using c.sess.NewBatch() instead of gocql.NewBatch() has some significant differences, though.

Member:

What are those differences, and why would we pick one over the other? It feels much nicer to use the library's batch operations than goroutines, so if this works (one way or another, via c.sess.NewBatch or gocql.NewBatch), Put() should probably be fixed as well.

Contributor (PR author):

I now understand this more fully. c.sess.NewBatch() creates a batch that uses all of the session's default parameters, whereas gocql.NewBatch() just returns a zero-value Batch (and is deprecated). The critical field is "defaultTimestamp", which defaults to true in the session but is false when the Batch is created with gocql.NewBatch(). When it is true, the client specifies the query timestamp. All of the other queries (none of which use Batch) therefore use client time, while the Batch before this PR used server time:

If the client does not specify any timestamp, the server will assign one based on the time it receives the query. This can be a problem when the order of the writes matter
(https://docs.datastax.com/en/developer/java-driver/2.1/manual/query_timestamps/)

I agree Batch seems like the simpler approach, but both the DataStax docs (see the link in the commit message) and @obeattie have raised concerns about it. Specifically, a batch containing more than 65535 statements will simply fail, and more generally, batches place a heavier load on the cluster.
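
A minimal sketch of the constructor difference described above, assuming the gocql API as of this PR (newDefaultBatch is a hypothetical helper name):

package cassandra

import "github.com/gocql/gocql"

// newDefaultBatch contrasts the two batch constructors.
func newDefaultBatch(sess *gocql.Session) *gocql.Batch {
	// gocql.NewBatch (deprecated) returns a zero-value Batch, so
	// defaultTimestamp is false and the server assigns a timestamp when
	// it receives the query.
	legacy := gocql.NewBatch(gocql.LoggedBatch)

	// Client-side timestamps can be switched back on per batch...
	legacy.DefaultTimestamp(true)

	// ...but Session.NewBatch already copies the session defaults,
	// including DefaultTimestamp (true unless ClusterConfig overrides
	// it), so its batches are timestamped like every other query.
	return sess.NewBatch(gocql.LoggedBatch)
}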

Contributor:

While the statement limit does exist as you pointed out, the more problematic limit I've run into is batch_size_fail_threshold_in_kb, which by default is set to 50KB.

+		}(bucket)
 	}
+
+	for i := 0; i < len(buckets); i++ {
+		if err := <-results; err != nil {
+			return err
+		}
+	}
-	return c.sess.ExecuteBatch(batch)
+	return nil
 }

 	// List is used to list all the keys under a given
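
Reassembled from the two hunks above, the merged Delete() reads as follows (comments are added here for orientation and are not part of the diff):

func (c *CassandraBackend) Delete(ctx context.Context, key string) error {
	defer metrics.MeasureSince([]string{"cassandra", "delete"}, time.Now())

	stmt := fmt.Sprintf(`DELETE FROM "%s" WHERE bucket = ? AND key = ?`, c.table)
	results := make(chan error)
	buckets := c.buckets(key)

	// Fire one DELETE per bucket concurrently; each goroutine reports
	// its outcome on the shared channel.
	for _, bucket := range buckets {
		go func(bucket string) {
			results <- c.sess.Query(stmt, bucket, key).Exec()
		}(bucket)
	}

	// Collect exactly one result per goroutine and surface the first
	// error, if any.
	for i := 0; i < len(buckets); i++ {
		if err := <-results; err != nil {
			return err
		}
	}
	return nil
}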