Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sketch package #210

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
cloud.google.com/go/compute/metadata v0.2.3
cloud.google.com/go/secretmanager v1.11.2
github.com/alicebob/miniredis/v2 v2.34.0
github.com/apex/log v1.9.0
github.com/cenkalti/backoff/v4 v4.1.3
github.com/go-test/deep v1.0.8
Expand Down Expand Up @@ -33,12 +34,14 @@ require (
)

require (
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/imdario/mergo v0.3.5 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/sync v0.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0=
github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8=
github.com/apex/log v1.9.0 h1:FHtw/xuaM8AgmvDDTI9fiwoAL25Sq2cxojnZICUU8l0=
github.com/apex/log v1.9.0/go.mod h1:m82fZlWIuiWzWP04XCTXmnX0xRkYYbCdYn8jbJeLBEA=
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
Expand Down Expand Up @@ -360,6 +364,8 @@ github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKw
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
126 changes: 126 additions & 0 deletions sketch/countmin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package sketch

import (
"context"
"hash/fnv"
"strconv"
"time"

"github.com/gomodule/redigo/redis"
)

// CMSketch implements the Sketch interface using Redis as a backend
type CMSketch struct {
config Config
pool *redis.Pool
// nowFunc allows overriding time.Now for testing
nowFunc func() time.Time
}

// New creates a new CMSketch with the given configuration and Redis connection pool.
func New(config Config, pool *redis.Pool) *CMSketch {
return &CMSketch{
config: config,
pool: pool,
}
}

// hash generates k different hash values for the given item.
// To generate the different hash values (one per row), we apply the FNV-1a hash
// function, chosen for its balance of speed and collision resistance in
// non-cryptographic applications.
func (s *CMSketch) hash(item string) []uint64 {
hashes := make([]uint64, s.config.Depth)
h := fnv.New64a()

for i := 0; i < s.config.Depth; i++ {
h.Reset()
h.Write([]byte(item))
h.Write([]byte{byte(i)}) // Add salt for different hashes
hashes[i] = h.Sum64() % uint64(s.config.Width)
}

return hashes
}

func (s *CMSketch) Increment(ctx context.Context, item string) error {
// Get current time window key
windowKey := s.getCurrentWindowKey()

// Get hash values for item
hashes := s.hash(item)

// Get connection from pool
conn := s.pool.Get()
defer conn.Close()

// Send all commands to pipeline
for i, hash := range hashes {
key := s.getCounterKey(windowKey, i)
hashStr := strconv.FormatUint(hash, 10)

conn.Send("HINCRBY", key, hashStr, 1)
conn.Send("EXPIRE", key, int(s.config.Window.Seconds()*2))
}

// Execute pipeline
return conn.Flush()
}

// Count returns the minimum estimated count for the given item.
func (s *CMSketch) Count(ctx context.Context, item string) (int, error) {
windowKey := s.getCurrentWindowKey()
hashes := s.hash(item)

conn := s.pool.Get()
defer conn.Close()

var minCount int = -1

// Send all HGET commands to pipeline
for i, hash := range hashes {
key := s.getCounterKey(windowKey, i)
hashStr := strconv.FormatUint(hash, 10)
conn.Send("HGET", key, hashStr)
}

// Execute pipeline
if err := conn.Flush(); err != nil {
return 0, err
}

// Receive all responses
for range hashes {
value, err := redis.Int(conn.Receive())
if err == redis.ErrNil {
value = 0
} else if err != nil {
return 0, err
}

if minCount == -1 || value < minCount {
minCount = value
}
}

if minCount == -1 {
minCount = 0
}

return minCount, nil
}

// Helper methods for key management
// getCurrentWindowKey returns the key for the current time window.
func (s *CMSketch) getCurrentWindowKey() string {
now := time.Now().UTC()
if s.nowFunc != nil {
now = s.nowFunc()
}
return now.Format("2006-01-02T15:04")
}

// getCounterKey returns the key for the counter at the given depth for the given window.
func (s *CMSketch) getCounterKey(windowKey string, depth int) string {
return s.config.RedisKeyPrefix + ":" + windowKey + ":" + strconv.Itoa(depth)
}
170 changes: 170 additions & 0 deletions sketch/countmin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package sketch

import (
"context"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/gomodule/redigo/redis"
)

func setupTestRedis(t *testing.T) (*redis.Pool, func()) {
mr, err := miniredis.Run()
if err != nil {
t.Fatal(err)
}

pool := &redis.Pool{
MaxIdle: 1,
IdleTimeout: 10 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", mr.Addr())
},
}

return pool, func() {
pool.Close()
mr.Close()
}
}

