Skip to content

Commit

Permalink
more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Nov 9, 2018
1 parent c188b40 commit b647dc0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
7 changes: 4 additions & 3 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (l *commitLog) write() {
}

numWritesSuccess := int64(0)
if write.write.writeBatch.Writes == nil {
if write.write.writeBatch == nil {
// Handle individual write case
write := write.write.write
err := l.writerState.writer.Write(write.Series,
Expand All @@ -457,8 +457,9 @@ func (l *commitLog) write() {
numWritesSuccess++
} else {
// Handle write batch case
for i := 0; i < len(write.write.writeBatch.Writes); i++ {
write := write.write.writeBatch.Writes[i]
iter := write.write.writeBatch.Iter()
for iter.Next() {
write := iter.Current().Write
err := l.writerState.writer.Write(write.Series,
write.Datapoint, write.Unit, write.Annotation)
if err != nil {
Expand Down
34 changes: 26 additions & 8 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,21 @@ func (d *db) WriteTagged(
return d.commitLog.Write(ctx, series, dp, unit, annotation)
}

func (d *db) WriteTaggedBatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) {
n, err := d.namespaceFor(namespace)
if err != nil {
// TODO: Fix metric
d.metrics.unknownNamespaceWriteTagged.Inc(1)
return nil, err
}

var (
nsID = n.ID()
batchWriter = ts.NewWriteBatch(batchSize, 100000, nsID, d.shardSet.Lookup)
)
return batchWriter, nil
}

func (d *db) WriteTaggedBatch(
ctx context.Context,
namespace ident.ID,
Expand All @@ -542,23 +557,26 @@ func (d *db) WriteTaggedBatch(
return err
}

for i, write := range writes {
iter := writes.Iter()
for iter.Next() {
write := iter.Current()

series, err := n.WriteTagged(
ctx,
write.Series.ID,
write.Series.TagIter,
write.Datapoint.Timestamp,
write.Datapoint.Value,
write.Unit,
write.Annotation,
write.Write.Series.ID,
write.TagIter,
write.Write.Datapoint.Timestamp,
write.Write.Datapoint.Value,
write.Write.Unit,
write.Write.Annotation,
)
if err == commitlog.ErrCommitLogQueueFull {
d.errors.Record(1)
}
if err != nil {
return err
}
writes[i].Series = series
iter.UpdateSeries(series)
}

return d.commitLog.WriteBatch(ctx, writes)
Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/dbnode/storage/repair"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/metrics"
"github.com/m3db/m3x/context"
xerrors "github.com/m3db/m3x/errors"
Expand Down Expand Up @@ -154,18 +155,18 @@ func TestNamespaceWriteShardOwned(t *testing.T) {
defer ctx.Close()

id := ident.StringID("foo")
ts := time.Now()
now := time.Now()
val := 0.0
unit := xtime.Second
ant := []byte(nil)

ns, closer := newTestNamespace(t)
defer closer()
shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().Write(ctx, id, ts, val, unit, ant).Return(ts.Series{}, nil)
shard.EXPECT().Write(ctx, id, now, val, unit, ant).Return(ts.Series{}, nil)
ns.shards[testShardIDs[0].ID()] = shard

_, err := ns.Write(ctx, id, ts, val, unit, ant)
_, err := ns.Write(ctx, id, now, val, unit, ant)
require.NoError(t, err)
}

Expand Down Expand Up @@ -1024,15 +1025,15 @@ func TestNamespaceIndexInsert(t *testing.T) {
defer closer()

ctx := context.NewContext()
ts := time.Now()
now := time.Now()

shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator,
ts, 1.0, xtime.Second, nil).Return(ts.Series{}, nil)
now, 1.0, xtime.Second, nil).Return(ts.Series{}, nil)
ns.shards[testShardIDs[0].ID()] = shard

_, err := ns.WriteTagged(ctx, ident.StringID("a"),
ident.EmptyTagIterator, ts, 1.0, xtime.Second, nil)
ident.EmptyTagIterator, now, 1.0, xtime.Second, nil)
require.NoError(t, err)

shard.EXPECT().Close()
Expand Down
2 changes: 0 additions & 2 deletions src/dbnode/ts/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ type Series struct {
// Tags are the series tags
Tags ident.Tags

TagIter ident.TagIterator

// Shard is the shard the series belongs to
Shard uint32
}
Expand Down

0 comments on commit b647dc0

Please sign in to comment.