Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract away the interface between the chunk store and DynamoDB #275

Merged
merged 1 commit into from
Feb 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 79 additions & 91 deletions chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
Expand All @@ -20,17 +19,11 @@ import (
"github.com/weaveworks/cortex/util"
)

const (
hashKey = "h"
rangeKey = "r"
chunkKey = "c"
)

var (
indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_index_entries_per_chunk",
Help: "Number of entries written to dynamodb per chunk.",
Help: "Number of entries written to storage per chunk.",
Buckets: prometheus.ExponentialBuckets(1, 2, 5),
})
s3RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand All @@ -43,7 +36,7 @@ var (
HistogramOpts: prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_row_writes_distribution",
Help: "Distribution of writes to individual DynamoDB rows",
Help: "Distribution of writes to individual storage rows",
Buckets: prometheus.DefBuckets,
},
HashBuckets: 1024,
Expand All @@ -56,18 +49,17 @@ func init() {
prometheus.MustRegister(rowWrites)
}

// Store stores and indexes chunks.
type Store interface {
	// Put writes the given chunks to the backing chunk store and
	// updates the index for each of them.
	Put(ctx context.Context, chunks []Chunk) error
	// Get returns the chunks in the [from, through] time range whose
	// series satisfy all of the supplied matchers.
	Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]Chunk, error)
}

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
SchemaConfig
CacheConfig
S3 S3ClientValue
DynamoDB DynamoDBClientValue
S3 util.URLValue
DynamoDB util.URLValue

mockS3 S3Client
mockBucketName string
mockDynamoDB StorageClient
mockTableName string

// For injecting different schemas in tests.
schemaFactory func(cfg SchemaConfig) Schema
Expand All @@ -82,17 +74,39 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL.")
}

