Skip to content

Commit

Permalink
[dbnode] Fail M3TSZ encoding on DeltaOfDelta overflow (#3329)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Mar 9, 2021
1 parent 5875a94 commit 5c30780
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 2 deletions.
166 changes: 166 additions & 0 deletions src/dbnode/encoding/m3tsz/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,169 @@ func testMultiplePasses(t *testing.T, test multiplePassesTest) {
}
}
}

func TestEncoderFailOnDeltaOfDeltaOverflow(t *testing.T) {
tests := []struct {
name string
delta time.Duration
units xtime.Unit
positiveErrMsg string
negativeErrMsg string
}{
{
name: "seconds, short gap",
delta: time.Hour,
units: xtime.Second,
},
{
name: "seconds, huge gap",
delta: 25 * 24 * time.Hour,
units: xtime.Second,
},
{
name: "seconds, too big gap",
delta: 1000 * 25 * 24 * time.Hour, // more than 2^31 s
units: xtime.Second,
positiveErrMsg: "deltaOfDelta value 2160000000 s overflows 32 bits",
negativeErrMsg: "deltaOfDelta value -2159999999 s overflows 32 bits",
},
{
name: "milliseconds, short gap",
delta: time.Hour,
units: xtime.Millisecond,
},
{
name: "milliseconds, almost too big gap",
delta: 24 * 24 * time.Hour, // slightly less than 2^31 ms
units: xtime.Millisecond,
},
{
name: "milliseconds, too big gap",
delta: 25 * 24 * time.Hour, // more than 2^31 ms
units: xtime.Millisecond,
positiveErrMsg: "deltaOfDelta value 2160000000 ms overflows 32 bits",
negativeErrMsg: "deltaOfDelta value -2159999999 ms overflows 32 bits",
},
{
name: "microseconds, short gap",
delta: time.Hour,
units: xtime.Microsecond,
},
{
name: "microseconds, huge gap",
delta: 25 * 24 * time.Hour,
units: xtime.Microsecond,
},
{
name: "nanoseconds, short gap",
delta: time.Hour,
units: xtime.Nanosecond,
},
{
name: "nanoseconds, huge gap",
delta: 25 * 24 * time.Hour,
units: xtime.Nanosecond,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testPositiveDeltaOfDelta(t, tt.delta, tt.units, tt.positiveErrMsg)
testNegativeDeltaOfDelta(t, tt.delta, tt.units, tt.negativeErrMsg)
})
}
}

func testPositiveDeltaOfDelta(t *testing.T, delta time.Duration, units xtime.Unit, expectedErrMsg string) {
t.Helper()

ctx := context.NewBackground()
defer ctx.Close()

enc := getTestEncoder(testStartTime)

dp1 := dp(testStartTime, 1)
dp2 := dp(testStartTime.Add(delta), 2)

err := enc.Encode(dp1, units, nil)
require.NoError(t, err)

err = enc.Encode(dp2, units, nil)
if expectedErrMsg != "" {
require.EqualError(t, err, expectedErrMsg)
return
}
require.NoError(t, err)

dec := NewDecoder(enc.intOptimized, enc.opts)
stream, ok := enc.Stream(ctx)
require.True(t, ok)

it := dec.Decode(stream)
defer it.Close()

decodeAndCheck(t, it, dp1, units)
decodeAndCheck(t, it, dp2, units)

require.False(t, it.Next())
require.NoError(t, it.Err())
}

func testNegativeDeltaOfDelta(t *testing.T, delta time.Duration, units xtime.Unit, expectedErrMsg string) {
t.Helper()

ctx := context.NewBackground()
defer ctx.Close()

oneUnit, err := units.Value()
require.NoError(t, err)

enc := getTestEncoder(testStartTime)

dps := []ts.Datapoint{
dp(testStartTime, 1),
dp(testStartTime.Add(delta/2), 2),
dp(testStartTime.Add(delta/2+delta), 3),
dp(testStartTime.Add(delta/2+delta+oneUnit), 4), // DoD = oneUnit - delta
}

for i, dp := range dps {
err = enc.Encode(dp, units, nil)
if i == 3 && expectedErrMsg != "" {
require.EqualError(t, err, expectedErrMsg)
return
}
require.NoError(t, err)
}

dec := NewDecoder(enc.intOptimized, enc.opts)
stream, ok := enc.Stream(ctx)
require.True(t, ok)

it := dec.Decode(stream)
defer it.Close()

for _, dp := range dps {
decodeAndCheck(t, it, dp, units)
}

require.False(t, it.Next())
require.NoError(t, it.Err())
}

func dp(timestamp time.Time, value float64) ts.Datapoint {
return ts.Datapoint{
Timestamp: timestamp,
TimestampNanos: xtime.ToUnixNano(timestamp),
Value: value,
}
}

func decodeAndCheck(t *testing.T, it encoding.ReaderIterator, dp ts.Datapoint, units xtime.Unit) {
t.Helper()

require.True(t, it.Next())
decodedDP, decodedUnits, _ := it.Current()
assert.Equal(t, dp, decodedDP)
assert.Equal(t, units, decodedUnits)
}
10 changes: 10 additions & 0 deletions src/dbnode/encoding/m3tsz/timestamp_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ func (enc *TimestampEncoder) writeDeltaOfDeltaTimeUnitUnchanged(
}

deltaOfDelta := xtime.ToNormalizedDuration(curDelta-prevDelta, u)
if timeUnit == xtime.Millisecond || timeUnit == xtime.Second {
// Only milliseconds and seconds are encoded using
// up to 32 bits (see defaultTimeEncodingSchemes).
dod32 := int32(deltaOfDelta)
if int64(dod32) != deltaOfDelta {
return fmt.Errorf(
"deltaOfDelta value %d %s overflows 32 bits", deltaOfDelta, timeUnit)
}
}

tes, exists := enc.Options.TimeEncodingSchemes().SchemeForUnit(timeUnit)
if !exists {
return fmt.Errorf("time encoding scheme for time unit %v doesn't exist", timeUnit)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/m3tsz/timestamp_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (it *TimestampIterator) readNextTimestamp(stream *encoding.IStream) error {
}

it.PrevTimeDelta += dod
it.PrevTime = it.PrevTime + xtime.UnixNano(it.PrevTimeDelta)
it.PrevTime += xtime.UnixNano(it.PrevTimeDelta)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion src/x/time/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (tu Unit) Value() (time.Duration, error) {
if tu < 1 || int(tu) >= unitCount {
return 0, errUnrecognizedTimeUnit
}
return time.Duration(unitsToDuration[tu]), nil
return unitsToDuration[tu], nil
}

// Count returns the number of units contained within the duration.
Expand Down

0 comments on commit 5c30780

Please sign in to comment.