diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 98024c2560..ccd79c6a0f 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -176,8 +176,23 @@ type CommitLogPolicy struct { FlushEvery time.Duration `yaml:"flushEvery" validate:"nonzero"` // The queue the commit log will keep in front of the current commit log segment. + // Modifying values in this policy controls how many pending writes can be + // in the commitlog queue before M3DB begins rejecting writes. Queue CommitLogQueuePolicy `yaml:"queue" validate:"nonzero"` + // The actual Golang channel that implements the commit log queue. We separate this + // from the Queue field for historical / legacy reasons. Generally speaking, the + // values in this config should not need to be modified, but we leave it in for + // tuning purposes. Unlike the Queue field, values in this policy control the size + // of the channel that backs the queue. Since writes to the commitlog are batched, + // setting the size of this policy controls how many batches can be queued, and + // indirectly how many writes can be queued, though that depends on the batch size + // of the client. As a result, we recommend that users avoid tuning this field and + // modify the Queue size instead, which maps directly to the number of writes. This + // works in most cases because the default size of the QueueChannel should be large + // enough for almost all workloads, assuming a reasonable batch size is used. + QueueChannel *CommitLogQueuePolicy `yaml:"queueChannel"` + // The commit log block size. BlockSize time.Duration `yaml:"blockSize" validate:"nonzero"` } diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 86cee751d5..87973a4dd5 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -206,6 +206,13 @@ db: size: 8192 lowWatermark: 0.01 highWatermark: 0.02 + writeBatchPool: + initialBatchSize: 128 + maxBatchSize: 100000 + pool: + size: 8192 + lowWatermark: 0.01 + highWatermark: 0.02 identifierPool: size: 9437184 lowWatermark: 0.01 highWatermark: 0.02 @@ -389,6 +396,7 @@ db: queue: calculationType: fixed size: 2097152 + queueChannel: null blockSize: 10m0s repair: enabled: false @@ -522,6 +530,13 @@ db: size: 8192 lowWatermark: 0.01 highWatermark: 0.02 + writeBatchPool: + initialBatchSize: 128 + maxBatchSize: 100000 + pool: + size: 8192 + lowWatermark: 0.01 + highWatermark: 0.02 config: service: zone: embedded diff --git a/src/cmd/services/m3dbnode/config/pooling.go b/src/cmd/services/m3dbnode/config/pooling.go index 45508e8c3d..8e986b94b4 100644 --- a/src/cmd/services/m3dbnode/config/pooling.go +++ b/src/cmd/services/m3dbnode/config/pooling.go @@ -34,120 +34,123 @@ const ( // PoolingPolicy specifies the pooling policy. type PoolingPolicy struct { - // The initial alloc size for a block + // The initial alloc size for a block. BlockAllocSize int `yaml:"blockAllocSize"` // The general pool type (currently only supported: simple). Type PoolingType `yaml:"type"` - // The Bytes pool buckets to use + // The Bytes pool buckets to use. BytesPool BucketPoolPolicy `yaml:"bytesPool"` - // The policy for the Closers pool + // The policy for the Closers pool. ClosersPool PoolPolicy `yaml:"closersPool"` - // The policy for the Context pool + // The policy for the Context pool.
ContextPool ContextPoolPolicy `yaml:"contextPool"` - // The policy for the DatabaseSeries pool + // The policy for the DatabaseSeries pool. SeriesPool PoolPolicy `yaml:"seriesPool"` - // The policy for the DatabaseBlock pool + // The policy for the DatabaseBlock pool. BlockPool PoolPolicy `yaml:"blockPool"` - // The policy for the Encoder pool + // The policy for the Encoder pool. EncoderPool PoolPolicy `yaml:"encoderPool"` - // The policy for the Iterator pool + // The policy for the Iterator pool. IteratorPool PoolPolicy `yaml:"iteratorPool"` - // The policy for the Segment Reader pool + // The policy for the Segment Reader pool. SegmentReaderPool PoolPolicy `yaml:"segmentReaderPool"` - // The policy for the Identifier pool + // The policy for the Identifier pool. IdentifierPool PoolPolicy `yaml:"identifierPool"` - // The policy for the FetchBlockMetadataResult pool + // The policy for the FetchBlockMetadataResult pool. FetchBlockMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlockMetadataResultsPool"` - // The policy for the FetchBlocksMetadataResults pool + // The policy for the FetchBlocksMetadataResults pool. FetchBlocksMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlocksMetadataResultsPool"` - // The policy for the HostBlockMetadataSlice pool + // The policy for the HostBlockMetadataSlice pool. HostBlockMetadataSlicePool CapacityPoolPolicy `yaml:"hostBlockMetadataSlicePool"` - // The policy for the BlockMetadat pool + // The policy for the BlockMetadat pool. BlockMetadataPool PoolPolicy `yaml:"blockMetadataPool"` - // The policy for the BlockMetadataSlice pool + // The policy for the BlockMetadataSlice pool. BlockMetadataSlicePool CapacityPoolPolicy `yaml:"blockMetadataSlicePool"` - // The policy for the BlocksMetadata pool + // The policy for the BlocksMetadata pool. BlocksMetadataPool PoolPolicy `yaml:"blocksMetadataPool"` - // The policy for the BlocksMetadataSlice pool + // The policy for the BlocksMetadataSlice pool. BlocksMetadataSlicePool CapacityPoolPolicy `yaml:"blocksMetadataSlicePool"` - // The policy for the tags pool + // The policy for the tags pool. TagsPool MaxCapacityPoolPolicy `yaml:"tagsPool"` - // The policy for the tags iterator pool + // The policy for the tags iterator pool. TagsIteratorPool PoolPolicy `yaml:"tagIteratorPool"` - // The policy for the index.ResultsPool + // The policy for the index.ResultsPool. IndexResultsPool PoolPolicy `yaml:"indexResultsPool"` - // The policy for the TagEncoderPool + // The policy for the TagEncoderPool. TagEncoderPool PoolPolicy `yaml:"tagEncoderPool"` - // The policy for the TagDecoderPool + // The policy for the TagDecoderPool. TagDecoderPool PoolPolicy `yaml:"tagDecoderPool"` + + // The policy for the WriteBatchPool. + WriteBatchPool WriteBatchPoolPolicy `yaml:"writeBatchPool"` } // PoolPolicy specifies a single pool policy. type PoolPolicy struct { - // The size of the pool + // The size of the pool. Size int `yaml:"size"` - // The low watermark to start refilling the pool, if zero none + // The low watermark to start refilling the pool, if zero none. RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"` - // The high watermark to stop refilling the pool, if zero none + // The high watermark to stop refilling the pool, if zero none. RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"` } // CapacityPoolPolicy specifies a single pool policy that has a // per element capacity. type CapacityPoolPolicy struct { - // The size of the pool + // The size of the pool. 
Size int `yaml:"size"` - // The capacity of items in the pool + // The capacity of items in the pool. Capacity int `yaml:"capacity"` - // The low watermark to start refilling the pool, if zero none + // The low watermark to start refilling the pool, if zero none. RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"` - // The high watermark to stop refilling the pool, if zero none + // The high watermark to stop refilling the pool, if zero none. RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"` } // MaxCapacityPoolPolicy specifies a single pool policy that has a // per element capacity, and a maximum allowed capacity as well. type MaxCapacityPoolPolicy struct { - // The size of the pool + // The size of the pool. Size int `yaml:"size"` - // The capacity of items in the pool + // The capacity of items in the pool. Capacity int `yaml:"capacity"` - // The max capacity of items in the pool + // The max capacity of items in the pool. MaxCapacity int `yaml:"maxCapacity"` - // The low watermark to start refilling the pool, if zero none + // The low watermark to start refilling the pool, if zero none. RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"` - // The high watermark to stop refilling the pool, if zero none + // The high watermark to stop refilling the pool, if zero none. RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"` } @@ -157,15 +160,15 @@ type BucketPoolPolicy struct { Buckets []CapacityPoolPolicy `yaml:"buckets"` } -// ContextPoolPolicy specifies the policy for the context pool +// ContextPoolPolicy specifies the policy for the context pool. type ContextPoolPolicy struct { // The size of the pool Size int `yaml:"size"` - // The low watermark to start refilling the pool, if zero none + // The low watermark to start refilling the pool, if zero none. RefillLowWaterMark float64 `yaml:"lowWatermark" validate:"min=0.0,max=1.0"` - // The high watermark to stop refilling the pool, if zero none + // The high watermark to stop refilling the pool, if zero none. RefillHighWaterMark float64 `yaml:"highWatermark" validate:"min=0.0,max=1.0"` // The maximum allowable size for a slice of finalizers that the @@ -175,7 +178,21 @@ type ContextPoolPolicy struct { MaxFinalizerCapacity int `yaml:"maxFinalizerCapacity" validate:"min=0"` } -// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy +// WriteBatchPoolPolicy specifies the pooling policy for the WriteBatch pool. +type WriteBatchPoolPolicy struct { + // InitialBatchSize controls the initial batch size for each WriteBatch when + // the pool is being constructed / refilled. + InitialBatchSize *int `yaml:"initialBatchSize"` + + // MaxBatchSize controls the maximum size that a pooled WriteBatch can grow to + // and still remain in the pool. + MaxBatchSize *int `yaml:"maxBatchSize"` + + // Pool is the Pooling Policy for the WriteBatch pool. + Pool PoolPolicy `yaml:"pool"` +} + +// PoolPolicy returns the PoolPolicy that is represented by the ContextPoolPolicy. func (c ContextPoolPolicy) PoolPolicy() PoolPolicy { return PoolPolicy{ Size: c.Size, @@ -185,7 +202,7 @@ func (c ContextPoolPolicy) PoolPolicy() PoolPolicy { } // MaxFinalizerCapacityWithDefault returns the maximum finalizer capacity and -// fallsback to the default value if its not set +// fallsback to the default value if its not set. 
func (c ContextPoolPolicy) MaxFinalizerCapacityWithDefault() int { if c.MaxFinalizerCapacity == 0 { return defaultMaxFinalizerCapacity diff --git a/src/dbnode/integration/commitlog_bootstrap_helpers.go b/src/dbnode/integration/commitlog_bootstrap_helpers.go index 167d1c619f..c706a0f1ec 100644 --- a/src/dbnode/integration/commitlog_bootstrap_helpers.go +++ b/src/dbnode/integration/commitlog_bootstrap_helpers.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" xtime "github.com/m3db/m3x/time" @@ -152,18 +153,18 @@ func writeCommitLogDataBase( ) // Write out commit log data - for ts, blk := range data { + for currTs, blk := range data { if specifiedTS != nil { s.setNowFn(*specifiedTS) } else { - s.setNowFn(ts.ToTime()) + s.setNowFn(currTs.ToTime()) } ctx := context.NewContext() defer ctx.Close() m := map[xtime.UnixNano]generate.SeriesBlock{ - ts: blk, + currTs: blk, } points := generate. @@ -179,7 +180,7 @@ func writeCommitLogDataBase( for _, point := range points { series, ok := seriesLookup[point.ID.String()] require.True(t, ok) - cId := commitlog.Series{ + cId := ts.Series{ Namespace: namespace.ID(), Shard: shardSet.Lookup(point.ID), ID: point.ID, diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 4991d85f43..9c20b343e6 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" "github.com/m3db/m3/src/x/serialize" @@ -63,6 +64,8 @@ var ( const ( initSegmentArrayPoolLength = 4 maxSegmentArrayPooledLength = 32 + // Any pooled error slices that grow beyond this capacity will be thrown away.
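The cap-and-discard rule described above (and applied in writeBatchPooledReq.Finalize further down) is a general pooling idiom: reuse the backing array when it stayed small, drop it when a pathological request inflated it. A minimal, self-contained Go sketch of the idiom, not part of the patch (the names and the 4096 cap mirror the patch only in spirit):

package main

import "fmt"

// maxPooledCap plays the role of writeBatchPooledReqPoolMaxErrorsSliceSize:
// slices whose backing array grew beyond it are discarded instead of pooled.
const maxPooledCap = 4096

// resetForPooling truncates a slice for reuse, or drops it entirely if it
// grew too large, so one huge request cannot pin memory in the pool forever.
func resetForPooling(errs []error) []error {
	if cap(errs) <= maxPooledCap {
		return errs[:0] // keep the backing array, discard the contents
	}
	return nil // next append allocates a right-sized array
}

func main() {
	small := make([]error, 10, 100)
	huge := make([]error, 10, 100000)
	fmt.Println(cap(resetForPooling(small))) // 100: retained for reuse
	fmt.Println(cap(resetForPooling(huge)))  // 0: thrown away
}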
+ writeBatchPooledReqPoolMaxErrorsSliceSize = 4096 ) var ( @@ -824,50 +827,58 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque pooledReq.writeReq = req ctx.RegisterFinalizer(pooledReq) - nsID := s.newPooledID(ctx, req.NameSpace, pooledReq) - var ( - errs []*rpc.WriteBatchRawError - success int + nsID = s.newPooledID(ctx, req.NameSpace, pooledReq) retryableErrors int nonRetryableErrors int ) + + batchWriter, err := s.db.BatchWriter(nsID, len(req.Elements)) + if err != nil { + return err + } + for i, elem := range req.Elements { unit, unitErr := convert.ToUnit(elem.Datapoint.TimestampTimeType) if unitErr != nil { nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, unitErr)) + pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, unitErr)) continue } d, err := unit.Value() if err != nil { nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, err)) + pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, err)) continue } seriesID := s.newPooledID(ctx, elem.ID, pooledReq) - if err = s.db.Write( - ctx, nsID, seriesID, + batchWriter.Add( + i, + seriesID, xtime.FromNormalizedTime(elem.Datapoint.Timestamp, d), - elem.Datapoint.Value, unit, elem.Datapoint.Annotation, - ); err != nil && xerrors.IsInvalidParams(err) { - nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, err)) - } else if err != nil { - retryableErrors++ - errs = append(errs, tterrors.NewWriteBatchRawError(i, err)) - } else { - success++ - } + elem.Datapoint.Value, + unit, + elem.Datapoint.Annotation, + ) } - s.metrics.writeBatchRaw.ReportSuccess(success) + err = s.db.WriteBatch(ctx, nsID, batchWriter.(ts.WriteBatch), pooledReq) + if err != nil { + return err + } + + nonRetryableErrors += pooledReq.numNonRetryableErrors() + retryableErrors += pooledReq.numRetryableErrors() + totalErrors := nonRetryableErrors + retryableErrors + + s.metrics.writeBatchRaw.ReportSuccess(len(req.Elements) - totalErrors) s.metrics.writeBatchRaw.ReportRetryableErrors(retryableErrors) s.metrics.writeBatchRaw.ReportNonRetryableErrors(nonRetryableErrors) s.metrics.writeBatchRaw.ReportLatency(s.nowFn().Sub(callStart)) + errs := pooledReq.writeBatchRawErrors() if len(errs) > 0 { batchErrs := rpc.NewWriteBatchRawErrors() batchErrs.Errors = errs @@ -889,57 +900,65 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedB pooledReq.writeTaggedReq = req ctx.RegisterFinalizer(pooledReq) - nsID := s.newPooledID(ctx, req.NameSpace, pooledReq) - var ( - errs []*rpc.WriteBatchRawError - success int + nsID = s.newPooledID(ctx, req.NameSpace, pooledReq) retryableErrors int nonRetryableErrors int ) + + batchWriter, err := s.db.BatchWriter(nsID, len(req.Elements)) + if err != nil { + return err + } + for i, elem := range req.Elements { unit, unitErr := convert.ToUnit(elem.Datapoint.TimestampTimeType) if unitErr != nil { nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, unitErr)) + pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, unitErr)) continue } d, err := unit.Value() if err != nil { nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, err)) + pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, err)) continue } dec, err := s.newPooledTagsDecoder(ctx, elem.EncodedTags, pooledReq) if err != nil { nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, err)) + 
pooledReq.addError(tterrors.NewBadRequestWriteBatchRawError(i, err)) continue } seriesID := s.newPooledID(ctx, elem.ID, pooledReq) - if err = s.db.WriteTagged( - ctx, nsID, seriesID, dec, + batchWriter.AddTagged( + i, + seriesID, + dec, xtime.FromNormalizedTime(elem.Datapoint.Timestamp, d), - elem.Datapoint.Value, unit, elem.Datapoint.Annotation, - ); err != nil && xerrors.IsInvalidParams(err) { - nonRetryableErrors++ - errs = append(errs, tterrors.NewBadRequestWriteBatchRawError(i, err)) - } else if err != nil { - retryableErrors++ - errs = append(errs, tterrors.NewWriteBatchRawError(i, err)) - } else { - success++ - } + elem.Datapoint.Value, + unit, + elem.Datapoint.Annotation) } - s.metrics.writeTaggedBatchRaw.ReportSuccess(success) + err = s.db.WriteTaggedBatch(ctx, nsID, batchWriter, pooledReq) + if err != nil { + return err + } + + nonRetryableErrors += pooledReq.numNonRetryableErrors() + retryableErrors += pooledReq.numRetryableErrors() + totalErrors := nonRetryableErrors + retryableErrors + + s.metrics.writeTaggedBatchRaw.ReportSuccess(len(req.Elements) - totalErrors) s.metrics.writeTaggedBatchRaw.ReportRetryableErrors(retryableErrors) s.metrics.writeTaggedBatchRaw.ReportNonRetryableErrors(nonRetryableErrors) s.metrics.writeTaggedBatchRaw.ReportLatency(s.nowFn().Sub(callStart)) + errs := pooledReq.writeBatchRawErrors() if len(errs) > 0 { batchErrs := rpc.NewWriteBatchRawErrors() batchErrs.Errors = errs @@ -1206,6 +1225,17 @@ type writeBatchPooledReq struct { writeReq *rpc.WriteBatchRawRequest writeTaggedReq *rpc.WriteTaggedBatchRawRequest + // We want to avoid allocating an intermediary slice of []error so we + // just include all the error handling in this struct for performance + // reasons since its pooled on a per-request basis anyways. This allows + // us to use this object as a storage.IndexedErrorHandler and avoid allocating + // []error in the storage package, as well as pool the []*rpc.WriteBatchRawError, + // although the individual *rpc.WriteBatchRawError still need to be allocated + // each time. + nonRetryableErrors int + retryableErrors int + errs []*rpc.WriteBatchRawError + pool *writeBatchPooledReqPool } @@ -1263,10 +1293,55 @@ func (r *writeBatchPooledReq) Finalize() { r.writeTaggedReq = nil } + r.nonRetryableErrors = 0 + r.retryableErrors = 0 + if cap(r.errs) <= writeBatchPooledReqPoolMaxErrorsSliceSize { + r.errs = r.errs[:0] + } else { + // Slice grew too large, throw it away and let a new one be + // allocated on the next append call. 
+ r.errs = nil + } + // Return to pool r.pool.Put(r) } +func (r *writeBatchPooledReq) HandleError(index int, err error) { + if err == nil { + return + } + + if xerrors.IsInvalidParams(err) { + r.nonRetryableErrors++ + r.errs = append( + r.errs, + tterrors.NewBadRequestWriteBatchRawError(index, err)) + return + } + + r.retryableErrors++ + r.errs = append( + r.errs, + tterrors.NewWriteBatchRawError(index, err)) +} + +func (r *writeBatchPooledReq) addError(err *rpc.WriteBatchRawError) { + r.errs = append(r.errs, err) +} + +func (r *writeBatchPooledReq) writeBatchRawErrors() []*rpc.WriteBatchRawError { + return r.errs +} + +func (r *writeBatchPooledReq) numRetryableErrors() int { + return r.retryableErrors +} + +func (r *writeBatchPooledReq) numNonRetryableErrors() int { + return r.nonRetryableErrors +} + type writeBatchPooledReqID struct { bytes checked.Bytes id ident.ID diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index a88f71ac8e..25204d8552 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1278,11 +1278,15 @@ func TestServiceWriteBatchRaw(t *testing.T) { {"foo", time.Now().Truncate(time.Second), 12.34}, {"bar", time.Now().Truncate(time.Second), 42.42}, } - for _, w := range values { - mockDB.EXPECT(). - Write(ctx, ident.NewIDMatcher(nsID), ident.NewIDMatcher(w.id), w.t, w.v, xtime.Second, nil). - Return(nil) - } + + writeBatch := ts.NewWriteBatch(len(values), ident.StringID(nsID), nil) + mockDB.EXPECT(). + BatchWriter(ident.NewIDMatcher(nsID), len(values)). + Return(writeBatch, nil) + + mockDB.EXPECT(). + WriteBatch(ctx, ident.NewIDMatcher(nsID), writeBatch, gomock.Any()). + Return(nil) var elements []*rpc.WriteBatchRawRequestElement for _, w := range values { @@ -1338,13 +1342,15 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) { {"foo", "a|b", time.Now().Truncate(time.Second), 12.34}, {"bar", "c|dd", time.Now().Truncate(time.Second), 42.42}, } - for _, w := range values { - mockDB.EXPECT(). - WriteTagged(ctx, ident.NewIDMatcher(nsID), ident.NewIDMatcher(w.id), - mockDecoder, - w.t, w.v, xtime.Second, nil). - Return(nil) - } + + writeBatch := ts.NewWriteBatch(len(values), ident.StringID(nsID), nil) + mockDB.EXPECT(). + BatchWriter(ident.NewIDMatcher(nsID), len(values)). + Return(writeBatch, nil) + + mockDB.EXPECT(). + WriteTaggedBatch(ctx, ident.NewIDMatcher(nsID), writeBatch, gomock.Any()). + Return(nil) var elements []*rpc.WriteTaggedBatchRawRequestElement for _, w := range values { diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index b9bc8a26c3..047afd3ae1 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/m3db/m3/src/dbnode/clock" @@ -52,14 +53,19 @@ type newCommitLogWriterFn func( type writeCommitLogFn func( ctx context.Context, - series Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, + writes writeOrWriteBatch, ) error type commitLogFailFn func(err error) +// writeOrWriteBatch is a union type of write or writeBatch so that +// we can handle both cases without having to allocate as slice of size +// 1 to handle a single write. 
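The union-struct pattern the comment above describes can be shown in isolation. A minimal sketch with illustrative local types standing in for ts.Write and ts.WriteBatch (not the real ones):

package main

import "fmt"

type write struct{ value float64 }
type writeBatch []write

// writeOrWriteBatch holds exactly one of the two cases; a nil writeBatch
// means the single write field is populated. Compared with always passing
// a []write, the single-write hot path never allocates a one-element slice.
type writeOrWriteBatch struct {
	write      write
	writeBatch writeBatch
}

func numWrites(w writeOrWriteBatch) int {
	if w.writeBatch != nil {
		return len(w.writeBatch)
	}
	return 1
}

func main() {
	single := writeOrWriteBatch{write: write{value: 42}}
	batch := writeOrWriteBatch{writeBatch: writeBatch{{1}, {2}, {3}}}
	fmt.Println(numWrites(single), numWrites(batch)) // 1 3
}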
+type writeOrWriteBatch struct { + write ts.Write + writeBatch ts.WriteBatch +} + type commitLog struct { // The commitlog has two different locks that it maintains: // @@ -83,6 +89,7 @@ type commitLog struct { writes chan commitLogWrite pendingFlushFns []callbackFn + maxQueueSize int64 opts Options nowFn clock.NowFn @@ -93,6 +100,8 @@ type commitLog struct { commitLogFailFn commitLogFailFn metrics commitLogMetrics + + numWritesInQueue int64 } // Use the helper methods when interacting with this struct, the mutex @@ -127,14 +136,15 @@ type closedState struct { } type commitLogMetrics struct { - queued tally.Gauge - queueCapacity tally.Gauge - success tally.Counter - errors tally.Counter - openErrors tally.Counter - closeErrors tally.Counter - flushErrors tally.Counter - flushDone tally.Counter + numWritesInQueue tally.Gauge + queueLength tally.Gauge + queueCapacity tally.Gauge + success tally.Counter + errors tally.Counter + openErrors tally.Counter + closeErrors tally.Counter + flushErrors tally.Counter + flushDone tally.Counter } type eventType int @@ -193,12 +203,8 @@ func (r callbackResult) rotateLogsResult() (rotateLogsResult, error) { } type commitLogWrite struct { - eventType eventType - - series Series - datapoint ts.Datapoint - unit xtime.Unit - annotation ts.Annotation + eventType eventType + write writeOrWriteBatch callbackFn callbackFn } @@ -216,17 +222,19 @@ func NewCommitLog(opts Options) (CommitLog, error) { nowFn: opts.ClockOptions().NowFn(), log: iopts.Logger(), newCommitLogWriterFn: newCommitLogWriter, - writes: make(chan commitLogWrite, opts.BacklogQueueSize()), + writes: make(chan commitLogWrite, opts.BacklogQueueChannelSize()), + maxQueueSize: int64(opts.BacklogQueueSize()), closeErr: make(chan error), metrics: commitLogMetrics{ - queued: scope.Gauge("writes.queued"), - queueCapacity: scope.Gauge("writes.queue-capacity"), - success: scope.Counter("writes.success"), - errors: scope.Counter("writes.errors"), - openErrors: scope.Counter("writes.open-errors"), - closeErrors: scope.Counter("writes.close-errors"), - flushErrors: scope.Counter("writes.flush-errors"), - flushDone: scope.Counter("writes.flush-done"), + numWritesInQueue: scope.Gauge("writes.queued"), + queueLength: scope.Gauge("writes.queue-length"), + queueCapacity: scope.Gauge("writes.queue-capacity"), + success: scope.Counter("writes.success"), + errors: scope.Counter("writes.errors"), + openErrors: scope.Counter("writes.open-errors"), + closeErrors: scope.Counter("writes.close-errors"), + flushErrors: scope.Counter("writes.flush-errors"), + flushDone: scope.Counter("writes.flush-done"), }, } @@ -355,7 +363,11 @@ func (l *commitLog) flushEvery(interval time.Duration) { var sleepForOverride time.Duration for { - l.metrics.queued.Update(float64(len(l.writes))) + // The number of actual metrics / writes in the queue. + l.metrics.numWritesInQueue.Update(float64(atomic.LoadInt64(&l.numWritesInQueue))) + // The current length of the queue, different from number of writes due to each + // item in the queue could (potentially) be a batch of many writes. + l.metrics.queueLength.Update(float64(len(l.writes))) l.metrics.queueCapacity.Update(float64(cap(l.writes))) sleepFor := interval @@ -387,6 +399,12 @@ func (l *commitLog) flushEvery(interval time.Duration) { } func (l *commitLog) write() { + // We use these to make the batch and non-batched write paths the same + // by turning non-batched writes into a batch of size one while avoiding + // any allocations. 
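The batch-of-one trick mentioned above is easiest to see standalone. A sketch under the same assumption as the writer goroutine below (a single consumer, so the reused buffer is safe; the types are illustrative):

package main

import "fmt"

type batchWrite struct{ value float64 }

// singleBatch is reused for every non-batched write; this is only safe
// because a single goroutine (like the commit log's writer loop) touches it.
var singleBatch = make([]batchWrite, 1)

// process funnels both paths through one loop without allocating a fresh
// slice for the single-write case.
func process(single *batchWrite, many []batchWrite) {
	var batch []batchWrite
	if single != nil {
		singleBatch[0] = *single
		batch = singleBatch
	} else {
		batch = many
	}
	for _, w := range batch {
		fmt.Println("writing", w.value)
	}
}

func main() {
	process(&batchWrite{value: 1}, nil)                // single-write path
	process(nil, []batchWrite{{value: 2}, {value: 3}}) // batched path
}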
+ var singleBatch = make([]ts.BatchWrite, 1) + var batch []ts.BatchWrite + for write := range l.writes { if write.eventType == flushEventType { l.writerState.writer.Flush(false) @@ -443,20 +461,47 @@ func (l *commitLog) write() { } } - err := l.writerState.writer.Write(write.series, - write.datapoint, write.unit, write.annotation) + var ( + numWritesSuccess int64 + numDequeued int + ) - if err != nil { - l.metrics.errors.Inc(1) - l.log.Errorf("failed to write to commit log: %v", err) + if write.write.writeBatch == nil { + singleBatch[0].Write = write.write.write + batch = singleBatch + } else { + batch = write.write.writeBatch.Iter() + } + numDequeued = len(batch) + + for _, writeBatch := range batch { + if writeBatch.Err != nil { + // This entry was not written successfully to the in-memory datastructures so + // we should not persist it to the commitlog. This is important to maintain + // consistency and the integrity of M3DB's business logic, but also because if + // the write does not succeed to the in-memory datastructures then we don't have + // access to long-lived identifiers like the seriesID (which is pooled) so + // attempting to write would cause pooling / lifecycle issues as well. + continue + } - if l.commitLogFailFn != nil { - l.commitLogFailFn(err) + write := writeBatch.Write + err := l.writerState.writer.Write(write.Series, + write.Datapoint, write.Unit, write.Annotation) + if err != nil { + l.handleWriteErr(err) + continue } + numWritesSuccess++ + } - continue + // Return the write batch to the pool. + if write.write.writeBatch != nil { + write.write.writeBatch.Finalize() } - l.metrics.success.Inc(1) + + atomic.AddInt64(&l.numWritesInQueue, int64(-numDequeued)) + l.metrics.success.Inc(numWritesSuccess) } writer := l.writerState.writer @@ -530,20 +575,33 @@ func (l *commitLog) openWriter(now time.Time) (File, error) { func (l *commitLog) Write( ctx context.Context, - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, ) error { - return l.writeFn(ctx, series, datapoint, unit, annotation) + return l.writeFn(ctx, writeOrWriteBatch{ + write: ts.Write{ + Series: series, + Datapoint: datapoint, + Unit: unit, + Annotation: annotation, + }, + }) +} + +func (l *commitLog) WriteBatch( + ctx context.Context, + writes ts.WriteBatch, +) error { + return l.writeFn(ctx, writeOrWriteBatch{ + writeBatch: writes, + }) } func (l *commitLog) writeWait( ctx context.Context, - series Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, + write writeOrWriteBatch, ) error { l.closedState.RLock() if l.closedState.closed { @@ -563,28 +621,34 @@ func (l *commitLog) writeWait( wg.Done() } - write := commitLogWrite{ - series: series, - datapoint: datapoint, - unit: unit, - annotation: annotation, + writeToEnqueue := commitLogWrite{ + write: write, callbackFn: completion, } - enqueued := false - - select { - case l.writes <- write: - enqueued = true - default: + numToEnqueue := int64(1) + if writeToEnqueue.write.writeBatch != nil { + numToEnqueue = int64(len(writeToEnqueue.write.writeBatch.Iter())) } - l.closedState.RUnlock() + // Optimistically increment the number of enqueued writes. + numEnqueued := atomic.AddInt64(&l.numWritesInQueue, int64(numToEnqueue)) - if !enqueued { + // If we exceeded the limit, decrement the number of enqueued writes and bail. 
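The enqueue paths below bound the queue with an optimistic atomic add that is rolled back on overflow, rather than taking a lock. A minimal standalone sketch of that pattern (the names here are illustrative, not from the patch):

package main

import (
	"fmt"
	"sync/atomic"
)

const maxQueueSize = 100

var numInQueue int64

// tryEnqueue optimistically reserves room for n writes; if the reservation
// overshoots the limit it is rolled back. Concurrent writers may transiently
// overshoot, but they never block on a lock and the counter stays consistent.
func tryEnqueue(n int64) bool {
	if atomic.AddInt64(&numInQueue, n) > maxQueueSize {
		atomic.AddInt64(&numInQueue, -n) // roll back the reservation
		return false
	}
	return true
}

func main() {
	fmt.Println(tryEnqueue(60))                // true
	fmt.Println(tryEnqueue(60))                // false: 120 > 100, rolled back
	fmt.Println(atomic.LoadInt64(&numInQueue)) // 60
}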
+ if numEnqueued > l.maxQueueSize { + atomic.AddInt64(&l.numWritesInQueue, int64(-numToEnqueue)) + l.closedState.RUnlock() return ErrCommitLogQueueFull } + // Otherwise submit the write. + l.writes <- commitLogWrite{ + write: write, + callbackFn: completion, + } + + l.closedState.RUnlock() + wg.Wait() return result @@ -592,10 +656,7 @@ func (l *commitLog) writeWait( func (l *commitLog) writeBehind( ctx context.Context, - series Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, + write writeOrWriteBatch, ) error { l.closedState.RLock() if l.closedState.closed { @@ -603,27 +664,28 @@ func (l *commitLog) writeBehind( return errCommitLogClosed } - write := commitLogWrite{ - series: series, - datapoint: datapoint, - unit: unit, - annotation: annotation, + numToEnqueue := int64(1) + if write.writeBatch != nil { + numToEnqueue = int64(len(write.writeBatch.Iter())) } - enqueued := false + // Optimistically increment the number of enqueued writes. + numEnqueued := atomic.AddInt64(&l.numWritesInQueue, int64(numToEnqueue)) - select { - case l.writes <- write: - enqueued = true - default: + // If we exceeded the limit, decrement the number of enqueued writes and bail. + if numEnqueued > l.maxQueueSize { + atomic.AddInt64(&l.numWritesInQueue, int64(-numToEnqueue)) + l.closedState.RUnlock() + return ErrCommitLogQueueFull } - l.closedState.RUnlock() - - if !enqueued { - return ErrCommitLogQueueFull + // Otherwise submit the write. + l.writes <- commitLogWrite{ + write: write, } + l.closedState.RUnlock() + return nil } @@ -641,3 +703,12 @@ func (l *commitLog) Close() error { // Receive the result of closing the writer from asynchronous writer return <-l.closeErr } + +func (l *commitLog) handleWriteErr(err error) { + l.metrics.errors.Inc(1) + l.log.Errorf("failed to write to commit log: %v", err) + + if l.commitLogFailFn != nil { + l.commitLogFailFn(err) + } +} diff --git a/src/dbnode/persist/fs/commitlog/commit_log_mock.go b/src/dbnode/persist/fs/commitlog/commit_log_mock.go index 4d9d0e381d..3d3455ab64 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_mock.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_mock.go @@ -76,7 +76,7 @@ func (mr *MockCommitLogMockRecorder) Open() *gomock.Call { } // Write mocks base method -func (m *MockCommitLog) Write(ctx context.Context, series Series, datapoint ts.Datapoint, unit time0.Unit, annotation ts.Annotation) error { +func (m *MockCommitLog) Write(ctx context.Context, series ts.Series, datapoint ts.Datapoint, unit time0.Unit, annotation ts.Annotation) error { ret := m.ctrl.Call(m, "Write", ctx, series, datapoint, unit, annotation) ret0, _ := ret[0].(error) return ret0 @@ -87,6 +87,18 @@ func (mr *MockCommitLogMockRecorder) Write(ctx, series, datapoint, unit, annotat return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockCommitLog)(nil).Write), ctx, series, datapoint, unit, annotation) } +// WriteBatch mocks base method +func (m *MockCommitLog) WriteBatch(ctx context.Context, writes ts.WriteBatch) error { + ret := m.ctrl.Call(m, "WriteBatch", ctx, writes) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteBatch indicates an expected call of WriteBatch +func (mr *MockCommitLogMockRecorder) WriteBatch(ctx, writes interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockCommitLog)(nil).WriteBatch), ctx, writes) +} + // Close mocks base method func (m *MockCommitLog) Close() error { ret := m.ctrl.Call(m, "Close") @@ -161,9 +173,9 @@ func 
(mr *MockIteratorMockRecorder) Next() *gomock.Call { } // Current mocks base method -func (m *MockIterator) Current() (Series, ts.Datapoint, time0.Unit, ts.Annotation) { +func (m *MockIterator) Current() (ts.Series, ts.Datapoint, time0.Unit, ts.Annotation) { ret := m.ctrl.Call(m, "Current") - ret0, _ := ret[0].(Series) + ret0, _ := ret[0].(ts.Series) ret1, _ := ret[1].(ts.Datapoint) ret2, _ := ret[2].(time0.Unit) ret3, _ := ret[3].(ts.Annotation) @@ -424,6 +436,30 @@ func (mr *MockOptionsMockRecorder) BacklogQueueSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BacklogQueueSize", reflect.TypeOf((*MockOptions)(nil).BacklogQueueSize)) } +// SetBacklogQueueChannelSize mocks base method +func (m *MockOptions) SetBacklogQueueChannelSize(value int) Options { + ret := m.ctrl.Call(m, "SetBacklogQueueChannelSize", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetBacklogQueueChannelSize indicates an expected call of SetBacklogQueueChannelSize +func (mr *MockOptionsMockRecorder) SetBacklogQueueChannelSize(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetBacklogQueueChannelSize", reflect.TypeOf((*MockOptions)(nil).SetBacklogQueueChannelSize), value) +} + +// BacklogQueueChannelSize mocks base method +func (m *MockOptions) BacklogQueueChannelSize() int { + ret := m.ctrl.Call(m, "BacklogQueueChannelSize") + ret0, _ := ret[0].(int) + return ret0 +} + +// BacklogQueueChannelSize indicates an expected call of BacklogQueueChannelSize +func (mr *MockOptionsMockRecorder) BacklogQueueChannelSize() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BacklogQueueChannelSize", reflect.TypeOf((*MockOptions)(nil).BacklogQueueChannelSize)) +} + // SetBytesPool mocks base method func (m *MockOptions) SetBytesPool(value pool.CheckedBytesPool) Options { ret := m.ctrl.Call(m, "SetBytesPool", value) diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index 35339ba3f3..b27e514111 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -100,7 +100,7 @@ func cleanup(t *testing.T, opts Options) { } type testWrite struct { - series Series + series ts.Series t time.Time v float64 u xtime.Unit @@ -113,8 +113,8 @@ func testSeries( id string, tags ident.Tags, shard uint32, -) Series { - return Series{ +) ts.Series { + return ts.Series{ UniqueIndex: uniqueIndex, Namespace: ident.StringID("testNS"), ID: ident.StringID(id), @@ -125,7 +125,7 @@ func testSeries( func (w testWrite) assert( t *testing.T, - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation []byte, @@ -154,7 +154,7 @@ func snapshotCounterValue( type mockCommitLogWriter struct { openFn func(start time.Time, duration time.Duration) (File, error) - writeFn func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error + writeFn func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error flushFn func(sync bool) error closeFn func() error } @@ -164,7 +164,7 @@ func newMockCommitLogWriter() *mockCommitLogWriter { openFn: func(start time.Time, duration time.Duration) (File, error) { return File{}, nil }, - writeFn: func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { + writeFn: func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { return nil }, flushFn: func(sync bool) error { @@ -181,7 +181,7 @@ func (w *mockCommitLogWriter) Open(start time.Time, duration time.Duration) (Fil } func (w 
*mockCommitLogWriter) Write( - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, @@ -391,7 +391,7 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { // Generate fake series, where approximately half will be missing metadata. // This works because the commitlog writer uses the bitset to determine if // the metadata for a particular series had already been written to disk. - allSeries := []Series{} + allSeries := []ts.Series{} for i := 0; i < 200; i++ { willNotHaveMetadata := !(i%2 == 0) allSeries = append(allSeries, testSeries( @@ -676,7 +676,7 @@ func TestCommitLogFailOnWriteError(t *testing.T) { commitLog := commitLogI.(*commitLog) writer := newMockCommitLogWriter() - writer.writeFn = func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { + writer.writeFn = func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { return fmt.Errorf("an error") } diff --git a/src/dbnode/persist/fs/commitlog/files_test.go b/src/dbnode/persist/fs/commitlog/files_test.go index ef0d94b97c..a265feb59a 100644 --- a/src/dbnode/persist/fs/commitlog/files_test.go +++ b/src/dbnode/persist/fs/commitlog/files_test.go @@ -103,7 +103,7 @@ func createTestCommitLogFiles( commitLog, err := NewCommitLog(opts) require.NoError(t, err) require.NoError(t, commitLog.Open()) - series := Series{ + series := ts.Series{ UniqueIndex: 0, Namespace: ident.StringID("some-namespace"), ID: ident.StringID("some-id"), diff --git a/src/dbnode/persist/fs/commitlog/iterator.go b/src/dbnode/persist/fs/commitlog/iterator.go index 606dcfa01d..eaa3d904e5 100644 --- a/src/dbnode/persist/fs/commitlog/iterator.go +++ b/src/dbnode/persist/fs/commitlog/iterator.go @@ -56,7 +56,7 @@ type iterator struct { } type iteratorRead struct { - series Series + series ts.Series datapoint ts.Datapoint unit xtime.Unit annotation []byte @@ -127,7 +127,7 @@ func (i *iterator) Next() bool { return true } -func (i *iterator) Current() (Series, ts.Datapoint, xtime.Unit, ts.Annotation) { +func (i *iterator) Current() (ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) { read := i.read if i.hasError() || i.closed || !i.setRead { read = iteratorRead{} diff --git a/src/dbnode/persist/fs/commitlog/options.go b/src/dbnode/persist/fs/commitlog/options.go index eac9c378d1..307198e4e7 100644 --- a/src/dbnode/persist/fs/commitlog/options.go +++ b/src/dbnode/persist/fs/commitlog/options.go @@ -22,6 +22,7 @@ package commitlog import ( "errors" + "fmt" "runtime" "time" @@ -47,44 +48,54 @@ const ( // defaultReadConcurrency is the default read concurrency defaultReadConcurrency = 4 + + // MaximumQueueSizeQueueChannelSizeRatio is the maximum ratio between the + // backlog queue size and backlog queue channel size. + MaximumQueueSizeQueueChannelSizeRatio = 8.0 ) var ( - // defaultBacklogQueueSize is the default commit log backlog queue size + // defaultBacklogQueueSize is the default commit log backlog queue size. defaultBacklogQueueSize = 1024 * runtime.NumCPU() + + // defaultBacklogQueueChannelSize is the default commit log backlog queue channel size. 
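Since the default channel size is derived from the queue size by MaximumQueueSizeQueueChannelSizeRatio (both defined in the options.go hunk that follows), the arithmetic is worth making concrete. A minimal sketch; the average client batch size is an assumed example value:

package main

import (
	"fmt"
	"runtime"
)

// Mirrors commitlog.MaximumQueueSizeQueueChannelSizeRatio from the patch.
const maxQueueSizeToChannelSizeRatio = 8.0

func main() {
	// Default backlog queue size, as in the patch: 1024 * NumCPU.
	queueSize := 1024 * runtime.NumCPU()
	// Default channel size derived from the queue size and the ratio.
	channelSize := int(float64(queueSize) / maxQueueSizeToChannelSizeRatio)
	avgClientBatchSize := 128 // assumption; depends entirely on the client

	fmt.Println("queue size (max pending writes):    ", queueSize)
	fmt.Println("channel size (max pending batches): ", channelSize)
	fmt.Println("writes a full channel can represent:", channelSize*avgClientBatchSize)
}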
+ defaultBacklogQueueChannelSize = int(float64(defaultBacklogQueueSize) / MaximumQueueSizeQueueChannelSizeRatio) ) var ( errFlushIntervalNonNegative = errors.New("flush interval must be non-negative") errBlockSizePositive = errors.New("block size must be a positive duration") errReadConcurrencyPositive = errors.New("read concurrency must be a positive integer") + errBacklogQueueChannelSize = errors.New("backlog queue channel size must be a positive integer") ) type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options - blockSize time.Duration - fsOpts fs.Options - strategy Strategy - flushSize int - flushInterval time.Duration - backlogQueueSize int - bytesPool pool.CheckedBytesPool - identPool ident.Pool - readConcurrency int + clockOpts clock.Options + instrumentOpts instrument.Options + blockSize time.Duration + fsOpts fs.Options + strategy Strategy + flushSize int + flushInterval time.Duration + backlogQueueSize int + backlogQueueChannelSize int + bytesPool pool.CheckedBytesPool + identPool ident.Pool + readConcurrency int } // NewOptions creates new commit log options func NewOptions() Options { o := &options{ - clockOpts: clock.NewOptions(), - instrumentOpts: instrument.NewOptions(), - blockSize: defaultBlockSize, - fsOpts: fs.NewOptions(), - strategy: defaultStrategy, - flushSize: defaultFlushSize, - flushInterval: defaultFlushInterval, - backlogQueueSize: defaultBacklogQueueSize, + clockOpts: clock.NewOptions(), + instrumentOpts: instrument.NewOptions(), + blockSize: defaultBlockSize, + fsOpts: fs.NewOptions(), + strategy: defaultStrategy, + flushSize: defaultFlushSize, + flushInterval: defaultFlushInterval, + backlogQueueSize: defaultBacklogQueueSize, + backlogQueueChannelSize: defaultBacklogQueueChannelSize, bytesPool: pool.NewCheckedBytesPool(nil, nil, func(s []pool.Bucket) pool.BytesPool { return pool.NewBytesPool(s, nil) }), @@ -99,12 +110,21 @@ func (o *options) Validate() error { if o.FlushInterval() < 0 { return errFlushIntervalNonNegative } + if o.BlockSize() <= 0 { return errBlockSizePositive } + if o.ReadConcurrency() <= 0 { return errReadConcurrencyPositive } + + if float64(o.BacklogQueueSize())/float64(o.BacklogQueueChannelSize()) > MaximumQueueSizeQueueChannelSizeRatio { + return fmt.Errorf( + "BacklogQueueSize / BacklogQueueChannelSize ratio must not exceed: %f, but was: %f", + MaximumQueueSizeQueueChannelSizeRatio, float64(o.BacklogQueueSize())/float64(o.BacklogQueueChannelSize())) + } + return nil } @@ -188,6 +208,16 @@ func (o *options) BacklogQueueSize() int { return o.backlogQueueSize } +func (o *options) SetBacklogQueueChannelSize(value int) Options { + opts := *o + opts.backlogQueueChannelSize = value + return &opts +} + +func (o *options) BacklogQueueChannelSize() int { + return o.backlogQueueChannelSize +} + func (o *options) SetBytesPool(value pool.CheckedBytesPool) Options { opts := *o opts.bytesPool = value diff --git a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go index 0199bf5b8c..f12ff43725 100644 --- a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go +++ b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go @@ -543,7 +543,7 @@ func (s *clState) writesArePresent(writes ...generatedWrite) error { } type generatedWrite struct { - series Series + series ts.Series datapoint ts.Datapoint unit xtime.Unit annotation ts.Annotation @@ -569,7 +569,7 @@ func genWrite() gopter.Gen { shard := val[4].(uint32) return generatedWrite{ - series: Series{ + series: ts.Series{
ID: ident.StringID(id), Namespace: ident.StringID(ns), Shard: shard, diff --git a/src/dbnode/persist/fs/commitlog/reader.go b/src/dbnode/persist/fs/commitlog/reader.go index 2084520c8f..6858f4e4ab 100644 --- a/src/dbnode/persist/fs/commitlog/reader.go +++ b/src/dbnode/persist/fs/commitlog/reader.go @@ -63,7 +63,7 @@ func ReadAllSeriesPredicate() SeriesFilterPredicate { } type seriesMetadata struct { - Series + ts.Series passedPredicate bool } @@ -72,14 +72,14 @@ type commitLogReader interface { Open(filePath string) (time.Time, time.Duration, int64, error) // Read returns the next id and data pair or error, will return io.EOF at end of volume - Read() (Series, ts.Datapoint, xtime.Unit, ts.Annotation, error) + Read() (ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation, error) // Close the reader Close() error } type readResponse struct { - series Series + series ts.Series datapoint ts.Datapoint unit xtime.Unit annotation ts.Annotation @@ -191,7 +191,7 @@ func (r *reader) Open(filePath string) (time.Time, time.Duration, int64, error) // Then the caller is guaranteed to receive A1 before A2 and A2 before A3, and they are guaranteed // to see B1 before B2, but they may see B1 before A1 and D2 before B3. func (r *reader) Read() ( - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, @@ -200,12 +200,12 @@ func (r *reader) Read() ( if r.nextIndex == 0 { err := r.startBackgroundWorkers() if err != nil { - return Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err + return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err } } rr, ok := <-r.outChan if !ok { - return Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), io.EOF + return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), io.EOF } r.nextIndex++ return rr.series, rr.datapoint, rr.unit, rr.annotation, rr.resultErr @@ -444,7 +444,7 @@ func (r *reader) decodeAndHandleMetadata( } } - metadata := Series{ + metadata := ts.Series{ UniqueIndex: entry.Index, ID: ident.BinaryID(id), Namespace: ident.BinaryID(namespace), diff --git a/src/dbnode/persist/fs/commitlog/types.go b/src/dbnode/persist/fs/commitlog/types.go index 58b081aaa7..77467860a7 100644 --- a/src/dbnode/persist/fs/commitlog/types.go +++ b/src/dbnode/persist/fs/commitlog/types.go @@ -56,12 +56,18 @@ type CommitLog interface { // Write will write an entry in the commit log for a given series Write( ctx context.Context, - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, ) error + // WriteBatch is the same as Write, but in batch. 
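A hedged sketch of how a caller might drive the new batch entrypoint. The ts.NewWriteBatch and Add signatures are inferred from the tests and service code in this patch, not from documented API; this compiles only against the repo at this commit and is purely illustrative:

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3x/context"
	"github.com/m3db/m3x/ident"
	xtime "github.com/m3db/m3x/time"
)

func writeBatchExample(cl commitlog.CommitLog) error {
	ctx := context.NewContext()
	defer ctx.Close()

	// Capacity, namespace, and a nil finalize fn, as in the service tests.
	batch := ts.NewWriteBatch(1, ident.StringID("metrics"), nil)
	// Same Add shape the thrift service uses: index, series ID, timestamp,
	// value, unit, annotation.
	batch.Add(0, ident.StringID("series-a"), time.Now(), 42.0, xtime.Second, nil)

	// One enqueue covers every write in the batch.
	return cl.WriteBatch(ctx, batch)
}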
+ WriteBatch( + ctx context.Context, + writes ts.WriteBatch, + ) error + // Close the commit log Close() error @@ -79,7 +85,7 @@ type Iterator interface { Next() bool // Current returns the current commit log entry - Current() (Series, ts.Datapoint, xtime.Unit, ts.Annotation) + Current() (ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) // Err returns an error if an error occurred Err() error @@ -95,87 +101,77 @@ type IteratorOpts struct { SeriesFilterPredicate SeriesFilterPredicate } -// Series describes a series in the commit log -type Series struct { - // UniqueIndex is the unique index assigned to this series - UniqueIndex uint64 - - // Namespace is the namespace the series belongs to - Namespace ident.ID - - // ID is the series identifier - ID ident.ID - - // Tags are the series tags - Tags ident.Tags // FOLLOWUP(prateek): wire Tags to commit log writer - - // Shard is the shard the series belongs to - Shard uint32 -} - -// Options represents the options for the commit log +// Options represents the options for the commit log. type Options interface { - // Validate validates the Options + // Validate validates the Options. Validate() error - // SetClockOptions sets the clock options + // SetClockOptions sets the clock options. SetClockOptions(value clock.Options) Options - // ClockOptions returns the clock options + // ClockOptions returns the clock options. ClockOptions() clock.Options - // SetInstrumentOptions sets the instrumentation options + // SetInstrumentOptions sets the instrumentation options. SetInstrumentOptions(value instrument.Options) Options - // InstrumentOptions returns the instrumentation options + // InstrumentOptions returns the instrumentation options, InstrumentOptions() instrument.Options - // SetBlockSize sets the block size + // SetBlockSize sets the block size. SetBlockSize(value time.Duration) Options - // BlockSize returns the block size + // BlockSize returns the block size. BlockSize() time.Duration - // SetFilesystemOptions sets the filesystem options + // SetFilesystemOptions sets the filesystem options. SetFilesystemOptions(value fs.Options) Options - // FilesystemOptions returns the filesystem options + // FilesystemOptions returns the filesystem options. FilesystemOptions() fs.Options - // SetFlushSize sets the flush size + // SetFlushSize sets the flush size. SetFlushSize(value int) Options - // FlushSize returns the flush size + // FlushSize returns the flush size. FlushSize() int - // SetStrategy sets the strategy + // SetStrategy sets the strategy. SetStrategy(value Strategy) Options - // Strategy returns the strategy + // Strategy returns the strategy. Strategy() Strategy - // SetFlushInterval sets the flush interval + // SetFlushInterval sets the flush interval. SetFlushInterval(value time.Duration) Options - // FlushInterval returns the flush interval + // FlushInterval returns the flush interval. FlushInterval() time.Duration - // SetBacklogQueueSize sets the backlog queue size + // SetBacklogQueueSize sets the backlog queue size. SetBacklogQueueSize(value int) Options - // BacklogQueueSize returns the backlog queue size + // BacklogQueueSize returns the backlog queue size. BacklogQueueSize() int - // SetBytesPool sets the checked bytes pool + // SetBacklogQueueChannelSize sets the size of the Golang channel + // that backs the queue. + SetBacklogQueueChannelSize(value int) Options + + // BacklogQueueChannelSize returns the size of the Golang channel + // that backs the queue. 
+ BacklogQueueChannelSize() int + + // SetBytesPool sets the checked bytes pool. SetBytesPool(value pool.CheckedBytesPool) Options - // BytesPool returns the checked bytes pool + // BytesPool returns the checked bytes pool. BytesPool() pool.CheckedBytesPool - // SetReadConcurrency sets the concurrency of the reader + // SetReadConcurrency sets the concurrency of the reader. SetReadConcurrency(concurrency int) Options - // ReadConcurrency returns the concurrency of the reader + // ReadConcurrency returns the concurrency of the reader. ReadConcurrency() int // SetIdentifierPool sets the IdentifierPool to use for pooling identifiers. @@ -186,7 +182,7 @@ type Options interface { } // FileFilterPredicate is a predicate that allows the caller to determine -// which commitlogs the iterator should read from +// which commitlogs the iterator should read from. type FileFilterPredicate func(f File) bool // SeriesFilterPredicate is a predicate that determines whether datapoints for a given series diff --git a/src/dbnode/persist/fs/commitlog/writer.go b/src/dbnode/persist/fs/commitlog/writer.go index bb39dfdfc2..24fefab446 100644 --- a/src/dbnode/persist/fs/commitlog/writer.go +++ b/src/dbnode/persist/fs/commitlog/writer.go @@ -71,7 +71,7 @@ type commitLogWriter interface { // Write will write an entry in the commit log for a given series Write( - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, @@ -198,7 +198,7 @@ func (w *writer) isOpen() bool { } func (w *writer) Write( - series Series, + series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 530dbaca4b..e331de0b29 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -322,6 +322,22 @@ func Run(runOpts RunOptions) { cfg.CommitLog.Queue.CalculationType) } + var commitLogQueueChannelSize int + if cfg.CommitLog.QueueChannel != nil { + specified := cfg.CommitLog.QueueChannel.Size + switch cfg.CommitLog.Queue.CalculationType { + case config.CalculationTypeFixed: + commitLogQueueChannelSize = specified + case config.CalculationTypePerCPU: + commitLogQueueChannelSize = specified * runtime.NumCPU() + default: + logger.Fatalf("unknown commit log queue channel size type: %v", + cfg.CommitLog.Queue.CalculationType) + } + } else { + commitLogQueueChannelSize = int(float64(commitLogQueueSize) / commitlog.MaximumQueueSizeQueueChannelSizeRatio) + } + opts = opts.SetCommitLogOptions(opts.CommitLogOptions(). SetInstrumentOptions(opts.InstrumentOptions()). SetFilesystemOptions(fsopts). @@ -329,6 +345,7 @@ func Run(runOpts RunOptions) { SetFlushSize(cfg.CommitLog.FlushMaxBytes). SetFlushInterval(cfg.CommitLog.FlushEvery). SetBacklogQueueSize(commitLogQueueSize). + SetBacklogQueueChannelSize(commitLogQueueChannelSize). 
SetBlockSize(cfg.CommitLog.BlockSize)) // Set the series cache policy @@ -938,6 +955,19 @@ func withEncodingAndPoolingOptions( multiIteratorPool := encoding.NewMultiReaderIteratorPool( poolOptions(policy.IteratorPool, scope.SubScope("multi-iterator-pool"))) + var writeBatchPoolInitialBatchSize *int + if policy.WriteBatchPool.InitialBatchSize != nil { + writeBatchPoolInitialBatchSize = policy.WriteBatchPool.InitialBatchSize + } + var writeBatchPoolMaxBatchSize *int + if policy.WriteBatchPool.MaxBatchSize != nil { + writeBatchPoolMaxBatchSize = policy.WriteBatchPool.MaxBatchSize + } + writeBatchPool := ts.NewWriteBatchPool( + poolOptions(policy.WriteBatchPool.Pool, scope.SubScope("write-batch-pool")), + writeBatchPoolInitialBatchSize, + writeBatchPoolMaxBatchSize) + identifierPool := ident.NewPool(bytesPool, ident.PoolOptions{ IDPoolOptions: poolOptions(policy.IdentifierPool, scope.SubScope("identifier-pool")), TagsPoolOptions: maxCapacityPoolOptions(policy.TagsPool, scope.SubScope("tags-pool")), @@ -976,6 +1006,8 @@ func withEncodingAndPoolingOptions( return iter }) + writeBatchPool.Init() + opts = opts. SetBytesPool(bytesPool). SetContextPool(contextPool). @@ -984,7 +1016,8 @@ func withEncodingAndPoolingOptions( SetMultiReaderIteratorPool(multiIteratorPool). SetIdentifierPool(identifierPool). SetFetchBlockMetadataResultsPool(fetchBlockMetadataResultsPool). - SetFetchBlocksMetadataResultsPool(fetchBlocksMetadataResultsPool) + SetFetchBlocksMetadataResultsPool(fetchBlocksMetadataResultsPool). + SetWriteBatchPool(writeBatchPool) blockOpts := opts.DatabaseBlockOptions(). SetDatabaseBlockAllocSize(policy.BlockAllocSize). diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 9ccc48f7c0..30f25e7e43 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -931,7 +931,7 @@ func (s *commitLogSource) startM3TSZEncodingWorker( func (s *commitLogSource) shouldEncodeForData( unmerged []shardData, dataBlockSize time.Duration, - series commitlog.Series, + series ts.Series, timestamp time.Time, ) bool { // Check if the shard number is higher the amount of space we pre-allocated. 
@@ -1642,7 +1642,7 @@ type metadataAndEncodersByTime struct { // encoderArg contains all the information a worker go-routine needs to encode // a data point as M3TSZ type encoderArg struct { - series commitlog.Series + series ts.Series dp ts.Datapoint unit xtime.Unit annotation ts.Annotation diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 8947cc8e1d..0ee22afd1b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -149,9 +149,9 @@ func TestReadOrderedValues(t *testing.T) { End: end, }) - foo := commitlog.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} - bar := commitlog.Series{Namespace: testNamespaceID, Shard: 1, ID: ident.StringID("bar")} - baz := commitlog.Series{Namespace: testNamespaceID, Shard: 2, ID: ident.StringID("baz")} + foo := ts.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} + bar := ts.Series{Namespace: testNamespaceID, Shard: 1, ID: ident.StringID("bar")} + baz := ts.Series{Namespace: testNamespaceID, Shard: 2, ID: ident.StringID("baz")} values := []testValue{ {foo, start, 1.0, xtime.Second, nil}, @@ -194,7 +194,7 @@ func TestReadUnorderedValues(t *testing.T) { End: end, }) - foo := commitlog.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} + foo := ts.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} values := []testValue{ {foo, start.Add(10 * time.Minute), 1.0, xtime.Second, nil}, @@ -240,9 +240,9 @@ func TestReadHandlesDifferentSeriesWithIdenticalUniqueIndex(t *testing.T) { }) // All series need to be in the same shard to exercise the regression. 
- foo := commitlog.Series{ + foo := ts.Series{ Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo"), UniqueIndex: 0} - bar := commitlog.Series{ + bar := ts.Series{ Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("bar"), UniqueIndex: 0} values := []testValue{ @@ -282,7 +282,7 @@ func TestReadTrimsToRanges(t *testing.T) { End: end, }) - foo := commitlog.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} + foo := ts.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} values := []testValue{ {foo, start.Add(-1 * time.Minute), 1.0, xtime.Nanosecond, nil}, @@ -318,7 +318,7 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) { end = now.Truncate(blockSize) ranges = xtime.Ranges{} - foo = commitlog.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} + foo = ts.Series{Namespace: testNamespaceID, Shard: 0, ID: ident.StringID("foo")} commitLogValues = []testValue{ {foo, start.Add(2 * time.Minute), 1.0, xtime.Nanosecond, nil}, {foo, start.Add(3 * time.Minute), 2.0, xtime.Nanosecond, nil}, @@ -413,7 +413,7 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) { } type testValue struct { - s commitlog.Series + s ts.Series t time.Time v float64 u xtime.Unit @@ -698,7 +698,7 @@ func (i *testCommitLogIterator) Next() bool { return i.idx < len(i.values) } -func (i *testCommitLogIterator) Current() (commitlog.Series, ts.Datapoint, xtime.Unit, ts.Annotation) { +func (i *testCommitLogIterator) Current() (ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) { idx := i.idx if idx == -1 { idx = 0 diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go index 0e712487cc..18d96d56e0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_index_test.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3x/ident" xtime "github.com/m3db/m3x/time" @@ -81,20 +82,20 @@ func testBootstrapIndex(t *testing.T, bootstrapDataFirst bool) { bazTags := ident.NewTags(ident.StringTag("city", "oakland")) shardn := func(n int) uint32 { return uint32(n) } - foo := commitlog.Series{UniqueIndex: 0, Namespace: testNamespaceID, Shard: shardn(0), ID: ident.StringID("foo"), Tags: fooTags} - bar := commitlog.Series{UniqueIndex: 1, Namespace: testNamespaceID, Shard: shardn(0), ID: ident.StringID("bar"), Tags: barTags} - baz := commitlog.Series{UniqueIndex: 2, Namespace: testNamespaceID, Shard: shardn(5), ID: ident.StringID("baz"), Tags: bazTags} + foo := ts.Series{UniqueIndex: 0, Namespace: testNamespaceID, Shard: shardn(0), ID: ident.StringID("foo"), Tags: fooTags} + bar := ts.Series{UniqueIndex: 1, Namespace: testNamespaceID, Shard: shardn(0), ID: ident.StringID("bar"), Tags: barTags} + baz := ts.Series{UniqueIndex: 2, Namespace: testNamespaceID, Shard: shardn(5), ID: ident.StringID("baz"), Tags: bazTags} // Make sure we can handle series that don't have tags. 
-	untagged := commitlog.Series{UniqueIndex: 3, Namespace: testNamespaceID, Shard: shardn(5), ID: ident.StringID("untagged"), Tags: ident.Tags{}}
+	untagged := ts.Series{UniqueIndex: 3, Namespace: testNamespaceID, Shard: shardn(5), ID: ident.StringID("untagged"), Tags: ident.Tags{}}
 
 	// Make sure we skip series that are not within the bootstrap range.
-	outOfRange := commitlog.Series{UniqueIndex: 4, Namespace: testNamespaceID, Shard: shardn(3), ID: ident.StringID("outOfRange"), Tags: ident.Tags{}}
+	outOfRange := ts.Series{UniqueIndex: 4, Namespace: testNamespaceID, Shard: shardn(3), ID: ident.StringID("outOfRange"), Tags: ident.Tags{}}
 
 	// Make sure we skip and don't panic on writes for shards that are higher than the maximum we're trying to bootstrap.
-	shardTooHigh := commitlog.Series{UniqueIndex: 5, Namespace: testNamespaceID, Shard: shardn(100), ID: ident.StringID("shardTooHigh"), Tags: ident.Tags{}}
+	shardTooHigh := ts.Series{UniqueIndex: 5, Namespace: testNamespaceID, Shard: shardn(100), ID: ident.StringID("shardTooHigh"), Tags: ident.Tags{}}
 
 	// Make sure we skip series for shards that have no requested bootstrap ranges. The shard for this write needs
 	// to be less than the highest shard we actually plan to bootstrap.
-	noShardBootstrapRange := commitlog.Series{UniqueIndex: 6, Namespace: testNamespaceID, Shard: shardn(4), ID: ident.StringID("noShardBootstrapRange"), Tags: ident.Tags{}}
+	noShardBootstrapRange := ts.Series{UniqueIndex: 6, Namespace: testNamespaceID, Shard: shardn(4), ID: ident.StringID("noShardBootstrapRange"), Tags: ident.Tags{}}
 
 	// Make sure it handles multiple namespaces
-	someOtherNamespace := commitlog.Series{UniqueIndex: 7, Namespace: testNamespaceID2, Shard: shardn(0), ID: ident.StringID("someOtherNamespace"), Tags: ident.Tags{}}
+	someOtherNamespace := ts.Series{UniqueIndex: 7, Namespace: testNamespaceID2, Shard: shardn(0), ID: ident.StringID("someOtherNamespace"), Tags: ident.Tags{}}
 
 	seriesNotToExpect := map[string]struct{}{
 		outOfRange.ID.String(): struct{}{},
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go
index 63a02c33de..31a561dfe4 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go
@@ -470,7 +470,7 @@ type generatedWrite struct {
 	// arrivedAt is used to simulate out-of-order writes which arrive somewhere
 	// between time.Now().Add(-bufferFuture) and time.Now().Add(bufferPast).
arrivedAt time.Time - series commitlog.Series + series ts.Series datapoint ts.Datapoint unit xtime.Unit annotation ts.Annotation @@ -591,7 +591,7 @@ func genWrite(start time.Time, bufferPast, bufferFuture time.Duration, ns string return generatedWrite{ arrivedAt: a, - series: commitlog.Series{ + series: ts.Series{ ID: ident.StringID(id), Tags: seriesUniqueTags(id, tagKey, tagVal, includeTags), Namespace: ident.StringID(ns), diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 4e4d1647c8..fda8514ea5 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" @@ -46,17 +47,21 @@ import ( ) var ( - // errDatabaseAlreadyOpen raised when trying to open a database that is already open + // errDatabaseAlreadyOpen raised when trying to open a database that is already open. errDatabaseAlreadyOpen = errors.New("database is already open") - // errDatabaseNotOpen raised when trying to close a database that is not open + // errDatabaseNotOpen raised when trying to close a database that is not open. errDatabaseNotOpen = errors.New("database is not open") - // errDatabaseAlreadyClosed raised when trying to open a database that is already closed + // errDatabaseAlreadyClosed raised when trying to open a database that is already closed. errDatabaseAlreadyClosed = errors.New("database is already closed") - // errDatabaseIsClosed raised when trying to perform an action that requires an open database + // errDatabaseIsClosed raised when trying to perform an action that requires an open database. errDatabaseIsClosed = errors.New("database is closed") + + // errWriterDoesNotImplementWriteBatch is raised when the provided ts.BatchWriter does not implement + // ts.WriteBatch. 
+ errWriterDoesNotImplementWriteBatch = errors.New("provided writer does not implement ts.WriteBatch") ) type databaseState int @@ -95,12 +100,17 @@ type db struct { errors xcounter.FrequencyCounter errWindow time.Duration errThreshold int64 + + writeBatchPool *ts.WriteBatchPool } type databaseMetrics struct { unknownNamespaceRead tally.Counter unknownNamespaceWrite tally.Counter unknownNamespaceWriteTagged tally.Counter + unknownNamespaceBatchWriter tally.Counter + unknownNamespaceWriteBatch tally.Counter + unknownNamespaceWriteTaggedBatch tally.Counter unknownNamespaceFetchBlocks tally.Counter unknownNamespaceFetchBlocksMetadata tally.Counter unknownNamespaceQueryIDs tally.Counter @@ -115,6 +125,9 @@ func newDatabaseMetrics(scope tally.Scope) databaseMetrics { unknownNamespaceRead: unknownNamespaceScope.Counter("read"), unknownNamespaceWrite: unknownNamespaceScope.Counter("write"), unknownNamespaceWriteTagged: unknownNamespaceScope.Counter("write-tagged"), + unknownNamespaceBatchWriter: unknownNamespaceScope.Counter("batch-writer"), + unknownNamespaceWriteBatch: unknownNamespaceScope.Counter("write-batch"), + unknownNamespaceWriteTaggedBatch: unknownNamespaceScope.Counter("write-tagged-batch"), unknownNamespaceFetchBlocks: unknownNamespaceScope.Counter("fetch-blocks"), unknownNamespaceFetchBlocksMetadata: unknownNamespaceScope.Counter("fetch-blocks-metadata"), unknownNamespaceQueryIDs: unknownNamespaceScope.Counter("query-ids"), @@ -123,7 +136,7 @@ func newDatabaseMetrics(scope tally.Scope) databaseMetrics { } } -// NewDatabase creates a new time series database +// NewDatabase creates a new time series database. func NewDatabase( shardSet sharding.ShardSet, opts Options, @@ -145,17 +158,18 @@ func NewDatabase( logger := iopts.Logger() d := &db{ - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - shardSet: shardSet, - namespaces: newDatabaseNamespacesMap(databaseNamespacesMapOptions{}), - commitLog: commitLog, - scope: scope, - metrics: newDatabaseMetrics(scope), - log: logger, - errors: xcounter.NewFrequencyCounter(opts.ErrorCounterOptions()), - errWindow: opts.ErrorWindowForLoad(), - errThreshold: opts.ErrorThresholdForLoad(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + shardSet: shardSet, + namespaces: newDatabaseNamespacesMap(databaseNamespacesMapOptions{}), + commitLog: commitLog, + scope: scope, + metrics: newDatabaseMetrics(scope), + log: logger, + errors: xcounter.NewFrequencyCounter(opts.ErrorCounterOptions()), + errWindow: opts.ErrorWindowForLoad(), + errThreshold: opts.ErrorThresholdForLoad(), + writeBatchPool: opts.WriteBatchPool(), } databaseIOpts := iopts.SetMetricsScope(scope) @@ -490,11 +504,16 @@ func (d *db) Write( return err } - err = n.Write(ctx, id, timestamp, value, unit, annotation) + series, err := n.Write(ctx, id, timestamp, value, unit, annotation) if err == commitlog.ErrCommitLogQueueFull { d.errors.Record(1) } - return err + if err != nil { + return err + } + + dp := ts.Datapoint{Timestamp: timestamp, Value: value} + return d.commitLog.Write(ctx, series, dp, unit, annotation) } func (d *db) WriteTagged( @@ -513,11 +532,119 @@ func (d *db) WriteTagged( return err } - err = n.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation) + series, err := n.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation) if err == commitlog.ErrCommitLogQueueFull { d.errors.Record(1) } - return err + if err != nil { + return err + } + + dp := ts.Datapoint{Timestamp: timestamp, Value: value} + return d.commitLog.Write(ctx, series, dp, unit, annotation) +} 
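For orientation, and not part of this change: a minimal caller-side sketch of how the new batch API is meant to compose, based on the interfaces added in this diff (BatchWriter, WriteBatch and IndexedErrorHandler). The loggingErrHandler type and the writeTwoPoints function are hypothetical names invented for illustration.

// Illustrative only: stage writes with caller-chosen indices on a pooled
// writer, then flush them through WriteBatch with an error handler.
package example

import (
	"log"
	"time"

	"github.com/m3db/m3/src/dbnode/storage"
	"github.com/m3db/m3x/context"
	"github.com/m3db/m3x/ident"
	xtime "github.com/m3db/m3x/time"
)

// loggingErrHandler is a hypothetical IndexedErrorHandler that just logs.
type loggingErrHandler struct{}

func (h loggingErrHandler) HandleError(index int, err error) {
	// index is the caller-provided original index, not the batch position.
	log.Printf("write at original index %d failed: %v", index, err)
}

func writeTwoPoints(db storage.Database, ns ident.ID) error {
	ctx := context.NewContext()
	defer ctx.Close()

	// BatchWriter hands back a pooled ts.WriteBatch reset to this namespace.
	writer, err := db.BatchWriter(ns, 2)
	if err != nil {
		return err
	}

	now := time.Now()
	writer.Add(0, ident.StringID("foo"), now, 1.0, xtime.Second, nil)
	writer.Add(1, ident.StringID("bar"), now, 2.0, xtime.Second, nil)

	// Per-write failures are reported through the handler; the returned
	// error covers batch-level failures such as the commitlog write.
	return db.WriteBatch(ctx, ns, writer, loggingErrHandler{})
}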
+
+func (d *db) BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) {
+	n, err := d.namespaceFor(namespace)
+	if err != nil {
+		d.metrics.unknownNamespaceBatchWriter.Inc(1)
+		return nil, err
+	}
+
+	var (
+		nsID        = n.ID()
+		batchWriter = d.writeBatchPool.Get()
+	)
+	batchWriter.Reset(batchSize, nsID)
+	return batchWriter, nil
+}
+
+func (d *db) WriteBatch(
+	ctx context.Context,
+	namespace ident.ID,
+	writer ts.BatchWriter,
+	errHandler IndexedErrorHandler,
+) error {
+	return d.writeBatch(ctx, namespace, writer, errHandler, false)
+}
+
+func (d *db) WriteTaggedBatch(
+	ctx context.Context,
+	namespace ident.ID,
+	writer ts.BatchWriter,
+	errHandler IndexedErrorHandler,
+) error {
+	return d.writeBatch(ctx, namespace, writer, errHandler, true)
+}
+
+func (d *db) writeBatch(
+	ctx context.Context,
+	namespace ident.ID,
+	writer ts.BatchWriter,
+	errHandler IndexedErrorHandler,
+	tagged bool,
+) error {
+	writes, ok := writer.(ts.WriteBatch)
+	if !ok {
+		return errWriterDoesNotImplementWriteBatch
+	}
+
+	n, err := d.namespaceFor(namespace)
+	if err != nil {
+		if tagged {
+			d.metrics.unknownNamespaceWriteTaggedBatch.Inc(1)
+		} else {
+			d.metrics.unknownNamespaceWriteBatch.Inc(1)
+		}
+		return err
+	}
+
+	iter := writes.Iter()
+	for i, write := range iter {
+		var (
+			series ts.Series
+			err    error
+		)
+
+		if tagged {
+			series, err = n.WriteTagged(
+				ctx,
+				write.Write.Series.ID,
+				write.TagIter,
+				write.Write.Datapoint.Timestamp,
+				write.Write.Datapoint.Value,
+				write.Write.Unit,
+				write.Write.Annotation,
+			)
+		} else {
+			series, err = n.Write(
+				ctx,
+				write.Write.Series.ID,
+				write.Write.Datapoint.Timestamp,
+				write.Write.Datapoint.Value,
+				write.Write.Unit,
+				write.Write.Annotation,
+			)
+		}
+
+		if err == commitlog.ErrCommitLogQueueFull {
+			d.errors.Record(1)
+		}
+		if err != nil {
+			// Return errors with the original index provided by the caller so they
+			// can associate the error with the write that caused it.
+			errHandler.HandleError(write.OriginalIndex, err)
+		}
+
+		// Need to set the outcome in the success case so the commitlog gets the updated
+		// series object which contains identifiers (like the series ID) whose lifecycles
+		// live longer than the span of this request, making them safe for use by the async
+		// commitlog. Need to set the outcome in the error case so that the commitlog knows
+		// to skip this entry.
+ writes.SetOutcome(i, series, err) + } + + return d.commitLog.WriteBatch(ctx, writes) } func (d *db) QueryIDs( diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 45868f230c..bc7136d46a 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -21,6 +21,7 @@ package storage import ( + "errors" "fmt" "sort" "sync" @@ -38,6 +39,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" xclock "github.com/m3db/m3x/clock" "github.com/m3db/m3x/context" @@ -610,16 +612,26 @@ func TestDatabaseNamespaceIndexFunctions(t *testing.T) { ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes() require.NoError(t, d.Open()) - ctx := context.NewContext() + var ( + namespace = ident.StringID("testns") + ctx = context.NewContext() + id = ident.StringID("foo") + tagsIter = ident.EmptyTagIterator + series = ts.Series{ + ID: id, + Tags: ident.Tags{}, + Namespace: namespace, + } + ) ns.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("foo"), gomock.Any(), - time.Time{}, 1.0, xtime.Second, nil).Return(nil) - require.NoError(t, d.WriteTagged(ctx, ident.StringID("testns"), - ident.StringID("foo"), ident.EmptyTagIterator, time.Time{}, + time.Time{}, 1.0, xtime.Second, nil).Return(series, nil) + require.NoError(t, d.WriteTagged(ctx, namespace, + id, tagsIter, time.Time{}, 1.0, xtime.Second, nil)) ns.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("foo"), gomock.Any(), - time.Time{}, 1.0, xtime.Second, nil).Return(fmt.Errorf("random err")) - require.Error(t, d.WriteTagged(ctx, ident.StringID("testns"), + time.Time{}, 1.0, xtime.Second, nil).Return(series, fmt.Errorf("random err")) + require.Error(t, d.WriteTagged(ctx, namespace, ident.StringID("foo"), ident.EmptyTagIterator, time.Time{}, 1.0, xtime.Second, nil)) @@ -642,6 +654,172 @@ func TestDatabaseNamespaceIndexFunctions(t *testing.T) { require.NoError(t, d.Close()) } +func TestDatabaseWriteBatchNoNamespace(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := newTestDatabase(t, ctrl, BootstrapNotStarted) + defer func() { + close(mapCh) + }() + require.NoError(t, d.Open()) + + var ( + notExistNamespace = ident.StringID("not-exist-namespace") + batchSize = 100 + ) + _, err := d.BatchWriter(notExistNamespace, batchSize) + require.Error(t, err) + + err = d.WriteBatch(nil, notExistNamespace, nil, nil) + require.Error(t, err) + + require.NoError(t, d.Close()) +} + +func TestDatabaseWriteTaggedBatchNoNamespace(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := newTestDatabase(t, ctrl, BootstrapNotStarted) + defer func() { + close(mapCh) + }() + require.NoError(t, d.Open()) + + var ( + notExistNamespace = ident.StringID("not-exist-namespace") + batchSize = 100 + ) + _, err := d.BatchWriter(notExistNamespace, batchSize) + require.Error(t, err) + + err = d.WriteTaggedBatch(nil, notExistNamespace, nil, nil) + require.Error(t, err) + + require.NoError(t, d.Close()) +} + +func TestDatabaseWriteBatch(t *testing.T) { + testDatabaseWriteBatch(t, false) +} + +func TestDatabaseWriteTaggedBatch(t *testing.T) { + testDatabaseWriteBatch(t, true) +} + +type fakeIndexedErrorHandler struct { + errs []indexedErr +} + +func (f *fakeIndexedErrorHandler) HandleError(index int, err error) { + 
f.errs = append(f.errs, indexedErr{index, err}) +} + +type indexedErr struct { + index int + err error +} + +func testDatabaseWriteBatch(t *testing.T, tagged bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := newTestDatabase(t, ctrl, BootstrapNotStarted) + defer func() { + close(mapCh) + }() + + ns := dbAddNewMockNamespace(ctrl, d, "testns") + ns.EXPECT().GetOwnedShards().Return([]databaseShard{}).AnyTimes() + ns.EXPECT().Tick(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().BootstrapState().Return(ShardBootstrapStates{}).AnyTimes() + ns.EXPECT().Close().Return(nil).Times(1) + require.NoError(t, d.Open()) + + var ( + namespace = ident.StringID("testns") + ctx = context.NewContext() + tagsIter = ident.EmptyTagIterator + ) + + writes := []struct { + series string + t time.Time + v float64 + err error + }{ + { + series: "foo", + t: time.Time{}.Add(10 * time.Second), + v: 1.0, + }, + { + series: "foo", + t: time.Time{}.Add(20 * time.Second), + v: 2.0, + }, + { + series: "bar", + t: time.Time{}.Add(20 * time.Second), + v: 3.0, + }, + { + series: "bar", + t: time.Time{}.Add(30 * time.Second), + v: 4.0, + }, + { + series: "error-series", + err: errors.New("some-error"), + }, + } + + batchWriter, err := d.BatchWriter(namespace, 10) + require.NoError(t, err) + + var i int + for _, write := range writes { + // Write with the provided index as i*2 so we can assert later that the + // ErrorHandler is called with the provided index, not the actual position + // in the WriteBatch slice. + if tagged { + batchWriter.AddTagged(i*2, ident.StringID(write.series), tagsIter, write.t, write.v, xtime.Second, nil) + ns.EXPECT().WriteTagged(ctx, ident.NewIDMatcher(write.series), gomock.Any(), + write.t, write.v, xtime.Second, nil).Return( + ts.Series{ + ID: ident.StringID(write.series + "-updated"), + Namespace: namespace, + Tags: ident.Tags{}, + }, write.err) + } else { + batchWriter.Add(i*2, ident.StringID(write.series), write.t, write.v, xtime.Second, nil) + ns.EXPECT().Write(ctx, ident.NewIDMatcher(write.series), + write.t, write.v, xtime.Second, nil).Return( + ts.Series{ + ID: ident.StringID(write.series + "-updated"), + Namespace: namespace, + Tags: ident.Tags{}, + }, write.err) + } + i++ + } + + errHandler := &fakeIndexedErrorHandler{} + if tagged { + err = d.WriteTaggedBatch(ctx, namespace, batchWriter.(ts.WriteBatch), errHandler) + } else { + err = d.WriteBatch(ctx, namespace, batchWriter.(ts.WriteBatch), errHandler) + } + + require.NoError(t, err) + require.Len(t, errHandler.errs, 1) + // Make sure it calls the error handler with the "original" provided index, not the position + // of the write in the WriteBatch slice. 
+	require.Equal(t, (i-1)*2, errHandler.errs[0].index)
+	require.NoError(t, d.Close())
+}
+
 func TestDatabaseBootstrapState(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index d6d62817b9..85c15643c7 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -31,7 +31,6 @@ import (
 	"github.com/m3db/m3/src/dbnode/clock"
 	"github.com/m3db/m3/src/dbnode/persist"
 	"github.com/m3db/m3/src/dbnode/persist/fs"
-	"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
 	"github.com/m3db/m3/src/dbnode/sharding"
 	"github.com/m3db/m3/src/dbnode/storage/block"
 	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
@@ -60,7 +59,7 @@ var (
 type commitLogWriter interface {
 	Write(
 		ctx context.Context,
-		series commitlog.Series,
+		series ts.Series,
 		datapoint ts.Datapoint,
 		unit xtime.Unit,
 		annotation ts.Annotation,
@@ -69,7 +68,7 @@
 type commitLogWriterFn func(
 	ctx context.Context,
-	series commitlog.Series,
+	series ts.Series,
 	datapoint ts.Datapoint,
 	unit xtime.Unit,
 	annotation ts.Annotation,
@@ -77,7 +76,7 @@
 func (fn commitLogWriterFn) Write(
 	ctx context.Context,
-	series commitlog.Series,
+	series ts.Series,
 	datapoint ts.Datapoint,
 	unit xtime.Unit,
 	annotation ts.Annotation,
@@ -87,7 +86,7 @@
 var commitLogWriteNoOp = commitLogWriter(commitLogWriterFn(func(
 	ctx context.Context,
-	series commitlog.Series,
+	series ts.Series,
 	datapoint ts.Datapoint,
 	unit xtime.Unit,
 	annotation ts.Annotation,
@@ -420,7 +419,7 @@ func (n *dbNamespace) AssignShardSet(shardSet sharding.ShardSet) {
 	} else {
 		bootstrapEnabled := n.nopts.BootstrapEnabled()
 		n.shards[shard] = newDatabaseShard(n.metadata, shard, n.blockRetriever,
-			n.namespaceReaderMgr, n.increasingIndex, n.commitLogWriter, n.reverseIndex,
+			n.namespaceReaderMgr, n.increasingIndex, n.reverseIndex,
 			bootstrapEnabled, n.opts, n.seriesOpts)
 		n.metrics.shards.add.Inc(1)
 	}
@@ -557,16 +556,16 @@ func (n *dbNamespace) Write(
 	value float64,
 	unit xtime.Unit,
 	annotation []byte,
-) error {
+) (ts.Series, error) {
 	callStart := n.nowFn()
 	shard, err := n.shardFor(id)
 	if err != nil {
 		n.metrics.write.ReportError(n.nowFn().Sub(callStart))
-		return err
+		return ts.Series{}, err
 	}
-	err = shard.Write(ctx, id, timestamp, value, unit, annotation)
+	series, err := shard.Write(ctx, id, timestamp, value, unit, annotation)
 	n.metrics.write.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
-	return err
+	return series, err
 }
 
 func (n *dbNamespace) WriteTagged(
@@ -577,20 +576,20 @@ func (n *dbNamespace) WriteTagged(
 	value float64,
 	unit xtime.Unit,
 	annotation []byte,
-) error {
+) (ts.Series, error) {
 	callStart := n.nowFn()
 	if n.reverseIndex == nil { // only happens if indexing is disabled.
n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart)) - return errNamespaceIndexingDisabled + return ts.Series{}, errNamespaceIndexingDisabled } shard, err := n.shardFor(id) if err != nil { n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart)) - return err + return ts.Series{}, err } - err = shard.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation) + series, err := shard.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation) n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) - return err + return series, err } func (n *dbNamespace) QueryIDs( @@ -1202,7 +1201,7 @@ func (n *dbNamespace) initShards(needBootstrap bool) { dbShards := make([]databaseShard, n.shardSet.Max()+1) for _, shard := range shards { dbShards[shard] = newDatabaseShard(n.metadata, shard, n.blockRetriever, - n.namespaceReaderMgr, n.increasingIndex, n.commitLogWriter, n.reverseIndex, + n.namespaceReaderMgr, n.increasingIndex, n.reverseIndex, needBootstrap, n.opts, n.seriesOpts) } n.shards = dbShards diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index bb850308e2..941381cc17 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3x/context" xerrors "github.com/m3db/m3x/errors" @@ -140,7 +141,7 @@ func TestNamespaceWriteShardNotOwned(t *testing.T) { for i := range ns.shards { ns.shards[i] = nil } - err := ns.Write(ctx, ident.StringID("foo"), time.Now(), 0.0, xtime.Second, nil) + _, err := ns.Write(ctx, ident.StringID("foo"), time.Now(), 0.0, xtime.Second, nil) require.Error(t, err) require.True(t, xerrors.IsRetryableError(err)) require.Equal(t, "not responsible for shard 0", err.Error()) @@ -154,7 +155,7 @@ func TestNamespaceWriteShardOwned(t *testing.T) { defer ctx.Close() id := ident.StringID("foo") - ts := time.Now() + now := time.Now() val := 0.0 unit := xtime.Second ant := []byte(nil) @@ -162,10 +163,11 @@ func TestNamespaceWriteShardOwned(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Write(ctx, id, ts, val, unit, ant).Return(nil) + shard.EXPECT().Write(ctx, id, now, val, unit, ant).Return(ts.Series{}, nil) ns.shards[testShardIDs[0].ID()] = shard - require.NoError(t, ns.Write(ctx, id, ts, val, unit, ant)) + _, err := ns.Write(ctx, id, now, val, unit, ant) + require.NoError(t, err) } func TestNamespaceReadEncodedShardNotOwned(t *testing.T) { @@ -1074,15 +1076,15 @@ func TestNamespaceIndexInsert(t *testing.T) { defer closer() ctx := context.NewContext() - ts := time.Now() + now := time.Now() shard := NewMockdatabaseShard(ctrl) shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - ts, 1.0, xtime.Second, nil).Return(nil) + now, 1.0, xtime.Second, nil).Return(ts.Series{}, nil) ns.shards[testShardIDs[0].ID()] = shard - err := ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, ts, 1.0, xtime.Second, nil) + _, err := ns.WriteTagged(ctx, ident.StringID("a"), + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) require.NoError(t, err) shard.EXPECT().Close() diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index f409d6bd22..e93fba4fb0 100644 
--- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -41,6 +41,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" @@ -142,6 +143,7 @@ type options struct { fetchBlockMetadataResultsPool block.FetchBlockMetadataResultsPool fetchBlocksMetadataResultsPool block.FetchBlocksMetadataResultsPool queryIDsWorkerPool xsync.WorkerPool + writeBatchPool *ts.WriteBatchPool } // NewOptions creates a new set of storage options with defaults @@ -160,6 +162,9 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { queryIDsWorkerPool := xsync.NewWorkerPool(int(math.Ceil(float64(runtime.NumCPU()) / 2))) queryIDsWorkerPool.Init() + writeBatchPool := ts.NewWriteBatchPool(poolOpts, nil, nil) + writeBatchPool.Init() + o := &options{ clockOpts: clock.NewOptions(), instrumentOpts: instrument.NewOptions(), @@ -195,6 +200,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { fetchBlockMetadataResultsPool: block.NewFetchBlockMetadataResultsPool(poolOpts, 0), fetchBlocksMetadataResultsPool: block.NewFetchBlocksMetadataResultsPool(poolOpts, 0), queryIDsWorkerPool: queryIDsWorkerPool, + writeBatchPool: writeBatchPool, } return o.SetEncodingM3TSZPooled() } @@ -622,3 +628,13 @@ func (o *options) SetQueryIDsWorkerPool(value xsync.WorkerPool) Options { func (o *options) QueryIDsWorkerPool() xsync.WorkerPool { return o.queryIDsWorkerPool } + +func (o *options) SetWriteBatchPool(value *ts.WriteBatchPool) Options { + opts := *o + opts.writeBatchPool = value + return &opts +} + +func (o *options) WriteBatchPool() *ts.WriteBatchPool { + return o.writeBatchPool +} diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 67a1a942b1..bf731be216 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -34,7 +34,6 @@ import ( "github.com/m3db/m3/src/dbnode/generated/proto/pagetoken" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" - "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" @@ -146,7 +145,6 @@ type dbShard struct { namespaceReaderMgr databaseNamespaceReaderManager increasingIndex increasingIndex seriesPool series.DatabaseSeriesPool - commitLogWriter commitLogWriter reverseIndex namespaceIndex insertQueue *dbShardInsertQueue lookup *shardMap @@ -243,7 +241,6 @@ func newDatabaseShard( blockRetriever block.DatabaseBlockRetriever, namespaceReaderMgr databaseNamespaceReaderManager, increasingIndex increasingIndex, - commitLogWriter commitLogWriter, reverseIndex namespaceIndex, needsBootstrap bool, opts Options, @@ -262,7 +259,6 @@ func newDatabaseShard( namespaceReaderMgr: namespaceReaderMgr, increasingIndex: increasingIndex, seriesPool: opts.DatabaseSeriesPool(), - commitLogWriter: commitLogWriter, reverseIndex: reverseIndex, lookup: newShardMap(shardMapOptions{}), list: list.New(), @@ -753,7 +749,7 @@ func (s *dbShard) WriteTagged( value float64, unit xtime.Unit, annotation []byte, -) error { +) (ts.Series, error) { return s.writeAndIndex(ctx, id, tags, timestamp, value, unit, annotation, true) } @@ -765,7 +761,7 @@ func (s 
*dbShard) Write( value float64, unit xtime.Unit, annotation []byte, -) error { +) (ts.Series, error) { return s.writeAndIndex(ctx, id, ident.EmptyTagIterator, timestamp, value, unit, annotation, false) } @@ -779,11 +775,11 @@ func (s *dbShard) writeAndIndex( unit xtime.Unit, annotation []byte, shouldReverseIndex bool, -) error { +) (ts.Series, error) { // Prepare write entry, opts, err := s.tryRetrieveWritableSeries(id) if err != nil { - return err + return ts.Series{}, err } writable := entry != nil @@ -799,7 +795,7 @@ func (s *dbShard) writeAndIndex( }, }) if err != nil { - return err + return ts.Series{}, err } // Wait for the insert to be batched together and inserted @@ -808,7 +804,7 @@ func (s *dbShard) writeAndIndex( // Retrieve the inserted entry entry, err = s.writableSeries(id, tags) if err != nil { - return err + return ts.Series{}, err } writable = true @@ -842,7 +838,7 @@ func (s *dbShard) writeAndIndex( // release the reference we got on entry from `writableSeries` entry.DecrementReaderWriterCount() if err != nil { - return err + return ts.Series{}, err } } else { // This is an asynchronous insert and write @@ -861,7 +857,7 @@ func (s *dbShard) writeAndIndex( }, }) if err != nil { - return err + return ts.Series{}, err } // NB(r): Make sure to use the copied ID which will eventually // be set to the newly series inserted ID. @@ -874,7 +870,7 @@ func (s *dbShard) writeAndIndex( } // Write commit log - series := commitlog.Series{ + series := ts.Series{ UniqueIndex: commitLogSeriesUniqueIndex, Namespace: s.namespace.ID(), ID: commitLogSeriesID, @@ -882,13 +878,7 @@ func (s *dbShard) writeAndIndex( Shard: s.shard, } - datapoint := ts.Datapoint{ - Timestamp: timestamp, - Value: value, - } - - return s.commitLogWriter.Write(ctx, series, datapoint, - unit, annotation) + return series, nil } func (s *dbShard) ReadEncoded( diff --git a/src/dbnode/storage/shard_index_test.go b/src/dbnode/storage/shard_index_test.go index 4656d8f155..60e95a7e4e 100644 --- a/src/dbnode/storage/shard_index_test.go +++ b/src/dbnode/storage/shard_index_test.go @@ -78,18 +78,19 @@ func TestShardInsertNamespaceIndex(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - require.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - now, 1.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + now, 1.0, xtime.Second, nil) + require.NoError(t, err) - require.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - now, 2.0, xtime.Second, nil)) + _, err = shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + now, 2.0, xtime.Second, nil) + require.NoError(t, err) - require.NoError(t, - shard.Write(ctx, ident.StringID("baz"), now, 1.0, xtime.Second, nil)) + _, err = shard.Write( + ctx, ident.StringID("baz"), now, 1.0, xtime.Second, nil) + require.NoError(t, err) lock.Lock() defer lock.Unlock() @@ -124,21 +125,21 @@ func TestShardAsyncInsertNamespaceIndex(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - time.Now(), 1.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), + 
ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + time.Now(), 1.0, xtime.Second, nil) + assert.NoError(t, err) - assert.NoError(t, - shard.Write(ctx, ident.StringID("bar"), time.Now(), 1.0, xtime.Second, nil)) + _, err = shard.Write(ctx, ident.StringID("bar"), time.Now(), 1.0, xtime.Second, nil) + assert.NoError(t, err) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("baz"), - ident.NewTagsIterator(ident.NewTags( - ident.StringTag("all", "tags"), - ident.StringTag("should", "be-present"), - )), - time.Now(), 1.0, xtime.Second, nil)) + _, err = shard.WriteTagged(ctx, ident.StringID("baz"), + ident.NewTagsIterator(ident.NewTags( + ident.StringTag("all", "tags"), + ident.StringTag("should", "be-present"), + )), + time.Now(), 1.0, xtime.Second, nil) + assert.NoError(t, err) for { lock.RLock() @@ -207,10 +208,10 @@ func TestShardAsyncIndexOnlyWhenNotIndexed(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - now, 1.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + now, 1.0, xtime.Second, nil) + assert.NoError(t, err) for { if l := atomic.LoadInt32(&numCalls); l == 1 { @@ -220,10 +221,10 @@ func TestShardAsyncIndexOnlyWhenNotIndexed(t *testing.T) { } // ensure we don't index once we have already indexed - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - now.Add(time.Second), 2.0, xtime.Second, nil)) + _, err = shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + now.Add(time.Second), 2.0, xtime.Second, nil) + assert.NoError(t, err) l := atomic.LoadInt32(&numCalls) assert.Equal(t, int32(1), l) @@ -270,10 +271,11 @@ func TestShardAsyncIndexIfExpired(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - now, 1.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + now, 1.0, xtime.Second, nil) + + assert.NoError(t, err) // wait till we're done indexing. indexed := xclock.WaitUntil(func() bool { @@ -283,10 +285,10 @@ func TestShardAsyncIndexIfExpired(t *testing.T) { // ensure we index because it's expired nextWriteTime := now.Add(blockSize) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), - ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), - nextWriteTime, 2.0, xtime.Second, nil)) + _, err = shard.WriteTagged(ctx, ident.StringID("foo"), + ident.NewTagsIterator(ident.NewTags(ident.StringTag("name", "value"))), + nextWriteTime, 2.0, xtime.Second, nil) + assert.NoError(t, err) // wait till we're done indexing. 
reIndexed := xclock.WaitUntil(func() bool { diff --git a/src/dbnode/storage/shard_race_prop_test.go b/src/dbnode/storage/shard_race_prop_test.go index a3fc2a386f..d7c720dc20 100644 --- a/src/dbnode/storage/shard_race_prop_test.go +++ b/src/dbnode/storage/shard_race_prop_test.go @@ -157,7 +157,8 @@ func TestShardTickWriteRace(t *testing.T) { <-barrier ctx := context.NewContext() now := time.Now() - assert.NoError(t, shard.Write(ctx, id, now, 1.0, xtime.Second, nil)) + _, err := shard.Write(ctx, id, now, 1.0, xtime.Second, nil) + assert.NoError(t, err) ctx.BlockingClose() }() } diff --git a/src/dbnode/storage/shard_ref_count_test.go b/src/dbnode/storage/shard_ref_count_test.go index 57df9168e4..0b7ff8ba69 100644 --- a/src/dbnode/storage/shard_ref_count_test.go +++ b/src/dbnode/storage/shard_ref_count_test.go @@ -22,16 +22,13 @@ package storage import ( "fmt" - "sync/atomic" "testing" "time" "github.com/m3db/m3/src/dbnode/clock" - "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" - "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" xclock "github.com/m3db/m3x/clock" "github.com/m3db/m3x/context" @@ -49,20 +46,7 @@ func TestShardWriteSyncRefCount(t *testing.T) { now := time.Now() opts := testDatabaseOptions() - numCommitLogWrites := int32(0) - mockCommitLogWriter := commitLogWriter(commitLogWriterFn(func( - ctx context.Context, - series commitlog.Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, - ) error { - atomic.AddInt32(&numCommitLogWrites, 1) - return nil - })) - shard := testDatabaseShard(t, opts) - shard.commitLogWriter = mockCommitLogWriter shard.SetRuntimeOptions(runtime.NewOptions(). 
SetWriteNewSeriesAsync(false)) defer shard.Close() @@ -70,12 +54,14 @@ func TestShardWriteSyncRefCount(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.Write(ctx, ident.StringID("foo"), now, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("bar"), now, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("baz"), now, 3.0, xtime.Second, nil)) + _, err := shard.Write(ctx, ident.StringID("foo"), now, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("bar"), now, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("baz"), now, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { @@ -88,17 +74,15 @@ func TestShardWriteSyncRefCount(t *testing.T) { // write already inserted series' next := now.Add(time.Minute) - assert.NoError(t, - shard.Write(ctx, ident.StringID("foo"), next, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("bar"), next, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("baz"), next, 3.0, xtime.Second, nil)) - - written := xclock.WaitUntil(func() bool { - return atomic.LoadInt32(&numCommitLogWrites) == 6 - }, 2*time.Second) - assert.True(t, written) + + _, err = shard.Write(ctx, ident.StringID("foo"), next, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("bar"), next, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("baz"), next, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { @@ -162,23 +146,12 @@ func TestShardWriteTaggedSyncRefCountSyncIndex(t *testing.T) { } func testShardWriteTaggedSyncRefCount(t *testing.T, idx namespaceIndex) { - numCommitLogWrites := int32(0) - mockCommitLogWriter := commitLogWriter(commitLogWriterFn(func( - ctx context.Context, - series commitlog.Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, - ) error { - atomic.AddInt32(&numCommitLogWrites, 1) - return nil - })) - - now := time.Now() - opts := testDatabaseOptions() + var ( + now = time.Now() + opts = testDatabaseOptions() + shard = testDatabaseShardWithIndexFn(t, opts, idx) + ) - shard := testDatabaseShardWithIndexFn(t, opts, idx) - shard.commitLogWriter = mockCommitLogWriter shard.SetRuntimeOptions(runtime.NewOptions(). 
SetWriteNewSeriesAsync(false)) defer shard.Close() @@ -186,12 +159,14 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx namespaceIndex) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, now, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, now, 3.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, now, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, now, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { @@ -204,17 +179,15 @@ func testShardWriteTaggedSyncRefCount(t *testing.T, idx namespaceIndex) { // write already inserted series' next := now.Add(time.Minute) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil)) - - written := xclock.WaitUntil(func() bool { - return atomic.LoadInt32(&numCommitLogWrites) == 6 - }, 2*time.Second) - assert.True(t, written) + + _, err = shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { @@ -233,18 +206,6 @@ func TestShardWriteAsyncRefCount(t *testing.T) { }, 100*time.Millisecond) defer closer.Close() - numCommitLogWrites := int32(0) - mockCommitLogWriter := commitLogWriter(commitLogWriterFn(func( - ctx context.Context, - series commitlog.Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, - ) error { - atomic.AddInt32(&numCommitLogWrites, 1) - return nil - })) - now := time.Now() opts := testDatabaseOptions() opts = opts.SetInstrumentOptions( @@ -253,7 +214,6 @@ func TestShardWriteAsyncRefCount(t *testing.T) { SetReportInterval(100 * time.Millisecond)) shard := testDatabaseShard(t, opts) - shard.commitLogWriter = mockCommitLogWriter shard.SetRuntimeOptions(runtime.NewOptions(). 
SetWriteNewSeriesAsync(true)) defer shard.Close() @@ -261,12 +221,14 @@ func TestShardWriteAsyncRefCount(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.Write(ctx, ident.StringID("foo"), now, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("bar"), now, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("baz"), now, 3.0, xtime.Second, nil)) + _, err := shard.Write(ctx, ident.StringID("foo"), now, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("bar"), now, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("baz"), now, 3.0, xtime.Second, nil) + assert.NoError(t, err) inserted := xclock.WaitUntil(func() bool { counter, ok := testReporter.Counters()["dbshard.insert-queue.inserts"] @@ -285,17 +247,15 @@ func TestShardWriteAsyncRefCount(t *testing.T) { // write already inserted series' next := now.Add(time.Minute) - assert.NoError(t, - shard.Write(ctx, ident.StringID("foo"), next, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("bar"), next, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.Write(ctx, ident.StringID("baz"), next, 3.0, xtime.Second, nil)) - - written := xclock.WaitUntil(func() bool { - return atomic.LoadInt32(&numCommitLogWrites) == 6 - }, 2*time.Second) - assert.True(t, written) + + _, err = shard.Write(ctx, ident.StringID("foo"), next, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("bar"), next, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.Write(ctx, ident.StringID("baz"), next, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { @@ -361,18 +321,6 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx namespaceIndex) { }, 100*time.Millisecond) defer closer.Close() - numCommitLogWrites := int32(0) - mockCommitLogWriter := commitLogWriter(commitLogWriterFn(func( - ctx context.Context, - series commitlog.Series, - datapoint ts.Datapoint, - unit xtime.Unit, - annotation ts.Annotation, - ) error { - atomic.AddInt32(&numCommitLogWrites, 1) - return nil - })) - now := time.Now() opts := testDatabaseOptions() opts = opts.SetInstrumentOptions( @@ -381,7 +329,6 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx namespaceIndex) { SetReportInterval(100 * time.Millisecond)) shard := testDatabaseShardWithIndexFn(t, opts, idx) - shard.commitLogWriter = mockCommitLogWriter shard.SetRuntimeOptions(runtime.NewOptions(). 
SetWriteNewSeriesAsync(true)) defer shard.Close() @@ -389,12 +336,14 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx namespaceIndex) { ctx := context.NewContext() defer ctx.Close() - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, now, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, now, 3.0, xtime.Second, nil)) + _, err := shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, now, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, now, 3.0, xtime.Second, nil) + assert.NoError(t, err) inserted := xclock.WaitUntil(func() bool { counter, ok := testReporter.Counters()["dbshard.insert-queue.inserts"] @@ -413,17 +362,15 @@ func testShardWriteTaggedAsyncRefCount(t *testing.T, idx namespaceIndex) { // write already inserted series' next := now.Add(time.Minute) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil)) - assert.NoError(t, - shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil)) - - written := xclock.WaitUntil(func() bool { - return atomic.LoadInt32(&numCommitLogWrites) == 6 - }, 5*time.Second) - assert.True(t, written) + + _, err = shard.WriteTagged(ctx, ident.StringID("foo"), ident.EmptyTagIterator, next, 1.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("bar"), ident.EmptyTagIterator, next, 2.0, xtime.Second, nil) + assert.NoError(t, err) + + _, err = shard.WriteTagged(ctx, ident.StringID("baz"), ident.EmptyTagIterator, next, 3.0, xtime.Second, nil) + assert.NoError(t, err) // ensure all entries have no references left for _, id := range []string{"foo", "bar", "baz"} { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index fe63451372..42bc7c8e29 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -77,7 +77,7 @@ func testDatabaseShardWithIndexFn( nsReaderMgr := newNamespaceReaderManager(metadata, tally.NoopScope, opts) seriesOpts := NewSeriesOptionsFromOptions(opts, defaultTestNs1Opts.RetentionOptions()) return newDatabaseShard(metadata, 0, nil, nsReaderMgr, - &testIncreasingIndex{}, commitLogWriteNoOp, idx, true, opts, seriesOpts).(*dbShard) + &testIncreasingIndex{}, idx, true, opts, seriesOpts).(*dbShard) } func addMockSeries(ctrl *gomock.Controller, shard *dbShard, id ident.ID, tags ident.Tags, index uint64) *series.MockDatabaseSeries { @@ -97,7 +97,7 @@ func TestShardDontNeedBootstrap(t *testing.T) { defer closer() seriesOpts := NewSeriesOptionsFromOptions(opts, testNs.Options().RetentionOptions()) shard := newDatabaseShard(testNs.metadata, 0, nil, nil, - &testIncreasingIndex{}, commitLogWriteNoOp, nil, false, opts, seriesOpts).(*dbShard) + &testIncreasingIndex{}, nil, false, opts, seriesOpts).(*dbShard) defer shard.Close() require.Equal(t, Bootstrapped, shard.bootstrapState) @@ -110,7 +110,7 @@ func TestShardBootstrapState(t *testing.T) { defer closer() 
seriesOpts := NewSeriesOptionsFromOptions(opts, testNs.Options().RetentionOptions()) shard := newDatabaseShard(testNs.metadata, 0, nil, nil, - &testIncreasingIndex{}, commitLogWriteNoOp, nil, false, opts, seriesOpts).(*dbShard) + &testIncreasingIndex{}, nil, false, opts, seriesOpts).(*dbShard) defer shard.Close() require.Equal(t, Bootstrapped, shard.bootstrapState) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ec9c87fda8..5ca7cfae6e 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -42,6 +42,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" @@ -54,6 +55,39 @@ import ( "github.com/golang/mock/gomock" ) +// MockIndexedErrorHandler is a mock of IndexedErrorHandler interface +type MockIndexedErrorHandler struct { + ctrl *gomock.Controller + recorder *MockIndexedErrorHandlerMockRecorder +} + +// MockIndexedErrorHandlerMockRecorder is the mock recorder for MockIndexedErrorHandler +type MockIndexedErrorHandlerMockRecorder struct { + mock *MockIndexedErrorHandler +} + +// NewMockIndexedErrorHandler creates a new mock instance +func NewMockIndexedErrorHandler(ctrl *gomock.Controller) *MockIndexedErrorHandler { + mock := &MockIndexedErrorHandler{ctrl: ctrl} + mock.recorder = &MockIndexedErrorHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockIndexedErrorHandler) EXPECT() *MockIndexedErrorHandlerMockRecorder { + return m.recorder +} + +// HandleError mocks base method +func (m *MockIndexedErrorHandler) HandleError(index int, err error) { + m.ctrl.Call(m, "HandleError", index, err) +} + +// HandleError indicates an expected call of HandleError +func (mr *MockIndexedErrorHandlerMockRecorder) HandleError(index, err interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleError", reflect.TypeOf((*MockIndexedErrorHandler)(nil).HandleError), index, err) +} + // MockDatabase is a mock of Database interface type MockDatabase struct { ctrl *gomock.Controller @@ -196,6 +230,43 @@ func (mr *MockDatabaseMockRecorder) WriteTagged(ctx, namespace, id, tags, timest return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*MockDatabase)(nil).WriteTagged), ctx, namespace, id, tags, timestamp, value, unit, annotation) } +// BatchWriter mocks base method +func (m *MockDatabase) BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) { + ret := m.ctrl.Call(m, "BatchWriter", namespace, batchSize) + ret0, _ := ret[0].(ts.BatchWriter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BatchWriter indicates an expected call of BatchWriter +func (mr *MockDatabaseMockRecorder) BatchWriter(namespace, batchSize interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchWriter", reflect.TypeOf((*MockDatabase)(nil).BatchWriter), namespace, batchSize) +} + +// WriteBatch mocks base method +func (m *MockDatabase) WriteBatch(ctx context.Context, namespace ident.ID, writes ts.BatchWriter, errHandler IndexedErrorHandler) error { + ret := m.ctrl.Call(m, "WriteBatch", ctx, namespace, writes, errHandler) + ret0, _ := ret[0].(error) + return 
ret0 +} + +// WriteBatch indicates an expected call of WriteBatch +func (mr *MockDatabaseMockRecorder) WriteBatch(ctx, namespace, writes, errHandler interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockDatabase)(nil).WriteBatch), ctx, namespace, writes, errHandler) +} + +// WriteTaggedBatch mocks base method +func (m *MockDatabase) WriteTaggedBatch(ctx context.Context, namespace ident.ID, writes ts.BatchWriter, errHandler IndexedErrorHandler) error { + ret := m.ctrl.Call(m, "WriteTaggedBatch", ctx, namespace, writes, errHandler) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteTaggedBatch indicates an expected call of WriteTaggedBatch +func (mr *MockDatabaseMockRecorder) WriteTaggedBatch(ctx, namespace, writes, errHandler interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTaggedBatch", reflect.TypeOf((*MockDatabase)(nil).WriteTaggedBatch), ctx, namespace, writes, errHandler) +} + // QueryIDs mocks base method func (m *MockDatabase) QueryIDs(ctx context.Context, namespace ident.ID, query index.Query, opts index.QueryOptions) (index.QueryResults, error) { ret := m.ctrl.Call(m, "QueryIDs", ctx, namespace, query, opts) @@ -464,6 +535,43 @@ func (mr *MockdatabaseMockRecorder) WriteTagged(ctx, namespace, id, tags, timest return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTagged", reflect.TypeOf((*Mockdatabase)(nil).WriteTagged), ctx, namespace, id, tags, timestamp, value, unit, annotation) } +// BatchWriter mocks base method +func (m *Mockdatabase) BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) { + ret := m.ctrl.Call(m, "BatchWriter", namespace, batchSize) + ret0, _ := ret[0].(ts.BatchWriter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BatchWriter indicates an expected call of BatchWriter +func (mr *MockdatabaseMockRecorder) BatchWriter(namespace, batchSize interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchWriter", reflect.TypeOf((*Mockdatabase)(nil).BatchWriter), namespace, batchSize) +} + +// WriteBatch mocks base method +func (m *Mockdatabase) WriteBatch(ctx context.Context, namespace ident.ID, writes ts.BatchWriter, errHandler IndexedErrorHandler) error { + ret := m.ctrl.Call(m, "WriteBatch", ctx, namespace, writes, errHandler) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteBatch indicates an expected call of WriteBatch +func (mr *MockdatabaseMockRecorder) WriteBatch(ctx, namespace, writes, errHandler interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*Mockdatabase)(nil).WriteBatch), ctx, namespace, writes, errHandler) +} + +// WriteTaggedBatch mocks base method +func (m *Mockdatabase) WriteTaggedBatch(ctx context.Context, namespace ident.ID, writes ts.BatchWriter, errHandler IndexedErrorHandler) error { + ret := m.ctrl.Call(m, "WriteTaggedBatch", ctx, namespace, writes, errHandler) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteTaggedBatch indicates an expected call of WriteTaggedBatch +func (mr *MockdatabaseMockRecorder) WriteTaggedBatch(ctx, namespace, writes, errHandler interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTaggedBatch", reflect.TypeOf((*Mockdatabase)(nil).WriteTaggedBatch), ctx, namespace, writes, errHandler) +} + // QueryIDs mocks base method func (m *Mockdatabase) QueryIDs(ctx context.Context, namespace ident.ID, query index.Query, opts index.QueryOptions) 
(index.QueryResults, error) { ret := m.ctrl.Call(m, "QueryIDs", ctx, namespace, query, opts) @@ -817,10 +925,11 @@ func (mr *MockdatabaseNamespaceMockRecorder) Tick(c, tickStart interface{}) *gom } // Write mocks base method -func (m *MockdatabaseNamespace) Write(ctx context.Context, id ident.ID, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockdatabaseNamespace) Write(ctx context.Context, id ident.ID, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (ts.Series, error) { ret := m.ctrl.Call(m, "Write", ctx, id, timestamp, value, unit, annotation) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(ts.Series) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Write indicates an expected call of Write @@ -829,10 +938,11 @@ func (mr *MockdatabaseNamespaceMockRecorder) Write(ctx, id, timestamp, value, un } // WriteTagged mocks base method -func (m *MockdatabaseNamespace) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockdatabaseNamespace) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (ts.Series, error) { ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tags, timestamp, value, unit, annotation) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(ts.Series) + ret1, _ := ret[1].(error) + return ret0, ret1 } // WriteTagged indicates an expected call of WriteTagged @@ -1181,10 +1291,11 @@ func (mr *MockdatabaseShardMockRecorder) Tick(c, tickStart interface{}) *gomock. } // Write mocks base method -func (m *MockdatabaseShard) Write(ctx context.Context, id ident.ID, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockdatabaseShard) Write(ctx context.Context, id ident.ID, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (ts.Series, error) { ret := m.ctrl.Call(m, "Write", ctx, id, timestamp, value, unit, annotation) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(ts.Series) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Write indicates an expected call of Write @@ -1193,10 +1304,11 @@ func (mr *MockdatabaseShardMockRecorder) Write(ctx, id, timestamp, value, unit, } // WriteTagged mocks base method -func (m *MockdatabaseShard) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) error { +func (m *MockdatabaseShard) WriteTagged(ctx context.Context, id ident.ID, tags ident.TagIterator, timestamp time.Time, value float64, unit time0.Unit, annotation []byte) (ts.Series, error) { ret := m.ctrl.Call(m, "WriteTagged", ctx, id, tags, timestamp, value, unit, annotation) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(ts.Series) + ret1, _ := ret[1].(error) + return ret0, ret1 } // WriteTagged indicates an expected call of WriteTagged @@ -2849,3 +2961,27 @@ func (m *MockOptions) QueryIDsWorkerPool() sync0.WorkerPool { func (mr *MockOptionsMockRecorder) QueryIDsWorkerPool() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIDsWorkerPool", reflect.TypeOf((*MockOptions)(nil).QueryIDsWorkerPool)) } + +// SetWriteBatchPool mocks base method +func (m *MockOptions) SetWriteBatchPool(value *ts.WriteBatchPool) Options { + ret := m.ctrl.Call(m, "SetWriteBatchPool", value) + ret0, _ := ret[0].(Options) + return ret0 +} + 
+// SetWriteBatchPool indicates an expected call of SetWriteBatchPool +func (mr *MockOptionsMockRecorder) SetWriteBatchPool(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteBatchPool", reflect.TypeOf((*MockOptions)(nil).SetWriteBatchPool), value) +} + +// WriteBatchPool mocks base method +func (m *MockOptions) WriteBatchPool() *ts.WriteBatchPool { + ret := m.ctrl.Call(m, "WriteBatchPool") + ret0, _ := ret[0].(*ts.WriteBatchPool) + return ret0 +} + +// WriteBatchPool indicates an expected call of WriteBatchPool +func (mr *MockOptionsMockRecorder) WriteBatchPool() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatchPool", reflect.TypeOf((*MockOptions)(nil).WriteBatchPool)) +} diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 9efceccf21..9c9275ac22 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" @@ -51,6 +52,13 @@ import ( // PageToken is an opaque paging token. type PageToken []byte +// IndexedErrorHandler can handle individual errors based on their index. It +// is used primarily in cases where we need to handle errors in batches, but +// want to avoid an intermediary allocation of []error. +type IndexedErrorHandler interface { + HandleError(index int, err error) +} + // Database is a time series database type Database interface { // Options returns the database options @@ -103,6 +111,26 @@ type Database interface { annotation []byte, ) error + // BatchWriter returns a batch writer for the provided namespace that can + // be used to issue a batch of writes to either WriteBatch or WriteTaggedBatch. + BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) + + // WriteBatch is the same as Write, but in batch. + WriteBatch( + ctx context.Context, + namespace ident.ID, + writes ts.BatchWriter, + errHandler IndexedErrorHandler, + ) error + + // WriteTaggedBatch is the same as WriteTagged, but in batch. + WriteTaggedBatch( + ctx context.Context, + namespace ident.ID, + writes ts.BatchWriter, + errHandler IndexedErrorHandler, + ) error + // QueryIDs resolves the given query into known IDs. QueryIDs( ctx context.Context, @@ -221,7 +249,7 @@ type databaseNamespace interface { value float64, unit xtime.Unit, annotation []byte, - ) error + ) (ts.Series, error) // WriteTagged values to the namespace for an ID WriteTagged( @@ -232,7 +260,7 @@ type databaseNamespace interface { value float64, unit xtime.Unit, annotation []byte, - ) error + ) (ts.Series, error) // QueryIDs resolves the given query into known IDs. QueryIDs( @@ -350,7 +378,7 @@ type databaseShard interface { value float64, unit xtime.Unit, annotation []byte, - ) error + ) (ts.Series, error) // WriteTagged values to the shard for an ID WriteTagged( @@ -361,7 +389,7 @@ type databaseShard interface { value float64, unit xtime.Unit, annotation []byte, - ) error + ) (ts.Series, error) ReadEncoded( ctx context.Context, @@ -821,6 +849,12 @@ type Options interface { // QueryIDsWorkerPool returns the QueryIDs worker pool. QueryIDsWorkerPool() xsync.WorkerPool + + // SetWriteBatchPool sets the WriteBatch pool. 
+	SetWriteBatchPool(value *ts.WriteBatchPool) Options
+
+	// WriteBatchPool returns the WriteBatch pool.
+	WriteBatchPool() *ts.WriteBatchPool
 }
 
 // DatabaseBootstrapState stores a snapshot of the bootstrap state for all shards across all
diff --git a/src/dbnode/ts/types.go b/src/dbnode/ts/types.go
index b63b2a70ab..4e14df81cd 100644
--- a/src/dbnode/ts/types.go
+++ b/src/dbnode/ts/types.go
@@ -22,8 +22,56 @@ package ts
 
 import (
 	"time"
+
+	"github.com/m3db/m3x/ident"
+	xtime "github.com/m3db/m3x/time"
 )
 
+// Write is a write for the commitlog.
+type Write struct {
+	Series     Series
+	Datapoint  Datapoint
+	Unit       xtime.Unit
+	Annotation Annotation
+}
+
+// BatchWrite represents a write that was added to the
+// BatchWriter.
+type BatchWrite struct {
+	// Write is used by the commitlog. The Series needs to be updated by
+	// the shard object first; the Series provided by the caller cannot be
+	// used directly as it is missing important fields like Tags.
+	Write Write
+	// TagIter is not used by the commitlog. It is provided by the caller
+	// (since the request usually comes in over the wire) and is superseded
+	// by the Tags in Write.Series, which are set by the Shard object.
+	TagIter ident.TagIterator
+	// OriginalIndex is used to help the caller tie errors back to an index
+	// in their own collection.
+	OriginalIndex int
+	// Err is used by the commitlog.
+	Err error
+}
+
+// Series describes a series.
+type Series struct {
+	// UniqueIndex is the unique index assigned to this series (only valid
+	// on a per-process basis).
+	UniqueIndex uint64
+
+	// Namespace is the namespace the series belongs to.
+	Namespace ident.ID
+
+	// ID is the series identifier.
+	ID ident.ID
+
+	// Tags are the series tags.
+	Tags ident.Tags
+
+	// Shard is the shard the series belongs to.
+	Shard uint32
+}
+
 // A Datapoint is a single data value reported at a given time.
 type Datapoint struct {
 	Timestamp time.Time
@@ -37,3 +85,42 @@ func (d Datapoint) Equal(x Datapoint) bool {
 
 // Annotation represents information used to annotate datapoints.
 type Annotation []byte
+
+// WriteBatch is the interface that supports adding writes to the batch,
+// as well as iterating through the batched writes and resetting the
+// struct (for pooling).
+type WriteBatch interface {
+	BatchWriter
+	// Can't use a real iterator pattern here as it slows things down.
+	Iter() []BatchWrite
+	SetOutcome(idx int, series Series, err error)
+	Reset(batchSize int, ns ident.ID)
+	Finalize()
+
+	// Returns the WriteBatch's internal capacity. Used by the pool to throw
+	// away batches that have grown too large.
+	cap() int
+}
+
+// BatchWriter is the interface that is used for preparing a batch of
+// writes.
+type BatchWriter interface {
+	Add(
+		originalIndex int,
+		id ident.ID,
+		timestamp time.Time,
+		value float64,
+		unit xtime.Unit,
+		annotation []byte,
+	)
+
+	AddTagged(
+		originalIndex int,
+		id ident.ID,
+		tags ident.TagIterator,
+		timestamp time.Time,
+		value float64,
+		unit xtime.Unit,
+		annotation []byte,
+	)
+}
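The split between the write-only BatchWriter handed to callers and the full WriteBatch used internally is easiest to see from the caller's side. Below is a minimal sketch of the intended flow through the new Database methods; the sample type, errorCollector handler, and writeSamples helper are invented here for illustration and are not part of this change:

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage"
	"github.com/m3db/m3x/context"
	"github.com/m3db/m3x/ident"
	xtime "github.com/m3db/m3x/time"
)

// sample is a hypothetical caller-side representation of a single write.
type sample struct {
	id        ident.ID
	timestamp time.Time
	value     float64
}

// errorCollector is a toy IndexedErrorHandler that maps failures back to
// the caller's original slice indices without allocating a []error.
type errorCollector struct {
	errs map[int]error
}

func (c *errorCollector) HandleError(index int, err error) {
	if err != nil {
		c.errs[index] = err
	}
}

func writeSamples(
	ctx context.Context,
	db storage.Database,
	ns ident.ID,
	samples []sample,
) (map[int]error, error) {
	// BatchWriter hands back only the write-only half of the interface,
	// sized for the batch.
	writer, err := db.BatchWriter(ns, len(samples))
	if err != nil {
		return nil, err
	}
	for i, s := range samples {
		// The originalIndex (i) is echoed back through HandleError on failure.
		writer.Add(i, s.id, s.timestamp, s.value, xtime.Second, nil)
	}
	collector := &errorCollector{errs: make(map[int]error)}
	if err := db.WriteBatch(ctx, ns, writer, collector); err != nil {
		return nil, err
	}
	return collector.errs, nil
}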
diff --git a/src/dbnode/ts/write_batch.go b/src/dbnode/ts/write_batch.go
new file mode 100644
index 0000000000..d38972ab97
--- /dev/null
+++ b/src/dbnode/ts/write_batch.go
@@ -0,0 +1,138 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package ts
+
+import (
+	"time"
+
+	"github.com/m3db/m3x/ident"
+	xtime "github.com/m3db/m3x/time"
+)
+
+type writeBatch struct {
+	writes     []BatchWrite
+	ns         ident.ID
+	finalizeFn func(WriteBatch)
+}
+
+// NewWriteBatch creates a new WriteBatch.
+func NewWriteBatch(
+	batchSize int,
+	ns ident.ID,
+	finalizeFn func(WriteBatch),
+) WriteBatch {
+	return &writeBatch{
+		writes:     make([]BatchWrite, 0, batchSize),
+		ns:         ns,
+		finalizeFn: finalizeFn,
+	}
+}
+
+func (b *writeBatch) Add(
+	originalIndex int,
+	id ident.ID,
+	timestamp time.Time,
+	value float64,
+	unit xtime.Unit,
+	annotation []byte,
+) {
+	write := newBatchWriterWrite(
+		originalIndex, b.ns, id, nil, timestamp, value, unit, annotation)
+	b.writes = append(b.writes, write)
+}
+
+func (b *writeBatch) AddTagged(
+	originalIndex int,
+	id ident.ID,
+	tagIter ident.TagIterator,
+	timestamp time.Time,
+	value float64,
+	unit xtime.Unit,
+	annotation []byte,
+) {
+	write := newBatchWriterWrite(
+		originalIndex, b.ns, id, tagIter, timestamp, value, unit, annotation)
+	b.writes = append(b.writes, write)
+}
+
+func (b *writeBatch) Reset(
+	batchSize int,
+	ns ident.ID,
+) {
+	var writes []BatchWrite
+	if batchSize > cap(b.writes) {
+		writes = make([]BatchWrite, 0, batchSize)
+	} else {
+		writes = b.writes[:0]
+	}
+
+	b.writes = writes
+	b.ns = ns
+}
+
+func (b *writeBatch) Iter() []BatchWrite {
+	return b.writes
+}
+
+func (b *writeBatch) SetOutcome(idx int, series Series, err error) {
+	b.writes[idx].Write.Series = series
+	b.writes[idx].Err = err
+}
+
+func (b *writeBatch) Finalize() {
+	b.ns = nil
+	b.writes = b.writes[:0]
+	if b.finalizeFn != nil {
+		b.finalizeFn(b)
+	}
+}
+
+func (b *writeBatch) cap() int {
+	return cap(b.writes)
+}
+
+func newBatchWriterWrite(
+	originalIndex int,
+	namespace ident.ID,
+	id ident.ID,
+	tagsIter ident.TagIterator,
+	timestamp time.Time,
+	value float64,
+	unit xtime.Unit,
+	annotation []byte,
+) BatchWrite {
+	return BatchWrite{
+		Write: Write{
+			Series: Series{
+				ID:        id,
+				Namespace: namespace,
+			},
+			Datapoint: Datapoint{
+				Timestamp: timestamp,
+				Value:     value,
+			},
+			Unit:       unit,
+			Annotation: annotation,
+		},
+		TagIter:       tagsIter,
+		OriginalIndex: originalIndex,
+	}
+}
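write_batch.go gives the batch a simple lifecycle: Add/AddTagged append, Iter deliberately exposes the raw slice instead of an iterator, SetOutcome lets the shard write back the resolved Series plus any per-write error, and Finalize truncates the batch and hands it back to its owner. A short sketch against just the exported ts API; the no-op finalize fn and the literal shard value stand in for what the pool and shard would supply in practice:

package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3x/ident"
	xtime "github.com/m3db/m3x/time"
)

func main() {
	// The pool normally supplies its Put method as the finalize fn; a no-op
	// is enough for a standalone example.
	batch := ts.NewWriteBatch(2, ident.StringID("metrics"), func(ts.WriteBatch) {})

	batch.Add(0, ident.StringID("cpu.user"), time.Now(), 0.42, xtime.Second, nil)
	batch.Add(1, ident.StringID("cpu.idle"), time.Now(), 0.58, xtime.Second, nil)

	// The shard would resolve each write to a series (filling in fields like
	// Shard, UniqueIndex and Tags) and record the outcome per index.
	for i, w := range batch.Iter() {
		series := w.Write.Series
		series.Shard = 42 // stand-in for the shard's real assignment
		batch.SetOutcome(i, series, nil)
	}

	for _, w := range batch.Iter() {
		fmt.Println(w.Write.Series.ID, w.Write.Datapoint.Value, w.Err)
	}

	// Truncates the batch and invokes the finalize fn.
	batch.Finalize()
}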
diff --git a/src/dbnode/ts/write_batch_pool.go b/src/dbnode/ts/write_batch_pool.go
new file mode 100644
index 0000000000..7d563d9940
--- /dev/null
+++ b/src/dbnode/ts/write_batch_pool.go
@@ -0,0 +1,85 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package ts
+
+import (
+	"github.com/m3db/m3x/pool"
+)
+
+const (
+	// defaultInitialBatchSize determines the initial batch size that will be used
+	// when filling up the pool.
+	defaultInitialBatchSize = 1024
+	// defaultMaxBatchSize is the default maximum size for a writeBatch that the
+	// pool will allow to remain in the pool. Larger batches are discarded to avoid
+	// holding on to excessive amounts of memory after an exceptionally large batch write.
+	defaultMaxBatchSize = 100000
+)
+
+// WriteBatchPool is a pool of WriteBatch.
+type WriteBatchPool struct {
+	pool             pool.ObjectPool
+	initialBatchSize int
+	maxBatchSize     int
+}
+
+// NewWriteBatchPool constructs a new WriteBatchPool.
+func NewWriteBatchPool(
+	opts pool.ObjectPoolOptions,
+	initialBatchSizeOverride,
+	maxBatchSizeOverride *int,
+) *WriteBatchPool {
+	initialBatchSize := defaultInitialBatchSize
+	if initialBatchSizeOverride != nil {
+		initialBatchSize = *initialBatchSizeOverride
+	}
+
+	maxBatchSize := defaultMaxBatchSize
+	if maxBatchSizeOverride != nil {
+		maxBatchSize = *maxBatchSizeOverride
+	}
+
+	p := pool.NewObjectPool(opts)
+	return &WriteBatchPool{pool: p, initialBatchSize: initialBatchSize, maxBatchSize: maxBatchSize}
+}
+
+// Init initializes a WriteBatchPool.
+func (p *WriteBatchPool) Init() {
+	p.pool.Init(func() interface{} {
+		return NewWriteBatch(p.initialBatchSize, nil, p.Put)
+	})
+}
+
+// Get retrieves a WriteBatch from the pool.
+func (p *WriteBatchPool) Get() WriteBatch {
+	w := p.pool.Get().(WriteBatch)
+	return w
+}
+
+// Put stores a WriteBatch in the pool.
+func (p *WriteBatchPool) Put(w WriteBatch) {
+	if w.cap() > p.maxBatchSize {
+		// WriteBatch has grown too large to remain in the pool.
+		return
+	}
+
+	p.pool.Put(w)
+}
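Pool usage composes with Finalize: Get a batch, Reset it for the incoming request's namespace and size, and Finalize routes back to Put, which silently drops any batch whose capacity has grown past maxBatchSize so one pathological request cannot pin memory. A sketch, assuming m3x's pool.NewObjectPoolOptions defaults:

package example

import (
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3x/ident"
	"github.com/m3db/m3x/pool"
)

func newPooledBatch() ts.WriteBatch {
	var (
		initialBatchSize = 128
		maxBatchSize     = 100000
	)
	batchPool := ts.NewWriteBatchPool(
		pool.NewObjectPoolOptions(), &initialBatchSize, &maxBatchSize)
	batchPool.Init()

	// Get hands out a batch whose finalize fn is the pool's Put, so callers
	// only need to call Finalize when they are done with it.
	batch := batchPool.Get()
	batch.Reset(1024, ident.StringID("metrics"))
	return batch
}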
diff --git a/src/dbnode/ts/write_batch_test.go b/src/dbnode/ts/write_batch_test.go
new file mode 100644
index 0000000000..48ec56a05e
--- /dev/null
+++ b/src/dbnode/ts/write_batch_test.go
@@ -0,0 +1,211 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package ts
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/m3db/m3x/ident"
+	xtime "github.com/m3db/m3x/time"
+
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	batchSize    = 2
+	maxBatchSize = 10
+)
+
+var (
+	namespace = ident.StringID("namespace")
+	writes    = []testWrite{
+		{
+			id: ident.StringID("series1"),
+			tagIter: ident.NewTagsIterator(ident.NewTags(
+				ident.Tag{
+					Name:  ident.StringID("name1"),
+					Value: ident.StringID("value1"),
+				})),
+			timestamp:  time.Now(),
+			value:      0,
+			unit:       xtime.Nanosecond,
+			annotation: []byte("annotation1"),
+		},
+		{
+			id: ident.StringID("series2"),
+			tagIter: ident.NewTagsIterator(ident.NewTags(
+				ident.Tag{
+					Name:  ident.StringID("name2"),
+					Value: ident.StringID("value2"),
+				})),
+			timestamp:  time.Now(),
+			value:      1,
+			unit:       xtime.Nanosecond,
+			annotation: []byte("annotation2"),
+		},
+	}
+)
+
+type testWrite struct {
+	id         ident.ID
+	tagIter    ident.TagIterator
+	timestamp  time.Time
+	value      float64
+	unit       xtime.Unit
+	annotation []byte
+}
+
+func TestBatchWriterAddAndIter(t *testing.T) {
+	writeBatch := NewWriteBatch(batchSize, namespace, nil)
+
+	for i, write := range writes {
+		writeBatch.Add(
+			i,
+			write.id,
+			write.timestamp,
+			write.value,
+			write.unit,
+			write.annotation)
+	}
+
+	// Make sure all the data is there
+	assertDataPresent(t, writes, writeBatch)
+}
+
+func TestBatchWriterAddTaggedAndIter(t *testing.T) {
+	writeBatch := NewWriteBatch(batchSize, namespace, nil)
+
+	for i, write := range writes {
+		writeBatch.AddTagged(
+			i,
+			write.id,
+			write.tagIter,
+			write.timestamp,
+			write.value,
+			write.unit,
+			write.annotation)
+	}
+
+	// Make sure all the data is there
+	assertDataPresent(t, writes, writeBatch)
+}
+
+func TestBatchWriterSetSeries(t *testing.T) {
+	writeBatch := NewWriteBatch(batchSize, namespace, nil)
+
+	for i, write := range writes {
+		writeBatch.AddTagged(
+			i,
+			write.id,
+			write.tagIter,
+			write.timestamp,
+			write.value,
+			write.unit,
+			write.annotation)
+	}
+
+	// Set the outcome
+	iter := writeBatch.Iter()
+	for i, curr := range iter {
+		var (
+			currWrite  = curr.Write
+			currSeries = currWrite.Series
+			newSeries  = currSeries
+		)
+		newSeries.ID = ident.StringID(fmt.Sprint(i))
+
+		var err error
+		if i == len(iter)-1 {
+			err = errors.New("some-error")
+		}
+		writeBatch.SetOutcome(i, newSeries, err)
+	}
+
+	// Assert the series have been updated
+	iter = writeBatch.Iter()
+	for i, curr := range iter {
+		var (
+			currWrite  = curr.Write
+			currSeries = currWrite.Series
+		)
+		require.True(t, ident.StringID(fmt.Sprint(i)).Equal(currSeries.ID))
+		if i == len(iter)-1 {
+			require.Equal(t, errors.New("some-error"), curr.Err)
+		} else {
+			require.NoError(t, curr.Err)
+		}
+	}
+}
+
+func TestWriteBatchReset(t *testing.T) {
+	var (
+		numResets  = 10
+		writeBatch = NewWriteBatch(batchSize, namespace, nil)
+	)
+
+	for i := 0; i < numResets; i++ {
+		writeBatch.Reset(batchSize, namespace)
+		for _, write := range writes {
+			writeBatch.Add(
+				i,
+				write.id,
+				write.timestamp,
+				write.value,
+				write.unit,
+				write.annotation)
+		}
+
+		// Make sure all the data is there
+		assertDataPresent(t, writes, writeBatch)
+	}
+}
+
+func assertDataPresent(t *testing.T, writes []testWrite, batch WriteBatch) {
+	for _, write := range writes {
+		var (
+			iter  = batch.Iter()
+			found = false
+		)
+
+		for _, currWriteBatch := range iter {
+			var (
+				currWrite  = currWriteBatch.Write
+				currSeries = currWrite.Series
+			)
+
+			if currSeries.ID.Equal(write.id) {
+				require.Equal(t, namespace, currWrite.Series.Namespace)
+				require.Equal(t, write.timestamp, currWrite.Datapoint.Timestamp)
+				require.Equal(t, write.value, currWrite.Datapoint.Value)
+				require.Equal(t, write.unit, currWrite.Unit)
+				require.True(t, bytes.Equal(write.annotation, currWrite.Annotation))
+				found = true
+				break
+			}
+		}
+
+		require.True(t, found, fmt.Sprintf("expected to find series: %s", write.id))
+	}
+}
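One behavior from write_batch_pool.go worth spelling out: Reset allocates a fresh, larger slice whenever the requested size exceeds the current capacity, so a single oversized request permanently grows the batch, and Put then discards it rather than re-pooling it. A sketch of that round trip, with illustrative sizes:

package example

import (
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3x/ident"
	"github.com/m3db/m3x/pool"
)

func demonstrateDiscard() {
	var (
		initialBatchSize = 128
		maxBatchSize     = 1024
	)
	batchPool := ts.NewWriteBatchPool(
		pool.NewObjectPoolOptions(), &initialBatchSize, &maxBatchSize)
	batchPool.Init()

	batch := batchPool.Get()

	// Growing past maxBatchSize: Reset allocates a fresh slice with the
	// larger capacity because 4096 exceeds the current capacity of 128.
	batch.Reset(4096, ident.StringID("metrics"))

	// Finalize routes to Put, which sees the capacity is above maxBatchSize
	// and drops the batch on the floor; the next Get allocates a fresh,
	// small batch instead of reusing the oversized one.
	batch.Finalize()
}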