Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-13 Add support for cassandra 4.0 table options #1791

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Change Batch API to be consistent with Query() (CASSGO-7)

- Added Cassandra 4.0 table options support(CASSGO-13)

### Fixed

- Retry policy now takes into account query idempotency (CASSGO-27)
Expand Down
25 changes: 20 additions & 5 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ func TestViewMetadata(t *testing.T) {

func TestMaterializedViewMetadata(t *testing.T) {
if flagCassVersion.Before(3, 0, 0) {
return
t.Skip("The Cassandra version is too old")
}
session := createSession(t)
defer session.Close()
Expand All @@ -2333,14 +2333,19 @@ func TestMaterializedViewMetadata(t *testing.T) {
expectedChunkLengthInKB := "16"
expectedDCLocalReadRepairChance := float64(0)
expectedSpeculativeRetry := "99p"
expectedAdditionalWritePolicy := "99p"
expectedReadRepair := "BLOCKING"
if flagCassVersion.Before(4, 0, 0) {
expectedChunkLengthInKB = "64"
expectedDCLocalReadRepairChance = 0.1
expectedSpeculativeRetry = "99PERCENTILE"
expectedReadRepair = ""
expectedAdditionalWritePolicy = ""
}
expectedView1 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view",
AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"},
Expand All @@ -2352,12 +2357,17 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
IncludeAllColumns: false, MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0,
MinIndexInterval: 128,
ReadRepair: expectedReadRepair,
ReadRepairChance: 0,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I know, read_repair_chance was removed from C* 4.0.

https://issues.apache.org/jira/browse/CASSANDRA-15604

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this test also runs for C* 3.0.0, so it covers data for all versions. We can make separate tests for each version, but as mentioned in the description for the current PR, the main aim is to implement a backward-compatible metadata getter, and this test case helps to test one.

SpeculativeRetry: expectedSpeculativeRetry,
}
expectedView2 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view2",
AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table2",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"},
Expand All @@ -2369,8 +2379,13 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
IncludeAllColumns: false,
MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0,
MinIndexInterval: 128,
ReadRepair: expectedReadRepair,
ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
}

expectedView1.BaseTableId = materializedViews[0].BaseTableId
Expand Down
238 changes: 186 additions & 52 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type ViewMetadata struct {
type MaterializedViewMetadata struct {
Keyspace string
Name string
AdditionalWritePolicy string
BaseTableId UUID
BaseTable *TableMetadata
BloomFilterFpChance float64
Expand All @@ -139,7 +140,8 @@ type MaterializedViewMetadata struct {
MaxIndexInterval int
MemtableFlushPeriodInMs int
MinIndexInterval int
ReadRepairChance float64
ReadRepair string // Only present in Cassandra 4.0+
ReadRepairChance float64 // Note: Cassandra 4.0 removed ReadRepairChance and added ReadRepair instead
SpeculativeRetry string

baseTableName string
Expand Down Expand Up @@ -999,69 +1001,201 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er
return views, nil
}

func bytesMapToStringsMap(byteData map[string][]byte) map[string]string {
extensions := make(map[string]string, len(byteData))
for key, rowByte := range byteData {
extensions[key] = string(rowByte)
}

return extensions
}

func materializedViewMetadataFromMap(currentObject map[string]interface{}, materializedView *MaterializedViewMetadata) error {
const errorMessage = "gocql.materializedViewMetadataFromMap failed to read column %s"
var ok bool
for key, value := range currentObject {
switch key {
case "keyspace_name":
materializedView.Keyspace, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "view_name":
materializedView.Name, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "additional_write_policy":
materializedView.AdditionalWritePolicy, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_id":
materializedView.BaseTableId, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_name":
materializedView.baseTableName, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "bloom_filter_fp_chance":
materializedView.BloomFilterFpChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "caching":
materializedView.Caching, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "comment":
materializedView.Comment, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compaction":
materializedView.Compaction, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compression":
materializedView.Compression, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "crc_check_chance":
materializedView.CrcCheckChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "dclocal_read_repair_chance":
materializedView.DcLocalReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "default_time_to_live":
materializedView.DefaultTimeToLive, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "extensions":
byteData, ok := value.(map[string][]byte)
if !ok {
return fmt.Errorf(errorMessage, key)
}

materializedView.Extensions = bytesMapToStringsMap(byteData)

case "gc_grace_seconds":
materializedView.GcGraceSeconds, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "id":
materializedView.Id, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "include_all_columns":
materializedView.IncludeAllColumns, ok = value.(bool)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "max_index_interval":
materializedView.MaxIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "memtable_flush_period_in_ms":
materializedView.MemtableFlushPeriodInMs, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "min_index_interval":
materializedView.MinIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair":
materializedView.ReadRepair, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair_chance":
materializedView.ReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "speculative_retry":
materializedView.SpeculativeRetry, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

}
}
return nil
}

func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) {
var materializedViews []MaterializedViewMetadata
s, err := iter.SliceMap()
if err != nil {
return nil, err
}

for _, row := range s {
var materializedView MaterializedViewMetadata
err = materializedViewMetadataFromMap(row, &materializedView)
if err != nil {
return nil, err
}

materializedViews = append(materializedViews, materializedView)
}

return materializedViews, nil
}

func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) {
if !session.useSystemSchema {
return nil, nil
}
var tableName = "system_schema.views"
stmt := fmt.Sprintf(`
SELECT
view_name,
base_table_id,
base_table_name,
bloom_filter_fp_chance,
caching,
comment,
compaction,
compression,
crc_check_chance,
dclocal_read_repair_chance,
default_time_to_live,
extensions,
gc_grace_seconds,
id,
include_all_columns,
max_index_interval,
memtable_flush_period_in_ms,
min_index_interval,
read_repair_chance,
speculative_retry
SELECT *
FROM %s
WHERE keyspace_name = ?`, tableName)

var materializedViews []MaterializedViewMetadata

rows := session.control.query(stmt, keyspaceName).Scanner()
for rows.Next() {
materializedView := MaterializedViewMetadata{Keyspace: keyspaceName}
err := rows.Scan(&materializedView.Name,
&materializedView.BaseTableId,
&materializedView.baseTableName,
&materializedView.BloomFilterFpChance,
&materializedView.Caching,
&materializedView.Comment,
&materializedView.Compaction,
&materializedView.Compression,
&materializedView.CrcCheckChance,
&materializedView.DcLocalReadRepairChance,
&materializedView.DefaultTimeToLive,
&materializedView.Extensions,
&materializedView.GcGraceSeconds,
&materializedView.Id,
&materializedView.IncludeAllColumns,
&materializedView.MaxIndexInterval,
&materializedView.MemtableFlushPeriodInMs,
&materializedView.MinIndexInterval,
&materializedView.ReadRepairChance,
&materializedView.SpeculativeRetry,
)
if err != nil {
return nil, err
}
materializedViews = append(materializedViews, materializedView)
}
iter := session.control.query(stmt, keyspaceName)

if err := rows.Err(); err != nil {
materializedViews, err := parseSystemSchemaViews(iter)
if err != nil {
return nil, err
}

Expand Down