diff --git a/jetstream/kv.go b/jetstream/kv.go index c16c57146..779f811d6 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -92,6 +92,10 @@ type ( RePublish *RePublish Mirror *StreamSource Sources []*StreamSource + + // Enable underlying stream compression. + // NOTE: Compression is supported for nats-server 2.10.0+ + Compression bool } KeyValueLister interface { @@ -123,6 +127,9 @@ type ( // Bytes returns the size in bytes of the bucket Bytes() uint64 + + // IsCompressed indicates if the data is compressed on disk + IsCompressed() bool } // KeyWatcher is what is returned when doing a watch. @@ -322,6 +329,10 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke if cfg.TTL > 0 && cfg.TTL < duplicateWindow { duplicateWindow = cfg.TTL } + var compression StoreCompression + if cfg.Compression { + compression = S2Compression + } scfg := StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, @@ -339,6 +350,7 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke MaxConsumers: -1, AllowDirect: true, RePublish: cfg.RePublish, + Compression: compression, } if cfg.Mirror != nil { // Copy in case we need to make changes so we do not change caller's version. @@ -479,6 +491,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo } // Bytes is the size of the stream func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes } +// IsCompressed indicates if the data is compressed on disk +func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression } + type kvLister struct { kvs chan KeyValueStatus kvNames chan string diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index 788a31896..b3f7d4f25 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -1356,3 +1356,38 @@ func expectErr(t *testing.T, err error, expected ...error) { } t.Fatalf("Expected one of %+v, got '%v'", expected, err) } + +func TestKeyValueCompression(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + kvCompressed, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "A", + Compression: true, + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + status, err := kvCompressed.Status(ctx) + if err != nil { + t.Fatalf("Error getting bucket status: %v", err) + } + + if !status.IsCompressed() { + t.Fatalf("Expected bucket to be compressed") + } + + kvStream, err := js.Stream(ctx, "KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + + if kvStream.CachedInfo().Config.Compression != jetstream.S2Compression { + t.Fatalf("Expected stream to be compressed with S2") + } +} diff --git a/kv.go b/kv.go index ed4edc391..aa86d5aeb 100644 --- a/kv.go +++ b/kv.go @@ -95,6 +95,9 @@ type KeyValueStatus interface { // Bytes returns the size in bytes of the bucket Bytes() uint64 + + // IsCompressed indicates if the data is compressed on disk + IsCompressed() bool } // KeyWatcher is what is returned when doing a watch. @@ -249,6 +252,10 @@ type KeyValueConfig struct { RePublish *RePublish Mirror *StreamSource Sources []*StreamSource + + // Enable underlying stream compression. + // NOTE: Compression is supported for nats-server 2.10.0+ + Compression bool } // Used to watch all keys. @@ -405,6 +412,10 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { if cfg.TTL > 0 && cfg.TTL < duplicateWindow { duplicateWindow = cfg.TTL } + var compression StoreCompression + if cfg.Compression { + compression = S2Compression + } scfg := &StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, @@ -422,6 +433,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { MaxConsumers: -1, AllowDirect: true, RePublish: cfg.RePublish, + Compression: compression, } if cfg.Mirror != nil { // Copy in case we need to make changes so we do not change caller's version. @@ -1040,6 +1052,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo } // Bytes is the size of the stream func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes } +// IsCompressed indicates if the data is compressed on disk +func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression } + // Status retrieves the status and configuration of a bucket func (kv *kvs) Status() (KeyValueStatus, error) { nfo, err := kv.js.StreamInfo(kv.stream) diff --git a/test/kv_test.go b/test/kv_test.go index afa937de5..92f0ff7da 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -1395,3 +1395,37 @@ func TestKeyValueSourcing(t *testing.T) { t.Fatalf("Got error getting keyB from C: %v", err) } } + +func TestKeyValueCompression(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "A", + Compression: true, + }) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + status, err := kv.Status() + if err != nil { + t.Fatalf("Error getting bucket status: %v", err) + } + + if !status.IsCompressed() { + t.Fatalf("Expected bucket to be compressed") + } + + kvStream, err := js.StreamInfo("KV_A") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + + if kvStream.Config.Compression != nats.S2Compression { + t.Fatalf("Expected stream to be compressed with S2") + } +}