Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Nov 6, 2018
1 parent bb6a808 commit a22e1bf
Showing 1 changed file with 20 additions and 36 deletions.
56 changes: 20 additions & 36 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ var (

type WritesBatch []Write
type Write struct {
series Series
datapoint ts.Datapoint
unit xtime.Unit
annotation ts.Annotation
Series Series
Datapoint ts.Datapoint
Unit xtime.Unit
Annotation ts.Annotation
}

type newCommitLogWriterFn func(
Expand All @@ -60,10 +60,7 @@ type newCommitLogWriterFn func(

type writeCommitLogFn func(
ctx context.Context,
series Series,
datapoint ts.Datapoint,
unit xtime.Unit,
annotation ts.Annotation,
writes WritesBatch,
) error

type commitLogFailFn func(err error)
Expand Down Expand Up @@ -449,8 +446,8 @@ func (l *commitLog) write() {

for i := 0; i < len(write.writesBatch); i++ {
write := write.writesBatch[i]
err := l.writerState.writer.Write(write.series,
write.datapoint, write.unit, write.annotation)
err := l.writerState.writer.Write(write.Series,
write.Datapoint, write.Unit, write.Annotation)
if err != nil {
l.metrics.errors.Inc(1)
l.log.Errorf("failed to write to commit log: %v", err)
Expand Down Expand Up @@ -541,15 +538,19 @@ func (l *commitLog) Write(
unit xtime.Unit,
annotation ts.Annotation,
) error {
return l.writeFn(ctx, series, datapoint, unit, annotation)
return l.writeFn(ctx, WritesBatch{
Write{
Series: series,
Datapoint: datapoint,
Unit: unit,
Annotation: annotation,
},
})
}

func (l *commitLog) writeWait(
ctx context.Context,
series Series,
datapoint ts.Datapoint,
unit xtime.Unit,
annotation ts.Annotation,
writes WritesBatch,
) error {
l.closedState.RLock()
if l.closedState.closed {
Expand All @@ -570,15 +571,8 @@ func (l *commitLog) writeWait(
}

write := commitLogWrite{
writesBatch: []Write{
{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
},
},
callbackFn: completion,
writesBatch: writes,
callbackFn: completion,
}

enqueued := false
Expand All @@ -602,10 +596,7 @@ func (l *commitLog) writeWait(

func (l *commitLog) writeBehind(
ctx context.Context,
series Series,
datapoint ts.Datapoint,
unit xtime.Unit,
annotation ts.Annotation,
writes WritesBatch,
) error {
l.closedState.RLock()
if l.closedState.closed {
Expand All @@ -614,14 +605,7 @@ func (l *commitLog) writeBehind(
}

write := commitLogWrite{
writesBatch: []Write{
{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
},
},
writesBatch: writes,
}

enqueued := false
Expand Down

0 comments on commit a22e1bf

Please sign in to comment.