Skip to content

Commit

Permalink
[#31438] WindowingStrategy Plumbing + AllowedLateness + Fixing Sessio…
Browse files Browse the repository at this point in the history
…n Windows (#33542)
  • Loading branch information
lostluck authored Jan 9, 2025
1 parent a19466a commit 4fc5c86
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 167 deletions.
15 changes: 0 additions & 15 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
Expand All @@ -116,10 +113,6 @@ def sickbayTests = [
// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Prism not firing sessions correctly (seems to be merging inappropriately)
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
Expand Down Expand Up @@ -161,14 +154,6 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',


// These tests started failing once late data began being dropped precisely.
// They set a single element to be late data, and expect it (correctly) to be preserved.
// Since these transforms are presently treated as no-ops, the fix is to disable the
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
Expand Down
44 changes: 21 additions & 23 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side

// StageAggregates marks the given stage as an aggregation, which
// means elements will only be processed based on windowing strategies.
func (em *ElementManager) StageAggregates(ID string) {
em.stages[ID].aggregate = true
func (em *ElementManager) StageAggregates(ID string, strat WinStrat) {
ss := em.stages[ID]
ss.aggregate = true
ss.strat = strat
}

// StageStateful marks the given stage as stateful, which means elements are
Expand Down Expand Up @@ -1095,7 +1097,7 @@ type stageState struct {
// Special handling bits
stateful bool // whether this stage uses state or timers, and needs keyed processing.
aggregate bool // whether this stage needs to block for aggregation.
strat winStrat // Windowing Strategy for aggregation firings.
strat WinStrat // Windowing Strategy for aggregation firings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.

// onWindowExpiration management
Expand Down Expand Up @@ -1154,7 +1156,6 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),

Expand All @@ -1179,18 +1180,21 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.window.MaxTimestamp() < threshold {
continue
if ss.aggregate {
// Late data is data that arrives after its window has expired.
// We only need to drop late data before aggregations.
// TODO: also handle late data for side inputs.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if ss.strat.EarliestCompletion(e.window) < threshold {
// TODO: figure out Pane and trigger firings.
continue
}
origPending = append(origPending, e)
}
origPending = append(origPending, e)
newPending = origPending
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
Expand Down Expand Up @@ -1626,10 +1630,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
Expand All @@ -1640,9 +1642,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
Expand Down Expand Up @@ -1685,9 +1685,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() >= newOut {
if ss.strat.EarliestCompletion(win) >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
Expand Down
28 changes: 8 additions & 20 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,16 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

type winStrat interface {
EarliestCompletion(typex.Window) mtime.Time
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Added to a window's max timestamp to compute its earliest completion time.
}

type defaultStrat struct{}

func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp()
}

func (defaultStrat) String() string {
return "default"
}

type sessionStrat struct {
GapSize time.Duration
}

func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.GapSize)
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}

func (ws sessionStrat) String() string {
return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
13 changes: 7 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import (

func TestEarliestCompletion(t *testing.T) {
tests := []struct {
strat winStrat
strat WinStrat
input typex.Window
want mtime.Time
}{
{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
{WinStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{WinStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{WinStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{WinStrat{AllowedLateness: 5 * time.Second}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime.Add(5 * time.Second)},
{WinStrat{AllowedLateness: 5 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 8},
{WinStrat{AllowedLateness: 5 * time.Second}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp.Add(5 * time.Second)},
}

for _, test := range tests {
Expand Down
9 changes: 6 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
KeyDec: kd,
}
}
em.StageAggregates(stage.ID)
ws := windowingStrategy(comps, tid)
em.StageAggregates(stage.ID, engine.WinStrat{
AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond,
})
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
Expand Down Expand Up @@ -266,11 +269,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
case *pipepb.TestStreamPayload_Event_ElementEvent:
var elms []engine.TestStreamElement
for _, e := range ev.ElementEvent.GetElements() {
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.Time(e.GetTimestamp())})
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.FromMilliseconds(e.GetTimestamp())})
}
tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms)
case *pipepb.TestStreamPayload_Event_WatermarkEvent:
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.FromMilliseconds(ev.WatermarkEvent.GetNewWatermark()))
case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) {
// TODO: Determine the SDK common formalism for setting processing time to infinity.
Expand Down
Loading

0 comments on commit 4fc5c86

Please sign in to comment.