Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Jan 13, 2025
1 parent 0c6f4a0 commit 3263a4a
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,21 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl
}

func (o *WindowOperator) gcInputs(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple {
var discard []*xsql.Tuple
inputs, discard, _ = o.handleInputs(ctx, inputs, triggerTime)
ctx.GetLogger().Debugf("after scan %v", inputs)
o.handleTraceDiscardTuple(ctx, discard)
return inputs
length := o.window.Length + o.window.Delay
gcIndex := -1
for i, tuple := range inputs {
if tuple.Timestamp.Add(length).Compare(triggerTime) >= 0 {
break
}
gcIndex = i
}
if gcIndex == len(inputs)-1 {
return inputs[:0]
}

Check warning on line 621 in internal/topo/node/window_op.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/node/window_op.go#L620-L621

Added lines #L620 - L621 were not covered by tests
if gcIndex == -1 {
return inputs
}
return inputs[gcIndex+1:]
}

func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple {
Expand Down

0 comments on commit 3263a4a

Please sign in to comment.