diff --git a/chunk/chunk_store.go b/chunk/chunk_store.go index 7c71626192..548008f4ec 100644 --- a/chunk/chunk_store.go +++ b/chunk/chunk_store.go @@ -7,7 +7,6 @@ import ( "sort" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/s3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" @@ -20,17 +19,11 @@ import ( "github.com/weaveworks/cortex/util" ) -const ( - hashKey = "h" - rangeKey = "r" - chunkKey = "c" -) - var ( indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_index_entries_per_chunk", - Help: "Number of entries written to dynamodb per chunk.", + Help: "Number of entries written to storage per chunk.", Buckets: prometheus.ExponentialBuckets(1, 2, 5), }) s3RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ @@ -43,7 +36,7 @@ var ( HistogramOpts: prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_row_writes_distribution", - Help: "Distribution of writes to individual DynamoDB rows", + Help: "Distribution of writes to individual storage rows", Buckets: prometheus.DefBuckets, }, HashBuckets: 1024, @@ -56,18 +49,17 @@ func init() { prometheus.MustRegister(rowWrites) } -// Store type stores and indexes chunks -type Store interface { - Put(ctx context.Context, chunks []Chunk) error - Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]Chunk, error) -} - // StoreConfig specifies config for a ChunkStore type StoreConfig struct { SchemaConfig CacheConfig - S3 S3ClientValue - DynamoDB DynamoDBClientValue + S3 util.URLValue + DynamoDB util.URLValue + + mockS3 S3Client + mockBucketName string + mockDynamoDB StorageClient + mockTableName string // For injecting different schemas in tests. schemaFactory func(cfg SchemaConfig) Schema @@ -82,17 +74,39 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL.") } -// AWSStore implements ChunkStore for AWS -type AWSStore struct { - cfg StoreConfig - cache *Cache - dynamo *dynamoDBBackoffClient - schema Schema +// Store implements Store +type Store struct { + cfg StoreConfig + + storage StorageClient + tableName string + s3 S3Client + bucketName string + cache *Cache + schema Schema } -// NewAWSStore makes a new ChunkStore -func NewAWSStore(cfg StoreConfig) (*AWSStore, error) { - cfg.SchemaConfig.OriginalTableName = cfg.DynamoDB.TableName +// NewStore makes a new ChunkStore +func NewStore(cfg StoreConfig) (*Store, error) { + dynamoDBClient, tableName := cfg.mockDynamoDB, cfg.mockTableName + if dynamoDBClient == nil { + var err error + dynamoDBClient, tableName, err = NewDynamoDBClient(cfg.DynamoDB.String()) + if err != nil { + return nil, err + } + } + + s3Client, bucketName := cfg.mockS3, cfg.mockBucketName + if s3Client == nil { + var err error + s3Client, bucketName, err = NewS3Client(cfg.S3.String()) + if err != nil { + return nil, err + } + } + + cfg.SchemaConfig.OriginalTableName = tableName var schema Schema var err error if cfg.schemaFactory == nil { @@ -104,11 +118,14 @@ func NewAWSStore(cfg StoreConfig) (*AWSStore, error) { return nil, err } - return &AWSStore{ - cfg: cfg, - schema: schema, - cache: NewCache(cfg.CacheConfig), - dynamo: newDynamoDBBackoffClient(cfg.DynamoDB), + return &Store{ + cfg: cfg, + storage: dynamoDBClient, + tableName: tableName, + s3: s3Client, + bucketName: bucketName, + schema: schema, + cache: NewCache(cfg.CacheConfig), }, nil } @@ -117,7 +134,7 @@ func chunkName(userID, chunkID string) string { } // Put implements ChunkStore -func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error { +func (c *Store) Put(ctx context.Context, chunks []Chunk) error { userID, err := user.GetID(ctx) if err != nil { return err @@ -132,7 +149,7 @@ func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error { } // putChunks writes a collection of chunks to S3 in parallel. -func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk) error { +func (c *Store) putChunks(ctx context.Context, userID string, chunks []Chunk) error { incomingErrors := make(chan error) for _, chunk := range chunks { go func(chunk Chunk) { @@ -151,7 +168,7 @@ func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk) } // putChunk puts a chunk into S3. -func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) error { +func (c *Store) putChunk(ctx context.Context, userID string, chunk *Chunk) error { body, err := chunk.reader() if err != nil { return err @@ -159,9 +176,9 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er err = instrument.TimeRequestHistogram(ctx, "S3.PutObject", s3RequestDuration, func(_ context.Context) error { var err error - _, err = c.cfg.S3.PutObject(&s3.PutObjectInput{ + _, err = c.s3.PutObject(&s3.PutObjectInput{ Body: body, - Bucket: aws.String(c.cfg.S3.BucketName), + Bucket: aws.String(c.bucketName), Key: aws.String(chunkName(userID, chunk.ID)), }) return err @@ -176,19 +193,19 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er return nil } -func (c *AWSStore) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { +func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { writeReqs, err := c.calculateDynamoWrites(userID, chunks) if err != nil { return err } - return c.dynamo.batchWriteDynamo(ctx, writeReqs) + return c.storage.BatchWrite(ctx, writeReqs) } -// calculateDynamoWrites creates a set of WriteRequests to dynamo for all `the -// chunks it is given. -func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[string][]*dynamodb.WriteRequest, error) { - writeReqs := map[string][]*dynamodb.WriteRequest{} +// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all +// the chunks it is given. +func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { + writeReqs := c.storage.NewWriteBatch() for _, chunk := range chunks { metricName, err := util.ExtractMetricNameFromMetric(chunk.Metric) if err != nil { @@ -203,22 +220,14 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[str for _, entry := range entries { rowWrites.Observe(entry.HashKey, 1) - - writeReqs[entry.TableName] = append(writeReqs[entry.TableName], &dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - hashKey: {S: aws.String(entry.HashKey)}, - rangeKey: {B: entry.RangeKey}, - }, - }, - }) + writeReqs.Add(entry.TableName, entry.HashKey, entry.RangeKey) } } return writeReqs, nil } // Get implements ChunkStore -func (c *AWSStore) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) { +func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) { userID, err := user.GetID(ctx) if err != nil { return nil, err @@ -226,8 +235,8 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, allMatcher filters, matchers := util.SplitFiltersAndMatchers(allMatchers) - // Fetch chunk descriptors (just ID really) from DynamoDB - chunks, err := c.lookupChunks(ctx, userID, from, through, matchers) + // Fetch chunk descriptors (just ID really) from storage + chunks, err := c.lookupMatchers(ctx, userID, from, through, matchers) if err != nil { return nil, err } @@ -281,7 +290,7 @@ outer: return filteredChunks, nil } -func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) { +func (c *Store) lookupMatchers(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) { metricName, matchers, err := util.ExtractMetricNameFromMatchers(matchers) if err != nil { return nil, err @@ -333,7 +342,7 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug return nWayIntersect(chunkSets), lastErr } -func (c *AWSStore) lookupEntries(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByID, error) { +func (c *Store) lookupEntries(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByID, error) { incomingChunkSets := make(chan ByID) incomingErrors := make(chan error) for _, entry := range entries { @@ -361,38 +370,17 @@ func (c *AWSStore) lookupEntries(ctx context.Context, entries []IndexEntry, matc return chunks, lastErr } -func (c *AWSStore) lookupEntry(ctx context.Context, entry IndexEntry, matcher *metric.LabelMatcher) (ByID, error) { - input := &dynamodb.QueryInput{ - TableName: aws.String(entry.TableName), - KeyConditions: map[string]*dynamodb.Condition{ - hashKey: { - AttributeValueList: []*dynamodb.AttributeValue{ - {S: aws.String(entry.HashKey)}, - }, - ComparisonOperator: aws.String("EQ"), - }, - }, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - } - if len(entry.RangeKey) > 0 { - input.KeyConditions[rangeKey] = &dynamodb.Condition{ - AttributeValueList: []*dynamodb.AttributeValue{ - {B: entry.RangeKey}, - }, - ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith), - } - } - +func (c *Store) lookupEntry(ctx context.Context, entry IndexEntry, matcher *metric.LabelMatcher) (ByID, error) { var chunkSet ByID var processingError error - if err := c.dynamo.queryPages(ctx, input, func(resp interface{}, lastPage bool) (shouldContinue bool) { - processingError = processResponse(resp.(*dynamodb.QueryOutput), &chunkSet, matcher) + if err := c.storage.QueryPages(ctx, entry.TableName, entry.HashKey, entry.RangeKey, func(resp ReadBatch, lastPage bool) (shouldContinue bool) { + processingError = processResponse(resp, &chunkSet, matcher) return processingError != nil && !lastPage }); err != nil { - log.Errorf("Error querying DynamoDB: %v", err) + log.Errorf("Error querying storage: %v", err) return nil, err } else if processingError != nil { - log.Errorf("Error processing DynamoDB response: %v", processingError) + log.Errorf("Error processing storage response: %v", processingError) return nil, processingError } sort.Sort(ByID(chunkSet)) @@ -400,11 +388,11 @@ func (c *AWSStore) lookupEntry(ctx context.Context, entry IndexEntry, matcher *m return chunkSet, nil } -func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric.LabelMatcher) error { - for _, item := range resp.Items { - rangeValue := item[rangeKey].B +func processResponse(resp ReadBatch, chunkSet *ByID, matcher *metric.LabelMatcher) error { + for i := 0; i < resp.Len(); i++ { + rangeValue := resp.RangeValue(i) if rangeValue == nil { - return fmt.Errorf("invalid item: %v", item) + return fmt.Errorf("invalid item: %d", i) } _, value, chunkID, err := parseRangeValue(rangeValue) if err != nil { @@ -415,8 +403,8 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric ID: chunkID, } - if chunkValue, ok := item[chunkKey]; ok && chunkValue.B != nil { - if err := json.Unmarshal(chunkValue.B, &chunk); err != nil { + if value := resp.Value(i); value != nil { + if err := json.Unmarshal(value, &chunk); err != nil { return err } chunk.metadataInIndex = true @@ -431,7 +419,7 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric return nil } -func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet []Chunk) ([]Chunk, error) { +func (c *Store) fetchChunkData(ctx context.Context, userID string, chunkSet []Chunk) ([]Chunk, error) { incomingChunks := make(chan Chunk) incomingErrors := make(chan error) for _, chunk := range chunkSet { @@ -439,8 +427,8 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [ var resp *s3.GetObjectOutput err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(_ context.Context) error { var err error - resp, err = c.cfg.S3.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(c.cfg.S3.BucketName), + resp, err = c.s3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(c.bucketName), Key: aws.String(chunkName(userID, chunk.ID)), }) return err diff --git a/chunk/chunk_store_test.go b/chunk/chunk_store_test.go index a596a440ee..7aa8a83aff 100644 --- a/chunk/chunk_store_test.go +++ b/chunk/chunk_store_test.go @@ -16,11 +16,9 @@ import ( "github.com/weaveworks/common/user" ) -func setupDynamodb(t *testing.T, dynamoDB DynamoDBClient) { +func setupDynamodb(t *testing.T, dynamoDB StorageClient) { tableManager, err := NewDynamoTableManager(TableManagerConfig{ - DynamoDB: DynamoDBClientValue{ - DynamoDBClient: dynamoDB, - }, + mockDynamoDB: dynamoDB, }) if err != nil { t.Fatal(err) @@ -30,48 +28,6 @@ func setupDynamodb(t *testing.T, dynamoDB DynamoDBClient) { } } -func TestChunkStoreUnprocessed(t *testing.T) { - dynamoDB := NewMockDynamoDB(2, 2) - setupDynamodb(t, dynamoDB) - store, err := NewAWSStore(StoreConfig{ - DynamoDB: DynamoDBClientValue{ - DynamoDBClient: dynamoDB, - }, - S3: S3ClientValue{ - S3Client: NewMockS3(), - }, - }) - if err != nil { - t.Fatal(err) - } - - ctx := user.WithID(context.Background(), "0") - now := model.Now() - chunks, _ := chunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) - chunk := NewChunk( - model.Fingerprint(1), - model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - }, - chunks[0], - now.Add(-time.Hour), - now, - ) - want := []Chunk{chunk} - if err := store.Put(ctx, want); err != nil { - t.Fatal(err) - } - have, err := store.Get(ctx, now.Add(-time.Hour), now, mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo")) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(want, have) { - t.Fatalf("wrong chunks - %s", test.Diff(want, have)) - } -} - func TestChunkStore(t *testing.T) { ctx := user.WithID(context.Background(), "0") now := model.Now() @@ -165,15 +121,11 @@ func TestChunkStore(t *testing.T) { for _, schema := range schemas { t.Run(fmt.Sprintf("%s/%s", tc.name, schema.name), func(t *testing.T) { log.Infoln("========= Running test", tc.name, "with schema", schema.name) - dynamoDB := NewMockDynamoDB(0, 0) + dynamoDB := NewMockStorage() setupDynamodb(t, dynamoDB) - store, err := NewAWSStore(StoreConfig{ - DynamoDB: DynamoDBClientValue{ - DynamoDBClient: dynamoDB, - }, - S3: S3ClientValue{ - S3Client: NewMockS3(), - }, + store, err := NewStore(StoreConfig{ + mockDynamoDB: dynamoDB, + mockS3: NewMockS3(), schemaFactory: schema.fn, }) if err != nil { diff --git a/chunk/dynamodb_client.go b/chunk/dynamodb_client.go index d6c219b92d..129c391c49 100644 --- a/chunk/dynamodb_client.go +++ b/chunk/dynamodb_client.go @@ -9,7 +9,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/prometheus/client_golang/prometheus" @@ -18,11 +17,17 @@ import ( ) const ( + hashKey = "h" + rangeKey = "r" + chunkKey = "c" + // For dynamodb errors tableNameLabel = "table" errorReasonLabel = "error" otherError = "other" + provisionedThroughputExceededException = "ProvisionedThroughputExceededException" + // Backoff for dynamoDB requests, to match AWS lib - see: // https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go minBackoff = 50 * time.Millisecond @@ -31,8 +36,6 @@ const ( // See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html. dynamoMaxBatchSize = 25 - - provisionedThroughputExceededException = "ProvisionedThroughputExceededException" ) var ( @@ -69,36 +72,12 @@ func init() { prometheus.MustRegister(dynamoUnprocessedItems) } -func recordDynamoError(tableName string, err error) { - if awsErr, ok := err.(awserr.Error); ok { - dynamoFailures.WithLabelValues(tableName, awsErr.Code()).Add(float64(1)) - } else { - dynamoFailures.WithLabelValues(tableName, otherError).Add(float64(1)) - } -} - -// DynamoDBClient is a client for DynamoDB -type DynamoDBClient interface { - ListTablesPages(*dynamodb.ListTablesInput, func(p *dynamodb.ListTablesOutput, lastPage bool) (shouldContinue bool)) error - CreateTable(*dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) - DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error) - UpdateTable(*dynamodb.UpdateTableInput) (*dynamodb.UpdateTableOutput, error) - - BatchWriteItem(*dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) - QueryRequest(*dynamodb.QueryInput) (req dynamoRequest, output *dynamodb.QueryOutput) -} - -type dynamoRequest interface { - NextPage() dynamoRequest - HasNextPage() bool - Data() interface{} - OperationName() string - Send() error - Error() error +type dynamoClientAdapter struct { + *dynamodb.DynamoDB } // NewDynamoDBClient makes a new DynamoDBClient -func NewDynamoDBClient(dynamoDBURL string) (DynamoDBClient, string, error) { +func NewDynamoDBClient(dynamoDBURL string) (StorageClient, string, error) { url, err := url.Parse(dynamoDBURL) if err != nil { return nil, "", err @@ -114,126 +93,24 @@ func NewDynamoDBClient(dynamoDBURL string) (DynamoDBClient, string, error) { return dynamoDBClient, tableName, nil } -// DynamoDBClientValue is a flag.Value that parses a URL and produces a DynamoDBClient -type DynamoDBClientValue struct { - url, TableName string - DynamoDBClient -} - -// String implements flag.Value -func (c *DynamoDBClientValue) String() string { - return c.url -} - -// Set implements flag.Value -func (c *DynamoDBClientValue) Set(v string) error { - var err error - c.DynamoDBClient, c.TableName, err = NewDynamoDBClient(v) - return err +func (d dynamoClientAdapter) NewWriteBatch() WriteBatch { + return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) } -type dynamoClientAdapter struct { - *dynamodb.DynamoDB -} - -func (d dynamoClientAdapter) QueryRequest(in *dynamodb.QueryInput) (dynamoRequest, *dynamodb.QueryOutput) { - req, out := d.DynamoDB.QueryRequest(in) - return dynamoRequestAdapter{req}, out -} - -type dynamoRequestAdapter struct { - *request.Request -} - -func (d dynamoRequestAdapter) Data() interface{} { - return d.Request.Data -} - -func (d dynamoRequestAdapter) OperationName() string { - return d.Operation.Name -} - -func (d dynamoRequestAdapter) NextPage() dynamoRequest { - if r := d.Request.NextPage(); r != nil { - return dynamoRequestAdapter{r} - } - return nil -} - -func (d dynamoRequestAdapter) Error() error { - return d.Request.Error -} - -type dynamoDBBackoffClient struct { - client DynamoDBClient -} - -func newDynamoDBBackoffClient(client DynamoDBClient) *dynamoDBBackoffClient { - return &dynamoDBBackoffClient{ - client: client, - } -} - -// batchWriteDynamo writes many requests to dynamo in a single batch. -func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[string][]*dynamodb.WriteRequest) error { - min := func(i, j int) int { - if i < j { - return i - } - return j - } - - dictLen := func(in map[string][]*dynamodb.WriteRequest) int { - result := 0 - for _, reqs := range in { - result += len(reqs) - } - return result - } - - // Fill 'out' with WriteRequests from 'in' until it 'out' has at most dynamoMaxBatchSize requests. Remove those requests from 'in'. - fillReq := func(in map[string][]*dynamodb.WriteRequest, out map[string][]*dynamodb.WriteRequest) { - outLen, inLen := dictLen(out), dictLen(in) - toFill := min(inLen, dynamoMaxBatchSize-outLen) - for toFill > 0 { - for tableName := range in { - reqs := in[tableName] - taken := min(len(reqs), toFill) - if taken > 0 { - out[tableName] = append(out[tableName], reqs[:taken]...) - in[tableName] = reqs[taken:] - toFill -= taken - } - } - } - } - - copyUnprocessed := func(in map[string][]*dynamodb.WriteRequest, out map[string][]*dynamodb.WriteRequest) { - for tableName, unprocessReqs := range in { - out[tableName] = append(out[tableName], unprocessReqs...) - dynamoUnprocessedItems.Add(float64(len(unprocessReqs))) - } - } - - tableNames := func(reqs map[string][]*dynamodb.WriteRequest) []string { - result := []string{} - for tableName := range reqs { - result = append(result, tableName) - } - return result - } - - outstanding, unprocessed := reqs, map[string][]*dynamodb.WriteRequest{} +// batchWrite writes requests to the underlying storage, handling retires and backoff. +func (d dynamoClientAdapter) BatchWrite(ctx context.Context, input WriteBatch) error { + outstanding := input.(dynamoDBWriteBatch) + unprocessed := map[string][]*dynamodb.WriteRequest{} backoff, numRetries := minBackoff, 0 for dictLen(outstanding)+dictLen(unprocessed) > 0 && numRetries < maxRetries { reqs := map[string][]*dynamodb.WriteRequest{} - fillReq(unprocessed, reqs) - fillReq(outstanding, reqs) + takeReqs(unprocessed, reqs, dynamoMaxBatchSize) + takeReqs(outstanding, reqs, dynamoMaxBatchSize) var resp *dynamodb.BatchWriteItemOutput err := instrument.TimeRequestHistogram(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error { var err error - resp, err = c.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{ + resp, err = d.DynamoDB.BatchWriteItem(&dynamodb.BatchWriteItemInput{ RequestItems: reqs, ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), }) @@ -245,14 +122,14 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s } if err != nil { - for _, tableName := range tableNames(reqs) { + for tableName := range reqs { recordDynamoError(tableName, err) } } // If there are unprocessed items, backoff and retry those items. - if resp.UnprocessedItems != nil && dictLen(resp.UnprocessedItems) > 0 { - copyUnprocessed(resp.UnprocessedItems, unprocessed) + if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dictLen(unprocessedItems) > 0 { + takeReqs(unprocessedItems, unprocessed, -1) time.Sleep(backoff) backoff = nextBackoff(backoff) continue @@ -261,7 +138,7 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s // If we get provisionedThroughputExceededException, then no items were processed, // so back off and retry all. if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException { - copyUnprocessed(reqs, unprocessed) + takeReqs(reqs, unprocessed, -1) time.Sleep(backoff) backoff = nextBackoff(backoff) numRetries++ @@ -283,16 +160,36 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s return nil } -func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error { - request, _ := c.client.QueryRequest(input) - backoff := minBackoff +func (d dynamoClientAdapter) QueryPages(ctx context.Context, tableName, hashValue string, rangePrefix []byte, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { + input := &dynamodb.QueryInput{ + TableName: aws.String(tableName), + KeyConditions: map[string]*dynamodb.Condition{ + hashKey: { + AttributeValueList: []*dynamodb.AttributeValue{ + {S: aws.String(hashValue)}, + }, + ComparisonOperator: aws.String("EQ"), + }, + }, + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + } + if len(rangePrefix) > 0 { + input.KeyConditions[rangeKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: rangePrefix}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith), + } + } + request, _ := d.DynamoDB.QueryRequest(input) + backoff := minBackoff for page := request; page != nil; page = page.NextPage() { err := instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error { return page.Send() }) - if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil { + if cc := page.Data.(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil { dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages"). Add(float64(*cc.CapacityUnits)) } @@ -306,11 +203,12 @@ func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb. continue } - return page.Error() + return page.Error } - if getNextPage := callback(page.Data(), !page.HasNextPage()); !getNextPage { - return page.Error() + queryOutput := page.Data.(*dynamodb.QueryOutput) + if getNextPage := callback(dynamoDBReadBatch(queryOutput.Items), !page.HasNextPage()); !getNextPage { + return page.Error } backoff = minBackoff @@ -319,6 +217,104 @@ func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb. return nil } +func (d dynamoClientAdapter) ListTables() ([]string, error) { + table := []string{} + if err := d.DynamoDB.ListTablesPages(&dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { + for _, s := range resp.TableNames { + table = append(table, *s) + } + return true + }); err != nil { + return nil, err + } + return table, nil +} + +func (d dynamoClientAdapter) CreateTable(name string, readCapacity, writeCapacity int64) error { + input := &dynamodb.CreateTableInput{ + TableName: aws.String(name), + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String(hashKey), + AttributeType: aws.String(dynamodb.ScalarAttributeTypeS), + }, + { + AttributeName: aws.String(rangeKey), + AttributeType: aws.String(dynamodb.ScalarAttributeTypeB), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String(hashKey), + KeyType: aws.String(dynamodb.KeyTypeHash), + }, + { + AttributeName: aws.String(rangeKey), + KeyType: aws.String(dynamodb.KeyTypeRange), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacity), + WriteCapacityUnits: aws.Int64(writeCapacity), + }, + } + _, err := d.DynamoDB.CreateTable(input) + return err +} + +func (d dynamoClientAdapter) DescribeTable(name string) (readCapacity, writeCapacity int64, status string, err error) { + out, err := d.DynamoDB.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(name), + }) + if err != nil { + return 0, 0, "", err + } + + return *out.Table.ProvisionedThroughput.ReadCapacityUnits, *out.Table.ProvisionedThroughput.WriteCapacityUnits, *out.Table.TableStatus, nil +} + +func (d dynamoClientAdapter) UpdateTable(name string, readCapacity, writeCapacity int64) error { + _, err := d.DynamoDB.UpdateTable(&dynamodb.UpdateTableInput{ + TableName: aws.String(name), + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacity), + WriteCapacityUnits: aws.Int64(writeCapacity), + }, + }) + return err +} + +type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest + +func (b dynamoDBWriteBatch) Add(tableName, hashValue string, rangeValue []byte) { + b[tableName] = append(b[tableName], &dynamodb.WriteRequest{ + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + hashKey: {S: aws.String(hashValue)}, + rangeKey: {B: rangeValue}, + }, + }, + }) +} + +type dynamoDBReadBatch []map[string]*dynamodb.AttributeValue + +func (b dynamoDBReadBatch) Len() int { + return len(b) +} + +func (b dynamoDBReadBatch) RangeValue(i int) []byte { + return b[i][rangeKey].B +} + +func (b dynamoDBReadBatch) Value(i int) []byte { + chunkValue, ok := b[i][chunkKey] + if !ok { + return nil + } + return chunkValue.B +} + func nextBackoff(lastBackoff time.Duration) time.Duration { // Based on the "Decorrelated Jitter" approach from https://www.awsarchitectureblog.com/2015/03/backoff.html // sleep = min(cap, random_between(base, sleep * 3)) @@ -328,3 +324,46 @@ func nextBackoff(lastBackoff time.Duration) time.Duration { } return backoff } + +func recordDynamoError(tableName string, err error) { + if awsErr, ok := err.(awserr.Error); ok { + dynamoFailures.WithLabelValues(tableName, awsErr.Code()).Add(float64(1)) + } else { + dynamoFailures.WithLabelValues(tableName, otherError).Add(float64(1)) + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func dictLen(b map[string][]*dynamodb.WriteRequest) int { + result := 0 + for _, reqs := range b { + result += len(reqs) + } + return result +} + +// Fill b with WriteRequests from 'from' until it 'b' has at most max requests. Remove those requests from 'from'. +func takeReqs(from, to map[string][]*dynamodb.WriteRequest, max int) { + outLen, inLen := dictLen(to), dictLen(from) + toFill := inLen + if max > 0 { + toFill = min(inLen, max-outLen) + } + for toFill > 0 { + for tableName := range from { + reqs := from[tableName] + taken := min(len(reqs), toFill) + if taken > 0 { + to[tableName] = append(from[tableName], reqs[:taken]...) + from[tableName] = reqs[taken:] + toFill -= taken + } + } + } +} diff --git a/chunk/dynamodb_test.go b/chunk/dynamodb_test.go deleted file mode 100644 index 66e8f15b9c..0000000000 --- a/chunk/dynamodb_test.go +++ /dev/null @@ -1,274 +0,0 @@ -package chunk - -import ( - "bytes" - "fmt" - "log" - "sort" - "sync" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/dynamodb" -) - -type MockDynamoDB struct { - mtx sync.RWMutex - unprocessed int - provisionedErr int - tables map[string]*mockDynamoDBTable -} - -type mockDynamoDBTable struct { - hashKey string - rangeKey string - items map[string][]mockDynamoDBItem - write, read int64 -} - -type mockDynamoDBItem map[string]*dynamodb.AttributeValue - -func NewMockDynamoDB(unprocessed int, provisionedErr int) *MockDynamoDB { - return &MockDynamoDB{ - tables: map[string]*mockDynamoDBTable{}, - unprocessed: unprocessed, - provisionedErr: provisionedErr, - } -} - -func (m *MockDynamoDB) ListTablesPages(_ *dynamodb.ListTablesInput, fn func(p *dynamodb.ListTablesOutput, lastPage bool) (shouldContinue bool)) error { - m.mtx.RLock() - defer m.mtx.RUnlock() - - var tableNames []*string - for tableName := range m.tables { - func(tableName string) { - tableNames = append(tableNames, &tableName) - }(tableName) - } - fn(&dynamodb.ListTablesOutput{ - TableNames: tableNames, - }, true) - return nil -} - -func (m *MockDynamoDB) CreateTable(input *dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error) { - m.mtx.Lock() - defer m.mtx.Unlock() - - if _, ok := m.tables[*input.TableName]; ok { - return nil, fmt.Errorf("table already exists") - } - - var hashKey, rangeKey string - for _, schemaElement := range input.KeySchema { - if *schemaElement.KeyType == "HASH" { - hashKey = *schemaElement.AttributeName - } else if *schemaElement.KeyType == "RANGE" { - rangeKey = *schemaElement.AttributeName - } - } - - m.tables[*input.TableName] = &mockDynamoDBTable{ - hashKey: hashKey, - rangeKey: rangeKey, - items: map[string][]mockDynamoDBItem{}, - write: *input.ProvisionedThroughput.WriteCapacityUnits, - read: *input.ProvisionedThroughput.ReadCapacityUnits, - } - - return &dynamodb.CreateTableOutput{}, nil -} - -func (m *MockDynamoDB) DescribeTable(input *dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error) { - m.mtx.RLock() - defer m.mtx.RUnlock() - - table, ok := m.tables[*input.TableName] - if !ok { - return nil, fmt.Errorf("not found") - } - - return &dynamodb.DescribeTableOutput{ - Table: &dynamodb.TableDescription{ - ItemCount: aws.Int64(int64(len(table.items))), - ProvisionedThroughput: &dynamodb.ProvisionedThroughputDescription{ - ReadCapacityUnits: aws.Int64(table.read), - WriteCapacityUnits: aws.Int64(table.write), - }, - TableStatus: aws.String(dynamodb.TableStatusActive), - }, - }, nil -} - -func (m *MockDynamoDB) UpdateTable(input *dynamodb.UpdateTableInput) (*dynamodb.UpdateTableOutput, error) { - m.mtx.Lock() - defer m.mtx.Unlock() - - table, ok := m.tables[*input.TableName] - if !ok { - return nil, fmt.Errorf("not found") - } - - table.read = *input.ProvisionedThroughput.ReadCapacityUnits - table.write = *input.ProvisionedThroughput.WriteCapacityUnits - - return &dynamodb.UpdateTableOutput{}, nil -} - -func (m *MockDynamoDB) BatchWriteItem(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - m.mtx.Lock() - defer m.mtx.Unlock() - - resp := &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - } - - if m.provisionedErr > 0 { - m.provisionedErr-- - return resp, awserr.New(provisionedThroughputExceededException, "", nil) - } - - for tableName, writeRequests := range input.RequestItems { - table, ok := m.tables[tableName] - if !ok { - return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("table not found") - } - - for _, writeRequest := range writeRequests { - if m.unprocessed > 0 { - m.unprocessed-- - resp.UnprocessedItems[tableName] = append(resp.UnprocessedItems[tableName], writeRequest) - continue - } - - hashValue := *writeRequest.PutRequest.Item[table.hashKey].S - rangeValue := writeRequest.PutRequest.Item[table.rangeKey].B - log.Printf("Write %s/%x", hashValue, rangeValue) - - items := table.items[hashValue] - - // insert in order - i := sort.Search(len(items), func(i int) bool { - return bytes.Compare(items[i][table.rangeKey].B, rangeValue) >= 0 - }) - if i >= len(items) || !bytes.Equal(items[i][table.rangeKey].B, rangeValue) { - items = append(items, nil) - copy(items[i+1:], items[i:]) - } - items[i] = writeRequest.PutRequest.Item - - table.items[hashValue] = items - } - } - return resp, nil -} - -func (m *MockDynamoDB) Query(input *dynamodb.QueryInput) (*dynamodb.QueryOutput, error) { - m.mtx.RLock() - defer m.mtx.RUnlock() - - table, ok := m.tables[*input.TableName] - if !ok { - return nil, fmt.Errorf("table not found") - } - - hashValueCondition, ok := input.KeyConditions[table.hashKey] - if !ok { - return &dynamodb.QueryOutput{}, fmt.Errorf("must specify hash value condition") - } - - hashValue := *hashValueCondition.AttributeValueList[0].S - items, ok := table.items[hashValue] - if !ok { - return &dynamodb.QueryOutput{}, nil - } - - var found []mockDynamoDBItem - rangeKeyCondition, ok := input.KeyConditions[table.rangeKey] - if !ok { - log.Printf("Lookup %s/* -> *", hashValue) - found = items - } else if *rangeKeyCondition.ComparisonOperator == dynamodb.ComparisonOperatorBetween { - rangeValueStart := rangeKeyCondition.AttributeValueList[0].B - rangeValueEnd := rangeKeyCondition.AttributeValueList[1].B - - log.Printf("Lookup %s/%x -> %x (%d)", hashValue, rangeValueStart, rangeValueEnd, len(items)) - - i := sort.Search(len(items), func(i int) bool { - return bytes.Compare(items[i][table.rangeKey].B, rangeValueStart) >= 0 - }) - - j := sort.Search(len(items), func(i int) bool { - return bytes.Compare(items[i][table.rangeKey].B, rangeValueEnd) > 0 - }) - - log.Printf(" found range [%d:%d]", i, j) - if i > len(items) || i == j { - return &dynamodb.QueryOutput{}, nil - } - found = items[i:j] - } else if *rangeKeyCondition.ComparisonOperator == dynamodb.ComparisonOperatorBeginsWith { - prefix := rangeKeyCondition.AttributeValueList[0].B - - log.Printf("Lookup prefix %s/%x (%d)", hashValue, prefix, len(items)) - - // the smallest index i in [0, n) at which f(i) is true - i := sort.Search(len(items), func(i int) bool { - if bytes.Compare(items[i][table.rangeKey].B, prefix) > 0 { - return true - } - return bytes.HasPrefix(items[i][table.rangeKey].B, prefix) - }) - j := sort.Search(len(items)-i, func(j int) bool { - if bytes.Compare(items[i+j][table.rangeKey].B, prefix) < 0 { - return false - } - return !bytes.HasPrefix(items[i+j][table.rangeKey].B, prefix) - }) - - log.Printf(" found range [%d:%d)", i, i+j) - if i > len(items) || j == 0 { - return &dynamodb.QueryOutput{}, nil - } - found = items[i : i+j] - } else { - panic(fmt.Sprintf("%s not supported", *rangeKeyCondition.ComparisonOperator)) - } - - result := make([]map[string]*dynamodb.AttributeValue, 0, len(found)) - for _, item := range found { - result = append(result, item) - } - - return &dynamodb.QueryOutput{ - Items: result, - }, nil -} - -func (m *MockDynamoDB) QueryPages(input *dynamodb.QueryInput, f func(p *dynamodb.QueryOutput, lastPage bool) bool) error { - output, err := m.Query(input) - if err != nil { - return err - } - - f(output, true) - return nil -} - -func (m *MockDynamoDB) QueryRequest(in *dynamodb.QueryInput) (req dynamoRequest, output *dynamodb.QueryOutput) { - output, err := m.Query(in) - return &mockDynamoRequest{output, err}, nil -} - -type mockDynamoRequest struct { - data *dynamodb.QueryOutput - err error -} - -func (m *mockDynamoRequest) NextPage() dynamoRequest { return nil } -func (m *mockDynamoRequest) HasNextPage() bool { return false } -func (m *mockDynamoRequest) Data() interface{} { return m.data } -func (m *mockDynamoRequest) OperationName() string { return "Query" } -func (m *mockDynamoRequest) Send() error { return m.err } -func (m *mockDynamoRequest) Error() error { return nil } diff --git a/chunk/s3_client.go b/chunk/s3_client.go index 8b5df5ad74..cc665ed199 100644 --- a/chunk/s3_client.go +++ b/chunk/s3_client.go @@ -51,21 +51,3 @@ func awsConfigFromURL(url *url.URL) (*aws.Config, error) { } return config, nil } - -// S3ClientValue is a flag.Value that parses a URL and produces a S3Client -type S3ClientValue struct { - url, BucketName string - S3Client -} - -// String implements flag.Value -func (c *S3ClientValue) String() string { - return c.url -} - -// Set implements flag.Value -func (c *S3ClientValue) Set(v string) error { - var err error - c.S3Client, c.BucketName, err = NewS3Client(v) - return err -} diff --git a/chunk/semaphore.go b/chunk/semaphore.go deleted file mode 100644 index 7aad434db6..0000000000 --- a/chunk/semaphore.go +++ /dev/null @@ -1,35 +0,0 @@ -package chunk - -// Semaphore allows users to control the level of concurrency of the Put function. -type Semaphore interface { - Acquire() - Release() -} - -type semaphore chan struct{} - -// NewSemaphore makes a new Semaphore -func NewSemaphore(size int) Semaphore { - s := semaphore(make(chan struct{}, size)) - for i := 0; i < size; i++ { - s.Release() - } - return s -} - -func (s semaphore) Acquire() { - <-s -} - -func (s semaphore) Release() { - s <- struct{}{} -} - -type noopSemaphore int - -func (noopSemaphore) Acquire() {} - -func (noopSemaphore) Release() {} - -// NoopSemaphore is a no-op semaphore -const NoopSemaphore = noopSemaphore(0) diff --git a/chunk/semaphore_test.go b/chunk/semaphore_test.go deleted file mode 100644 index f77d09028b..0000000000 --- a/chunk/semaphore_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package chunk - -import "testing" - -func TestSemaphore(t *testing.T) { - // A very dump test - s := NewSemaphore(1) - s.Acquire() - s.Release() -} diff --git a/chunk/storage_client.go b/chunk/storage_client.go new file mode 100644 index 0000000000..0652991bd9 --- /dev/null +++ b/chunk/storage_client.go @@ -0,0 +1,36 @@ +package chunk + +import ( + "golang.org/x/net/context" +) + +// StorageClient is a client for DynamoDB +type StorageClient interface { + // For the write path + NewWriteBatch() WriteBatch + BatchWrite(context.Context, WriteBatch) error + + // For the read path + QueryPages(ctx context.Context, tableName, hashValue string, rangePrefix []byte, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error + + // For table management + ListTables() ([]string, error) + CreateTable(name string, readCapacity, writeCapacity int64) error + DescribeTable(name string) (readCapacity, writeCapacity int64, status string, err error) + UpdateTable(name string, readCapacity, writeCapacity int64) error +} + +// WriteBatch represents a batch of writes +type WriteBatch interface { + Add(tableName, hashValue string, rangeValue []byte) +} + +// ReadBatch represents the results of a QueryPages +type ReadBatch interface { + Len() int + RangeValue(index int) []byte + + // Value is deprecated - it exists to support an old schema where the chunk + // metadata was written to the chunk index. + Value(index int) []byte +} diff --git a/chunk/storage_client_mock_test.go b/chunk/storage_client_mock_test.go new file mode 100644 index 0000000000..f8a4620797 --- /dev/null +++ b/chunk/storage_client_mock_test.go @@ -0,0 +1,192 @@ +package chunk + +import ( + "bytes" + "fmt" + "log" + "sort" + "sync" + + "github.com/aws/aws-sdk-go/service/dynamodb" + "golang.org/x/net/context" +) + +type MockStorage struct { + mtx sync.RWMutex + tables map[string]*mockTable +} + +type mockTable struct { + items map[string][]mockItem + write, read int64 +} + +type mockItem []byte + +func NewMockStorage() *MockStorage { + return &MockStorage{ + tables: map[string]*mockTable{}, + } +} + +func (m *MockStorage) ListTables() ([]string, error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + var tableNames []string + for tableName := range m.tables { + func(tableName string) { + tableNames = append(tableNames, tableName) + }(tableName) + } + return tableNames, nil +} + +func (m *MockStorage) CreateTable(name string, read, write int64) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + if _, ok := m.tables[name]; ok { + return fmt.Errorf("table already exists") + } + + m.tables[name] = &mockTable{ + items: map[string][]mockItem{}, + write: write, + read: read, + } + + return nil +} + +func (m *MockStorage) DescribeTable(name string) (readCapacity, writeCapacity int64, status string, err error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + table, ok := m.tables[name] + if !ok { + return 0, 0, "", fmt.Errorf("not found") + } + + return table.read, table.write, dynamodb.TableStatusActive, nil +} + +func (m *MockStorage) UpdateTable(name string, readCapacity, writeCapacity int64) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + table, ok := m.tables[name] + if !ok { + return fmt.Errorf("not found") + } + + table.read = readCapacity + table.write = writeCapacity + + return nil +} + +func (m *MockStorage) NewWriteBatch() WriteBatch { + return &mockWriteBatch{} +} + +func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + for _, req := range *batch.(*mockWriteBatch) { + table, ok := m.tables[req.tableName] + if !ok { + return fmt.Errorf("table not found") + } + + log.Printf("Write %s/%x", req.hashValue, req.rangeValue) + + items := table.items[req.hashValue] + + // insert in order + i := sort.Search(len(items), func(i int) bool { + return bytes.Compare(items[i], req.rangeValue) >= 0 + }) + if i >= len(items) || !bytes.Equal(items[i], req.rangeValue) { + items = append(items, nil) + copy(items[i+1:], items[i:]) + } + items[i] = req.rangeValue + + table.items[req.hashValue] = items + } + return nil +} + +func (m *MockStorage) QueryPages(ctx context.Context, tableName, hashValue string, rangePrefix []byte, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { + m.mtx.RLock() + defer m.mtx.RUnlock() + + table, ok := m.tables[tableName] + if !ok { + return fmt.Errorf("table not found") + } + + items, ok := table.items[hashValue] + if !ok { + return nil + } + + var found []mockItem + log.Printf("Lookup prefix %s/%x (%d)", hashValue, rangePrefix, len(items)) + + // the smallest index i in [0, n) at which f(i) is true + i := sort.Search(len(items), func(i int) bool { + if bytes.Compare(items[i], rangePrefix) > 0 { + return true + } + return bytes.HasPrefix(items[i], rangePrefix) + }) + j := sort.Search(len(items)-i, func(j int) bool { + if bytes.Compare(items[i+j], rangePrefix) < 0 { + return false + } + return !bytes.HasPrefix(items[i+j], rangePrefix) + }) + + log.Printf(" found range [%d:%d)", i, i+j) + if i > len(items) || j == 0 { + return nil + } + found = items[i : i+j] + + result := mockReadBatch{} + for _, item := range found { + result = append(result, item) + } + + callback(result, true) + return nil +} + +type mockWriteBatch []struct { + tableName, hashValue string + rangeValue []byte +} + +func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte) { + *b = append(*b, struct { + tableName, hashValue string + rangeValue []byte + }{tableName, hashValue, rangeValue}) +} + +type mockReadBatch [][]byte + +func (b mockReadBatch) Len() int { + return len(b) +} + +func (b mockReadBatch) RangeValue(i int) []byte { + return b[i] +} + +func (b mockReadBatch) Value(i int) []byte { + return nil +} diff --git a/chunk/dynamo_table_manager.go b/chunk/table_manager.go similarity index 80% rename from chunk/dynamo_table_manager.go rename to chunk/table_manager.go index cfb68d7985..ca0d0befd2 100644 --- a/chunk/dynamo_table_manager.go +++ b/chunk/table_manager.go @@ -7,14 +7,13 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" - "github.com/weaveworks/common/instrument" - "github.com/weaveworks/common/mtime" "golang.org/x/net/context" + "github.com/weaveworks/common/instrument" + "github.com/weaveworks/common/mtime" "github.com/weaveworks/cortex/util" ) @@ -43,9 +42,12 @@ func init() { // TableManagerConfig is the config for a DynamoTableManager type TableManagerConfig struct { - DynamoDB DynamoDBClientValue + DynamoDB util.URLValue DynamoDBPollInterval time.Duration + mockDynamoDB StorageClient + mockTableName string + PeriodicTableConfig // duration a table will be created before it is needed. @@ -91,16 +93,29 @@ func (cfg *PeriodicTableConfig) RegisterFlags(f *flag.FlagSet) { // DynamoTableManager creates and manages the provisioned throughput on DynamoDB tables type DynamoTableManager struct { - cfg TableManagerConfig - done chan struct{} - wait sync.WaitGroup + dynamoDB StorageClient + tableName string + cfg TableManagerConfig + done chan struct{} + wait sync.WaitGroup } // NewDynamoTableManager makes a new DynamoTableManager func NewDynamoTableManager(cfg TableManagerConfig) (*DynamoTableManager, error) { + dynamoDBClient, tableName := cfg.mockDynamoDB, cfg.mockTableName + if dynamoDBClient == nil { + var err error + dynamoDBClient, tableName, err = NewDynamoDBClient(cfg.DynamoDB.String()) + if err != nil { + return nil, err + } + } + m := &DynamoTableManager{ - cfg: cfg, - done: make(chan struct{}), + cfg: cfg, + dynamoDB: dynamoDBClient, + tableName: tableName, + done: make(chan struct{}), } return m, nil } @@ -175,7 +190,7 @@ func (m *DynamoTableManager) calculateExpectedTables() []tableDescription { if !m.cfg.UsePeriodicTables { return []tableDescription{ { - name: m.cfg.DynamoDB.TableName, + name: m.tableName, provisionedRead: m.cfg.ProvisionedReadThroughput, provisionedWrite: m.cfg.ProvisionedWriteThroughput, }, @@ -196,7 +211,7 @@ func (m *DynamoTableManager) calculateExpectedTables() []tableDescription { // Add the legacy table { legacyTable := tableDescription{ - name: m.cfg.DynamoDB.TableName, + name: m.tableName, provisionedRead: m.cfg.InactiveReadThroughput, provisionedWrite: m.cfg.InactiveWriteThroughput, } @@ -231,10 +246,15 @@ func (m *DynamoTableManager) calculateExpectedTables() []tableDescription { // partitionTables works out tables that need to be created vs tables that need to be updated func (m *DynamoTableManager) partitionTables(ctx context.Context, descriptions []tableDescription) ([]tableDescription, []tableDescription, error) { - existingTables, err := m.listTables(ctx) - if err != nil { + var existingTables []string + if err := instrument.TimeRequestHistogram(ctx, "DynamoDB.ListTablesPages", dynamoRequestDuration, func(_ context.Context) error { + var err error + existingTables, err = m.dynamoDB.ListTables() + return err + }); err != nil { return nil, nil, err } + sort.Strings(existingTables) toCreate, toCheckThroughput := []tableDescription{}, []tableDescription{} i, j := 0, 0 @@ -260,55 +280,11 @@ func (m *DynamoTableManager) partitionTables(ctx context.Context, descriptions [ return toCreate, toCheckThroughput, nil } -func (m *DynamoTableManager) listTables(ctx context.Context) ([]string, error) { - table := []string{} - if err := instrument.TimeRequestHistogram(ctx, "DynamoDB.ListTablesPages", dynamoRequestDuration, func(_ context.Context) error { - return m.cfg.DynamoDB.ListTablesPages(&dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { - for _, s := range resp.TableNames { - table = append(table, *s) - } - return true - }) - }); err != nil { - return nil, err - } - sort.Strings(table) - return table, nil -} - func (m *DynamoTableManager) createTables(ctx context.Context, descriptions []tableDescription) error { for _, desc := range descriptions { - params := &dynamodb.CreateTableInput{ - TableName: aws.String(desc.name), - AttributeDefinitions: []*dynamodb.AttributeDefinition{ - { - AttributeName: aws.String(hashKey), - AttributeType: aws.String("S"), - }, - { - AttributeName: aws.String(rangeKey), - AttributeType: aws.String("B"), - }, - }, - KeySchema: []*dynamodb.KeySchemaElement{ - { - AttributeName: aws.String(hashKey), - KeyType: aws.String("HASH"), - }, - { - AttributeName: aws.String(rangeKey), - KeyType: aws.String("RANGE"), - }, - }, - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(desc.provisionedRead), - WriteCapacityUnits: aws.Int64(desc.provisionedWrite), - }, - } log.Infof("Creating table %s", desc.name) if err := instrument.TimeRequestHistogram(ctx, "DynamoDB.CreateTable", dynamoRequestDuration, func(_ context.Context) error { - _, err := m.cfg.DynamoDB.CreateTable(params) - return err + return m.dynamoDB.CreateTable(desc.name, desc.provisionedRead, desc.provisionedWrite) }); err != nil { return err } @@ -319,40 +295,32 @@ func (m *DynamoTableManager) createTables(ctx context.Context, descriptions []ta func (m *DynamoTableManager) updateTables(ctx context.Context, descriptions []tableDescription) error { for _, desc := range descriptions { log.Infof("Checking provisioned throughput on table %s", desc.name) - var out *dynamodb.DescribeTableOutput + var readCapacity, writeCapacity int64 + var status string if err := instrument.TimeRequestHistogram(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, func(_ context.Context) error { var err error - out, err = m.cfg.DynamoDB.DescribeTable(&dynamodb.DescribeTableInput{ - TableName: aws.String(desc.name), - }) + readCapacity, writeCapacity, status, err = m.dynamoDB.DescribeTable(desc.name) return err }); err != nil { return err } - if *out.Table.TableStatus != dynamodb.TableStatusActive { - log.Infof("Skipping update on table %s, not yet ACTIVE", desc.name) + if status != dynamodb.TableStatusActive { + log.Infof("Skipping update on table %s, not yet ACTIVE (%s)", desc.name, status) continue } - tableCapacity.WithLabelValues(readLabel, desc.name).Set(float64(*out.Table.ProvisionedThroughput.ReadCapacityUnits)) - tableCapacity.WithLabelValues(writeLabel, desc.name).Set(float64(*out.Table.ProvisionedThroughput.WriteCapacityUnits)) + tableCapacity.WithLabelValues(readLabel, desc.name).Set(float64(readCapacity)) + tableCapacity.WithLabelValues(writeLabel, desc.name).Set(float64(writeCapacity)) - if *out.Table.ProvisionedThroughput.ReadCapacityUnits == desc.provisionedRead && *out.Table.ProvisionedThroughput.WriteCapacityUnits == desc.provisionedWrite { - log.Infof(" Provisioned throughput: read = %d, write = %d, skipping.", *out.Table.ProvisionedThroughput.ReadCapacityUnits, *out.Table.ProvisionedThroughput.WriteCapacityUnits) + if readCapacity == desc.provisionedRead && writeCapacity == desc.provisionedWrite { + log.Infof(" Provisioned throughput: read = %d, write = %d, skipping.", readCapacity, writeCapacity) continue } log.Infof(" Updating provisioned throughput on table %s to read = %d, write = %d", desc.name, desc.provisionedRead, desc.provisionedWrite) if err := instrument.TimeRequestHistogram(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, func(_ context.Context) error { - _, err := m.cfg.DynamoDB.UpdateTable(&dynamodb.UpdateTableInput{ - TableName: aws.String(desc.name), - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(desc.provisionedRead), - WriteCapacityUnits: aws.Int64(desc.provisionedWrite), - }, - }) - return err + return m.dynamoDB.UpdateTable(desc.name, desc.provisionedRead, desc.provisionedWrite) }); err != nil { return err } diff --git a/chunk/dynamo_table_manager_test.go b/chunk/table_manager_test.go similarity index 84% rename from chunk/dynamo_table_manager_test.go rename to chunk/table_manager_test.go index 410baaccdd..a389efc84c 100644 --- a/chunk/dynamo_table_manager_test.go +++ b/chunk/table_manager_test.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/prometheus/common/model" "github.com/weaveworks/common/mtime" "golang.org/x/net/context" @@ -26,12 +24,10 @@ const ( ) func TestDynamoTableManager(t *testing.T) { - dynamoDB := NewMockDynamoDB(0, 0) + dynamoDB := NewMockStorage() cfg := TableManagerConfig{ - DynamoDB: DynamoDBClientValue{ - DynamoDBClient: dynamoDB, - }, + mockDynamoDB: dynamoDB, PeriodicTableConfig: PeriodicTableConfig{ UsePeriodicTables: true, @@ -149,14 +145,9 @@ func TestDynamoTableManager(t *testing.T) { ) } -func expectTables(t *testing.T, dynamo DynamoDBClient, expected []tableDescription) { - tables := []string{} - if err := dynamo.ListTablesPages(&dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { - for _, s := range resp.TableNames { - tables = append(tables, *s) - } - return true - }); err != nil { +func expectTables(t *testing.T, dynamo StorageClient, expected []tableDescription) { + tables, err := dynamo.ListTables() + if err != nil { t.Fatal(err) } @@ -172,19 +163,17 @@ func expectTables(t *testing.T, dynamo DynamoDBClient, expected []tableDescripti t.Fatalf("Expected '%s', found '%s'", desc.name, tables[i]) } - out, err := dynamo.DescribeTable(&dynamodb.DescribeTableInput{ - TableName: aws.String(desc.name), - }) + read, write, _, err := dynamo.DescribeTable(desc.name) if err != nil { t.Fatal(err) } - if *out.Table.ProvisionedThroughput.ReadCapacityUnits != desc.provisionedRead { - t.Fatalf("Expected '%d', found '%d' for table '%s'", desc.provisionedRead, *out.Table.ProvisionedThroughput.ReadCapacityUnits, desc.name) + if read != desc.provisionedRead { + t.Fatalf("Expected '%d', found '%d' for table '%s'", desc.provisionedRead, read, desc.name) } - if *out.Table.ProvisionedThroughput.WriteCapacityUnits != desc.provisionedWrite { - t.Fatalf("Expected '%d', found '%d' for table '%s'", desc.provisionedWrite, *out.Table.ProvisionedThroughput.WriteCapacityUnits, desc.name) + if write != desc.provisionedWrite { + t.Fatalf("Expected '%d', found '%d' for table '%s'", desc.provisionedWrite, write, desc.name) } } } diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index f9fd70a13d..4f90a7995a 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -34,7 +34,7 @@ func main() { defer registration.Ring.Stop() server := server.New(serverConfig, registration.Ring) - chunkStore, err := chunk.NewAWSStore(chunkStoreConfig) + chunkStore, err := chunk.NewStore(chunkStoreConfig) if err != nil { log.Fatal(err) } diff --git a/cmd/querier/main.go b/cmd/querier/main.go index a60078498d..252935232b 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -48,7 +48,7 @@ func main() { server := server.New(serverConfig, r) defer server.Stop() - chunkStore, err := chunk.NewAWSStore(chunkStoreConfig) + chunkStore, err := chunk.NewStore(chunkStoreConfig) if err != nil { log.Fatal(err) } diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 760dcada1d..464ba5de73 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -25,7 +25,7 @@ func main() { util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &rulerConfig, &chunkStoreConfig) flag.Parse() - chunkStore, err := chunk.NewAWSStore(chunkStoreConfig) + chunkStore, err := chunk.NewStore(chunkStoreConfig) if err != nil { log.Fatal(err) } diff --git a/ingester/ingester.go b/ingester/ingester.go index 20c52e7d8e..bcaa366783 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -73,7 +73,7 @@ var ( // Its like MemorySeriesStorage, but simpler. type Ingester struct { cfg Config - chunkStore cortex_chunk.Store + chunkStore ChunkStore userStates *userStates ring *ring.Ring @@ -99,6 +99,11 @@ type Ingester struct { memoryChunks prometheus.Gauge } +// ChunkStore is the interface we need to store chunks +type ChunkStore interface { + Put(ctx context.Context, chunks []cortex_chunk.Chunk) error +} + // Config configures an Ingester. type Config struct { FlushCheckPeriod time.Duration @@ -136,7 +141,7 @@ func (o *flushOp) Priority() int64 { } // New constructs a new Ingester. -func New(cfg Config, chunkStore cortex_chunk.Store, ring *ring.Ring) (*Ingester, error) { +func New(cfg Config, chunkStore ChunkStore, ring *ring.Ring) (*Ingester, error) { if cfg.FlushCheckPeriod == 0 { cfg.FlushCheckPeriod = 1 * time.Minute } diff --git a/ingester/ingester_test.go b/ingester/ingester_test.go index 83a7af004f..4e6df1517c 100644 --- a/ingester/ingester_test.go +++ b/ingester/ingester_test.go @@ -37,10 +37,6 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } -func (s *testStore) Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]chunk.Chunk, error) { - return nil, nil -} - func (s *testStore) Stop() {} func buildTestMatrix(numSeries int, samplesPerSeries int, offset int) model.Matrix { diff --git a/querier/querier.go b/querier/querier.go index 136ee410fc..7f7d6aabe9 100644 --- a/querier/querier.go +++ b/querier/querier.go @@ -15,14 +15,19 @@ import ( "github.com/weaveworks/cortex/util" ) +// ChunkStore is the interface we need to get chunks +type ChunkStore interface { + Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]chunk.Chunk, error) +} + // NewEngine creates a new promql.Engine for cortex. -func NewEngine(distributor Querier, chunkStore chunk.Store) *promql.Engine { +func NewEngine(distributor Querier, chunkStore ChunkStore) *promql.Engine { queryable := NewQueryable(distributor, chunkStore) return promql.NewEngine(queryable, nil) } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor Querier, chunkStore chunk.Store) Queryable { +func NewQueryable(distributor Querier, chunkStore ChunkStore) Queryable { return Queryable{ Q: MergeQuerier{ Queriers: []Querier{ @@ -45,7 +50,7 @@ type Querier interface { // A ChunkQuerier is a Querier that fetches samples from a ChunkStore. type ChunkQuerier struct { - Store chunk.Store + Store ChunkStore } // Query implements Querier and transforms a list of chunks into sample diff --git a/ruler/ruler.go b/ruler/ruler.go index 11867716cb..fd63ee200e 100644 --- a/ruler/ruler.go +++ b/ruler/ruler.go @@ -80,7 +80,7 @@ type Ruler struct { } // NewRuler creates a new ruler from a distributor and chunk store. -func NewRuler(cfg Config, d *distributor.Distributor, c chunk.Store) Ruler { +func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) Ruler { return Ruler{querier.NewEngine(d, c), d, cfg.ExternalURL.URL} }