// AWSStore implements ChunkStore for AWS
type AWSStore struct {
cfg StoreConfig
cache *Cache
dynamo *dynamoDBBackoffClient
schema Schema
// Store implements Store
type Store struct {
cfg StoreConfig

storage StorageClient
tableName string
s3 S3Client
bucketName string
cache *Cache
schema Schema
}

// NewAWSStore makes a new ChunkStore
func NewAWSStore(cfg StoreConfig) (*AWSStore, error) {
cfg.SchemaConfig.OriginalTableName = cfg.DynamoDB.TableName
// NewStore makes a new ChunkStore
func NewStore(cfg StoreConfig) (*Store, error) {
dynamoDBClient, tableName := cfg.mockDynamoDB, cfg.mockTableName
if dynamoDBClient == nil {
var err error
dynamoDBClient, tableName, err = NewDynamoDBClient(cfg.DynamoDB.String())
if err != nil {
return nil, err
}
}

s3Client, bucketName := cfg.mockS3, cfg.mockBucketName
if s3Client == nil {
var err error
s3Client, bucketName, err = NewS3Client(cfg.S3.String())
if err != nil {
return nil, err
}
}

cfg.SchemaConfig.OriginalTableName = tableName
var schema Schema
var err error
if cfg.schemaFactory == nil {
Expand All @@ -104,11 +118,14 @@ func NewAWSStore(cfg StoreConfig) (*AWSStore, error) {
return nil, err
}

return &AWSStore{
cfg: cfg,
schema: schema,
cache: NewCache(cfg.CacheConfig),
dynamo: newDynamoDBBackoffClient(cfg.DynamoDB),
return &Store{
cfg: cfg,
storage: dynamoDBClient,
tableName: tableName,
s3: s3Client,
bucketName: bucketName,
schema: schema,
cache: NewCache(cfg.CacheConfig),
}, nil
}

Expand All @@ -117,7 +134,7 @@ func chunkName(userID, chunkID string) string {
}

// Put implements ChunkStore
func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error {
func (c *Store) Put(ctx context.Context, chunks []Chunk) error {
userID, err := user.GetID(ctx)
if err != nil {
return err
Expand All @@ -132,7 +149,7 @@ func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error {
}

// putChunks writes a collection of chunks to S3 in parallel.
func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk) error {
func (c *Store) putChunks(ctx context.Context, userID string, chunks []Chunk) error {
incomingErrors := make(chan error)
for _, chunk := range chunks {
go func(chunk Chunk) {
Expand All @@ -151,17 +168,17 @@ func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk)
}

// putChunk puts a chunk into S3.
func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) error {
func (c *Store) putChunk(ctx context.Context, userID string, chunk *Chunk) error {
body, err := chunk.reader()
if err != nil {
return err
}

err = instrument.TimeRequestHistogram(ctx, "S3.PutObject", s3RequestDuration, func(_ context.Context) error {
var err error
_, err = c.cfg.S3.PutObject(&s3.PutObjectInput{
_, err = c.s3.PutObject(&s3.PutObjectInput{
Body: body,
Bucket: aws.String(c.cfg.S3.BucketName),
Bucket: aws.String(c.bucketName),
Key: aws.String(chunkName(userID, chunk.ID)),
})
return err
Expand All @@ -176,19 +193,19 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
return nil
}

func (c *AWSStore) updateIndex(ctx context.Context, userID string, chunks []Chunk) error {
func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error {
writeReqs, err := c.calculateDynamoWrites(userID, chunks)
if err != nil {
return err
}

return c.dynamo.batchWriteDynamo(ctx, writeReqs)
return c.storage.BatchWrite(ctx, writeReqs)
}

// calculateDynamoWrites creates a set of WriteRequests to dynamo for all `the
// chunks it is given.
func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[string][]*dynamodb.WriteRequest, error) {
writeReqs := map[string][]*dynamodb.WriteRequest{}
// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all
// the chunks it is given.
func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) {
writeReqs := c.storage.NewWriteBatch()
for _, chunk := range chunks {
metricName, err := util.ExtractMetricNameFromMetric(chunk.Metric)
if err != nil {
Expand All @@ -203,31 +220,23 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[str

for _, entry := range entries {
rowWrites.Observe(entry.HashKey, 1)

writeReqs[entry.TableName] = append(writeReqs[entry.TableName], &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
hashKey: {S: aws.String(entry.HashKey)},
rangeKey: {B: entry.RangeKey},
},
},
})
writeReqs.Add(entry.TableName, entry.HashKey, entry.RangeKey)
}
}
return writeReqs, nil
}

// Get implements ChunkStore
func (c *AWSStore) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
userID, err := user.GetID(ctx)
if err != nil {
return nil, err
}

filters, matchers := util.SplitFiltersAndMatchers(allMatchers)

// Fetch chunk descriptors (just ID really) from DynamoDB
chunks, err := c.lookupChunks(ctx, userID, from, through, matchers)
// Fetch chunk descriptors (just ID really) from storage
chunks, err := c.lookupMatchers(ctx, userID, from, through, matchers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -281,7 +290,7 @@ outer:
return filteredChunks, nil
}

func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
func (c *Store) lookupMatchers(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
metricName, matchers, err := util.ExtractMetricNameFromMatchers(matchers)
if err != nil {
return nil, err
Expand Down Expand Up @@ -333,7 +342,7 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug
return nWayIntersect(chunkSets), lastErr
}

func (c *AWSStore) lookupEntries(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByID, error) {
func (c *Store) lookupEntries(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByID, error) {
incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)
for _, entry := range entries {
Expand Down Expand Up @@ -361,50 +370,29 @@ func (c *AWSStore) lookupEntries(ctx context.Context, entries []IndexEntry, matc
return chunks, lastErr
}

func (c *AWSStore) lookupEntry(ctx context.Context, entry IndexEntry, matcher *metric.LabelMatcher) (ByID, error) {
input := &dynamodb.QueryInput{
TableName: aws.String(entry.TableName),
KeyConditions: map[string]*dynamodb.Condition{
hashKey: {
AttributeValueList: []*dynamodb.AttributeValue{
{S: aws.String(entry.HashKey)},
},
ComparisonOperator: aws.String("EQ"),
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
}
if len(entry.RangeKey) > 0 {
input.KeyConditions[rangeKey] = &dynamodb.Condition{
AttributeValueList: []*dynamodb.AttributeValue{
{B: entry.RangeKey},
},
ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith),
}
}

func (c *Store) lookupEntry(ctx context.Context, entry IndexEntry, matcher *metric.LabelMatcher) (ByID, error) {
var chunkSet ByID
var processingError error
if err := c.dynamo.queryPages(ctx, input, func(resp interface{}, lastPage bool) (shouldContinue bool) {
processingError = processResponse(resp.(*dynamodb.QueryOutput), &chunkSet, matcher)
if err := c.storage.QueryPages(ctx, entry.TableName, entry.HashKey, entry.RangeKey, func(resp ReadBatch, lastPage bool) (shouldContinue bool) {
processingError = processResponse(resp, &chunkSet, matcher)
return processingError != nil && !lastPage
}); err != nil {
log.Errorf("Error querying DynamoDB: %v", err)
log.Errorf("Error querying storage: %v", err)
return nil, err
} else if processingError != nil {
log.Errorf("Error processing DynamoDB response: %v", processingError)
log.Errorf("Error processing storage response: %v", processingError)
return nil, processingError
}
sort.Sort(ByID(chunkSet))
chunkSet = unique(chunkSet)
return chunkSet, nil
}

func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric.LabelMatcher) error {
for _, item := range resp.Items {
rangeValue := item[rangeKey].B
func processResponse(resp ReadBatch, chunkSet *ByID, matcher *metric.LabelMatcher) error {
for i := 0; i < resp.Len(); i++ {
rangeValue := resp.RangeValue(i)
if rangeValue == nil {
return fmt.Errorf("invalid item: %v", item)
return fmt.Errorf("invalid item: %d", i)
}
_, value, chunkID, err := parseRangeValue(rangeValue)
if err != nil {
Expand All @@ -415,8 +403,8 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric
ID: chunkID,
}

if chunkValue, ok := item[chunkKey]; ok && chunkValue.B != nil {
if err := json.Unmarshal(chunkValue.B, &chunk); err != nil {
if value := resp.Value(i); value != nil {
if err := json.Unmarshal(value, &chunk); err != nil {
return err
}
chunk.metadataInIndex = true
Expand All @@ -431,16 +419,16 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric
return nil
}

func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet []Chunk) ([]Chunk, error) {
func (c *Store) fetchChunkData(ctx context.Context, userID string, chunkSet []Chunk) ([]Chunk, error) {
incomingChunks := make(chan Chunk)
incomingErrors := make(chan error)
for _, chunk := range chunkSet {
go func(chunk Chunk) {
var resp *s3.GetObjectOutput
err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(_ context.Context) error {
var err error
resp, err = c.cfg.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.cfg.S3.BucketName),
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(chunkName(userID, chunk.ID)),
})
return err
Expand Down
Loading