func TestCMSketch_SingleItem(t *testing.T) {
ctx := context.Background()
client, cleanup := setupTestRedis(t)
defer cleanup()

config := Config{
Width: 1000,
Depth: 3,
Window: time.Minute,
RedisKeyPrefix: "test",
}

sketch := New(config, client)

// Test single increment and count
err := sketch.Increment(ctx, "item1")
if err != nil {
t.Fatalf("Failed to increment: %v", err)
}

count, err := sketch.Count(ctx, "item1")
if err != nil {
t.Fatalf("Failed to count: %v", err)
}

if count != 1 {
t.Errorf("Expected count 1, got %d", count)
}
}

func TestCMSketch_MultipleIncrements(t *testing.T) {
ctx := context.Background()
client, cleanup := setupTestRedis(t)
defer cleanup()

config := Config{
Width: 1000,
Depth: 3,
Window: time.Minute,
RedisKeyPrefix: "test",
}

sketch := New(config, client)

// Increment same item multiple times
expectedCount := 5
for i := 0; i < expectedCount; i++ {
err := sketch.Increment(ctx, "item1")
if err != nil {
t.Fatalf("Failed to increment on iteration %d: %v", i, err)
}
}

// Verify count
count, err := sketch.Count(ctx, "item1")
if err != nil {
t.Fatalf("Failed to count: %v", err)
}

if count != expectedCount {
t.Errorf("Expected count %d, got %d", expectedCount, count)
}
}

func TestCMSketch_TimeWindow(t *testing.T) {
ctx := context.Background()
client, cleanup := setupTestRedis(t)
defer cleanup()

baseTime := time.Now().UTC()
config := Config{
Width: 1000,
Depth: 3,
Window: time.Minute,
RedisKeyPrefix: "test",
}

sketch := New(config, client)

// Increment in current window
err := sketch.Increment(ctx, "item1")
if err != nil {
t.Fatalf("Failed to increment: %v", err)
}

// Verify count in current window
count, err := sketch.Count(ctx, "item1")
if err != nil {
t.Fatalf("Failed to count: %v", err)
}

if count != 1 {
t.Errorf("Expected count 1 in current window, got %d", count)
}

// Move to next time window
sketch.nowFunc = func() time.Time {
return baseTime.Add(time.Minute)
}

// Count should be 0 in new window
count, err = sketch.Count(ctx, "item1")
if err != nil {
t.Fatalf("Failed to count in second window: %v", err)
}
if count != 0 {
t.Errorf("Expected count 0 in new window, got %d", count)
}
}

func TestCMSketch_RedisErrors(t *testing.T) {
ctx := context.Background()
pool, cleanup := setupTestRedis(t)
defer cleanup()

config := Config{
Width: 1000,
Depth: 3,
Window: time.Minute,
RedisKeyPrefix: "test",
}

sketch := New(config, pool)

// Force Redis connection failure by closing the pool.
cleanup()

// Test Increment with broken Redis
err := sketch.Increment(ctx, "item1")
if err == nil {
t.Error("Expected error on Increment with broken Redis connection, got nil")
}

// Test Count with broken Redis
_, err = sketch.Count(ctx, "item1")
if err == nil {
t.Error("Expected error on Count with broken Redis connection, got nil")
}
}
30 changes: 30 additions & 0 deletions sketch/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sketch

import (
"context"
"time"
)

// Sketch represents a Redis-backed Count-Min Sketch for counting events
type Sketch interface {
// Increment adds 1 to the counters for the given item.
Increment(ctx context.Context, item string) error

// Count returns the estimated count for the given item.
Count(ctx context.Context, item string) (int, error)
}

// Config holds configuration for the Count-Min Sketch
type Config struct {
// Width is the number of counters per hash function
Width int

// Depth is the number of hash functions
Depth int

// Window is the duration of the counting window
Window time.Duration

// RedisKeyPrefix is the prefix for all Redis keys used by this sketch
RedisKeyPrefix string
}