Skip to content

Commit

Permalink
fix(window): sliding delay time (#3424)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Dec 5, 2024
1 parent 6a2d600 commit 36d6ecf
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
if o.isMatchCondition(ctx, d) {
if o.window.Delay > 0 {
go func(ts time.Time) {
after := time.After(o.window.Delay * time.Millisecond)
after := timex.After(o.window.Delay)
select {
case <-after:
delayCh <- ts
Expand All @@ -358,7 +358,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
}
} else {
// clear inputs if condition not matched
inputs = o.gcInputs(inputs, d.Timestamp, ctx)
inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx)
}
case ast.SESSION_WINDOW:
if timeoutTicker != nil {
Expand Down Expand Up @@ -602,12 +602,14 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl
if nextleft < 0 {
return inputs[:0], inputs, content
}
ctx.GetLogger().Debugf("discard before %d", nextleft)
return inputs[nextleft:], inputs[:nextleft], content
}

func (o *WindowOperator) gcInputs(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple {
var discard []*xsql.Tuple
inputs, discard, _ = o.handleInputs(ctx, inputs, triggerTime)
ctx.GetLogger().Debugf("after scan %v", inputs)
o.handleTraceDiscardTuple(ctx, discard)
return inputs
}
Expand Down
25 changes: 25 additions & 0 deletions internal/topo/topotest/window_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,31 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
},
},
{
Name: `TestSlidingDelay`,
Sql: `SELECT size,color FROM demo GROUP BY SlidingWindow(ss, 5, 1) Over (when size = 2)`,
R: [][]map[string]interface{}{
{
{
"size": 3,
"color": "red",
},
{
"size": 6,
"color": "blue",
},
{
"size": 2,
"color": "blue",
},
{
"size": 4,
"color": "yellow",
},
},
},
M: map[string]interface{}{},
},
}
HandleStream(true, streamList, t)
options := []*def.RuleOption{
Expand Down

0 comments on commit 36d6ecf

Please sign in to comment.