diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index 12c338aee0..e49a81bdef 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -445,7 +445,7 @@ func (l *commitLog) write() { } numWritesSuccess := int64(0) - if write.write.writeBatch.Writes == nil { + if write.write.writeBatch == nil { // Handle individual write case write := write.write.write err := l.writerState.writer.Write(write.Series, @@ -457,8 +457,9 @@ func (l *commitLog) write() { numWritesSuccess++ } else { // Handle write batch case - for i := 0; i < len(write.write.writeBatch.Writes); i++ { - write := write.write.writeBatch.Writes[i] + iter := write.write.writeBatch.Iter() + for iter.Next() { + write := iter.Current().Write err := l.writerState.writer.Write(write.Series, write.Datapoint, write.Unit, write.Annotation) if err != nil { diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index a57e9d118d..df162f2be9 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -531,6 +531,21 @@ func (d *db) WriteTagged( return d.commitLog.Write(ctx, series, dp, unit, annotation) } +func (d *db) WriteTaggedBatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) { + n, err := d.namespaceFor(namespace) + if err != nil { + // TODO: Fix metric + d.metrics.unknownNamespaceWriteTagged.Inc(1) + return nil, err + } + + var ( + nsID = n.ID() + batchWriter = ts.NewWriteBatch(batchSize, 100000, nsID, d.shardSet.Lookup) + ) + return batchWriter, nil +} + func (d *db) WriteTaggedBatch( ctx context.Context, namespace ident.ID, @@ -542,15 +557,18 @@ func (d *db) WriteTaggedBatch( return err } - for i, write := range writes { + iter := writes.Iter() + for iter.Next() { + write := iter.Current() + series, err := n.WriteTagged( ctx, - write.Series.ID, - write.Series.TagIter, - write.Datapoint.Timestamp, - write.Datapoint.Value, - write.Unit, - write.Annotation, + write.Write.Series.ID, + write.TagIter, + write.Write.Datapoint.Timestamp, + write.Write.Datapoint.Value, + write.Write.Unit, + write.Write.Annotation, ) if err == commitlog.ErrCommitLogQueueFull { d.errors.Record(1) @@ -558,7 +576,7 @@ func (d *db) WriteTaggedBatch( if err != nil { return err } - writes[i].Series = series + iter.UpdateSeries(series) } return d.commitLog.WriteBatch(ctx, writes) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index fa0a806790..93d02b5b5d 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3x/context" xerrors "github.com/m3db/m3x/errors" @@ -154,7 +155,7 @@ func TestNamespaceWriteShardOwned(t *testing.T) { defer ctx.Close() id := ident.StringID("foo") - ts := time.Now() + now := time.Now() val := 0.0 unit := xtime.Second ant := []byte(nil) @@ -162,10 +163,10 @@ func TestNamespaceWriteShardOwned(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Write(ctx, id, ts, val, unit, ant).Return(ts.Series{}, nil) + shard.EXPECT().Write(ctx, id, now, val, unit, ant).Return(ts.Series{}, nil) ns.shards[testShardIDs[0].ID()] = shard - _, err := ns.Write(ctx, id, ts, val, unit, ant) + _, err := ns.Write(ctx, id, now, val, unit, ant) require.NoError(t, err) } @@ -1024,15 +1025,15 @@ func TestNamespaceIndexInsert(t *testing.T) { defer closer() ctx := context.NewContext() - ts := time.Now() + now := time.Now() shard := NewMockdatabaseShard(ctrl) shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - ts, 1.0, xtime.Second, nil).Return(ts.Series{}, nil) + now, 1.0, xtime.Second, nil).Return(ts.Series{}, nil) ns.shards[testShardIDs[0].ID()] = shard _, err := ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, ts, 1.0, xtime.Second, nil) + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) require.NoError(t, err) shard.EXPECT().Close() diff --git a/src/dbnode/ts/types.go b/src/dbnode/ts/types.go index 7601bc4ccd..6a5b423139 100644 --- a/src/dbnode/ts/types.go +++ b/src/dbnode/ts/types.go @@ -49,8 +49,6 @@ type Series struct { // Tags are the series tags Tags ident.Tags - TagIter ident.TagIterator - // Shard is the shard the series belongs to Shard uint32 }