diff --git a/go.mod b/go.mod index ecb1792..95a2128 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( cloud.google.com/go/compute/metadata v0.2.3 cloud.google.com/go/secretmanager v1.11.2 + github.com/alicebob/miniredis/v2 v2.34.0 github.com/apex/log v1.9.0 github.com/cenkalti/backoff/v4 v4.1.3 github.com/go-test/deep v1.0.8 @@ -33,12 +34,14 @@ require ( ) require ( + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/evanphx/json-patch v4.9.0+incompatible // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/imdario/mergo v0.3.5 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect golang.org/x/sync v0.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect diff --git a/go.sum b/go.sum index 8ad23ea..be9c9d1 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,10 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0= +github.com/alicebob/miniredis/v2 v2.34.0/go.mod 
h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8=
 github.com/apex/log v1.9.0 h1:FHtw/xuaM8AgmvDDTI9fiwoAL25Sq2cxojnZICUU8l0=
 github.com/apex/log v1.9.0/go.mod h1:m82fZlWIuiWzWP04XCTXmnX0xRkYYbCdYn8jbJeLBEA=
 github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
@@ -360,6 +364,8 @@ github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKw
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
+github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
diff --git a/sketch/countmin.go b/sketch/countmin.go
new file mode 100644
index 0000000..e25930c
--- /dev/null
+++ b/sketch/countmin.go
@@ -0,0 +1,161 @@
+package sketch
+
+import (
+	"context"
+	"hash/fnv"
+	"strconv"
+	"time"
+
+	"github.com/gomodule/redigo/redis"
+)
+
+// CMSketch implements the Sketch interface using Redis as a backend.
+type CMSketch struct {
+	// config holds the sketch dimensions, window length and Redis key prefix.
+	config Config
+	// pool supplies the Redis connections used by Increment and Count.
+	pool *redis.Pool
+	// nowFunc allows overriding time.Now for testing; nil means time.Now.
+	nowFunc func() time.Time
+}
+
+// Compile-time check that *CMSketch satisfies Sketch.
+var _ Sketch = (*CMSketch)(nil)
+
+// New creates a new CMSketch with the given configuration and Redis connection pool.
+// The configuration is stored as given; it is not validated here.
+func New(config Config, pool *redis.Pool) *CMSketch {
+	return &CMSketch{
+		config: config,
+		pool:   pool,
+	}
+}
+
+// hash returns one column index per sketch row (config.Depth in total) for
+// the given item. Row i is hashed with 64-bit FNV-1a (fast, non-cryptographic)
+// over the item bytes followed by the single salt byte byte(i), then reduced
+// modulo config.Width. Width must be non-zero, otherwise the modulo panics;
+// the salt repeats once Depth exceeds 256 rows.
+func (s *CMSketch) hash(item string) []uint64 {
+	hashes := make([]uint64, s.config.Depth)
+	width := uint64(s.config.Width)
+	data := []byte(item) // converted once instead of once per row
+	h := fnv.New64a()
+
+	for i := range hashes {
+		h.Reset()
+		h.Write(data)
+		h.Write([]byte{byte(i)}) // per-row salt
+		hashes[i] = h.Sum64() % width
+	}
+
+	return hashes
+}
+
+// Increment adds 1 to the item's counter in every row of the current window
+// and sets each row key's TTL to twice the window length. All commands go out
+// in one pipeline; every reply is then read so that an error reported by
+// Redis is returned instead of being dropped. ctx is currently not consulted.
+func (s *CMSketch) Increment(ctx context.Context, item string) error {
+	windowKey := s.getCurrentWindowKey()
+	hashes := s.hash(item)
+
+	// EXPIRE with a non-positive timeout deletes the key, so keep at least 1s.
+	ttl := int(s.config.Window.Seconds() * 2)
+	if ttl < 1 {
+		ttl = 1
+	}
+
+	conn := s.pool.Get()
+	defer conn.Close()
+
+	for i, hash := range hashes {
+		key := s.getCounterKey(windowKey, i)
+		if err := conn.Send("HINCRBY", key, strconv.FormatUint(hash, 10), 1); err != nil {
+			return err
+		}
+		if err := conn.Send("EXPIRE", key, ttl); err != nil {
+			return err
+		}
+	}
+	if err := conn.Flush(); err != nil {
+		return err
+	}
+	// Two replies per row: one for HINCRBY, one for EXPIRE.
+	for pending := 2 * len(hashes); pending > 0; pending-- {
+		if _, err := conn.Receive(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Count returns the minimum estimated count for the given item in the current
+// window: the smallest of its per-row counters, with absent counters read as
+// 0. ctx is currently not consulted.
+func (s *CMSketch) Count(ctx context.Context, item string) (int, error) {
+	windowKey := s.getCurrentWindowKey()
+	hashes := s.hash(item)
+
+	conn := s.pool.Get()
+	defer conn.Close()
+
+	for i, hash := range hashes {
+		key := s.getCounterKey(windowKey, i)
+		if err := conn.Send("HGET", key, strconv.FormatUint(hash, 10)); err != nil {
+			return 0, err
+		}
+	}
+	if err := conn.Flush(); err != nil {
+		return 0, err
+	}
+
+	minCount := 0
+	for i := range hashes {
+		value, err := redis.Int(conn.Receive())
+		switch {
+		case err == redis.ErrNil:
+			value = 0 // field absent: not incremented in this window
+		case err != nil:
+			return 0, err
+		}
+		if i == 0 || value < minCount {
+			minCount = value
+		}
+	}
+	return minCount, nil
+}
+
+// Helper methods for key management.
+
+// getCurrentWindowKey returns the key for the current time window: the UTC
+// start of the window containing "now", so every call within one
+// config.Window yields the same key. The layout keeps minute resolution when
+// the window is a whole number of minutes and adds seconds (or nanoseconds)
+// only when the window length requires them.
+func (s *CMSketch) getCurrentWindowKey() string {
+	clock := s.nowFunc
+	if clock == nil {
+		clock = time.Now
+	}
+	now := clock().UTC() // normalize injected clocks as well
+
+	layout := "2006-01-02T15:04"
+	if w := s.config.Window; w > 0 {
+		now = now.Truncate(w)
+		switch {
+		case w%time.Minute == 0:
+			// minute layout already identifies the window
+		case w%time.Second == 0:
+			layout = "2006-01-02T15:04:05"
+		default:
+			layout = "2006-01-02T15:04:05.000000000"
+		}
+	}
+	return now.Format(layout)
+}
+
+// getCounterKey returns the key for the counter at the given depth for the given window.
+func (s *CMSketch) getCounterKey(windowKey string, depth int) string {
+	return s.config.RedisKeyPrefix + ":" + windowKey + ":" + strconv.Itoa(depth)
+}
diff --git a/sketch/countmin_test.go b/sketch/countmin_test.go
new file mode 100644
index 0000000..5ae7695
--- /dev/null
+++ b/sketch/countmin_test.go
@@ -0,0 +1,153 @@
+package sketch
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/alicebob/miniredis/v2"
+	"github.com/gomodule/redigo/redis"
+)
+
+// testNow is the fixed instant the tests use as "now". Pinning the clock keeps
+// an Increment and the Count that follows it in the same window even when the
+// wall clock crosses a minute boundary while the test runs.
+var testNow = time.Date(2024, time.January, 1, 12, 0, 0, 0, time.UTC)
+
+// setupTestRedis starts an in-memory miniredis server and returns a pool that
+// dials it, together with a cleanup function that closes both.
+func setupTestRedis(t *testing.T) (*redis.Pool, func()) {
+	t.Helper()
+
+	mr, err := miniredis.Run()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pool := &redis.Pool{
+		MaxIdle:     1,
+		IdleTimeout: 10 * time.Second,
+		Dial: func() (redis.Conn, error) {
+			return redis.Dial("tcp", mr.Addr())
+		},
+	}
+
+	return pool, func() {
+		pool.Close()
+		mr.Close()
+	}
+}
+
+// newTestSketch builds a CMSketch on pool with the configuration shared by all
+// tests and a clock fixed at testNow.
+func newTestSketch(pool *redis.Pool) *CMSketch {
+	s := New(Config{
+		Width:          1000,
+		Depth:          3,
+		Window:         time.Minute,
+		RedisKeyPrefix: "test",
+	}, pool)
+	s.nowFunc = func() time.Time { return testNow }
+	return s
+}
+
+func TestCMSketch_SingleItem(t *testing.T) {
+	ctx := context.Background()
+	pool, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	s := newTestSketch(pool)
+
+	// Test single increment and count
+	if err := s.Increment(ctx, "item1"); err != nil {
+		t.Fatalf("Failed to increment: %v", err)
+	}
+
+	count, err := s.Count(ctx, "item1")
+	if err != nil {
+		t.Fatalf("Failed to count: %v", err)
+	}
+	if count != 1 {
+		t.Errorf("Expected count 1, got %d", count)
+	}
+}
+
+func TestCMSketch_MultipleIncrements(t *testing.T) {
+	ctx := context.Background()
+	pool, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	s := newTestSketch(pool)
+
+	// Increment same item multiple times
+	expectedCount := 5
+	for i := 0; i < expectedCount; i++ {
+		if err := s.Increment(ctx, "item1"); err != nil {
+			t.Fatalf("Failed to increment on iteration %d: %v", i, err)
+		}
+	}
+
+	// Verify count
+	count, err := s.Count(ctx, "item1")
+	if err != nil {
+		t.Fatalf("Failed to count: %v", err)
+	}
+	if count != expectedCount {
+		t.Errorf("Expected count %d, got %d", expectedCount, count)
+	}
+}
+
+func TestCMSketch_TimeWindow(t *testing.T) {
+	ctx := context.Background()
+	pool, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	s := newTestSketch(pool)
+
+	// Increment in current window
+	if err := s.Increment(ctx, "item1"); err != nil {
+		t.Fatalf("Failed to increment: %v", err)
+	}
+
+	// Verify count in current window
+	count, err := s.Count(ctx, "item1")
+	if err != nil {
+		t.Fatalf("Failed to count: %v", err)
+	}
+	if count != 1 {
+		t.Errorf("Expected count 1 in current window, got %d", count)
+	}
+
+	// Move to next time window
+	s.nowFunc = func() time.Time { return testNow.Add(time.Minute) }
+
+	// Count should be 0 in new window
+	count, err = s.Count(ctx, "item1")
+	if err != nil {
+		t.Fatalf("Failed to count in second window: %v", err)
+	}
+	if count != 0 {
+		t.Errorf("Expected count 0 in new window, got %d", count)
+	}
+}
+
+func TestCMSketch_RedisErrors(t *testing.T) {
+	ctx := context.Background()
+	pool, cleanup := setupTestRedis(t)
+
+	s := newTestSketch(pool)
+
+	// Force Redis connection failure by closing the pool and stopping the
+	// server. This is the only cleanup call, so nothing is closed twice.
+	cleanup()
+
+	// Test Increment with broken Redis
+	if err := s.Increment(ctx, "item1"); err == nil {
+		t.Error("Expected error on Increment with broken Redis connection, got nil")
+	}
+
+	// Test Count with broken Redis
+	if _, err := s.Count(ctx, "item1"); err == nil {
+		t.Error("Expected error on Count with broken Redis connection, got nil")
+	}
+}
diff --git a/sketch/interface.go b/sketch/interface.go
new file mode 100644
index 0000000..ac86858
--- /dev/null
+++ b/sketch/interface.go
@@ -0,0 +1,32 @@
+package sketch
+
+import (
+	"context"
+	"time"
+)
+
+// Sketch represents a Redis-backed Count-Min Sketch for counting events.
+type Sketch interface {
+	// Increment adds 1 to the counters for the given item.
+	Increment(ctx context.Context, item string) error
+
+	// Count returns the estimated count for the given item.
+	Count(ctx context.Context, item string) (int, error)
+}
+
+// Config holds configuration for the Count-Min Sketch.
+type Config struct {
+	// Width is the number of counters per hash function. It must be non-zero,
+	// because counter positions are computed modulo Width.
+	Width int
+
+	// Depth is the number of hash functions.
+	Depth int
+
+	// Window is the duration of the counting window. Counter keys are given a
+	// TTL of twice this duration.
+	Window time.Duration
+
+	// RedisKeyPrefix is the prefix for all Redis keys used by this sketch.
+	RedisKeyPrefix string
+}