Skip to content

Commit

Permalink
[apache#32929] Add initial OrderedListState impl.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Dec 12, 2024
1 parent de5e8eb commit babd852
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 7 deletions.
4 changes: 0 additions & 4 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// Not yet implemented in Prism
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
Expand Down
96 changes: 96 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ package engine

import (
"bytes"
"cmp"
"fmt"
"log/slog"
"slices"
"sort"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"google.golang.org/protobuf/encoding/protowire"
)

// StateData is a "union" between Bag state and MultiMap state to increase common code.
Expand Down Expand Up @@ -220,3 +224,95 @@ func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte
kmap[string(uKey)] = StateData{}
slog.Debug("State() MultimapKeys.Clear", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("WindowKey", wKey))
}

// OrderedListState from the Java SDK is encoded as KVs with varint encoded millis
// Followed by the value. This is *not* the standard TimestampValueCoder encoding, which
// uses a big-endian long as a suffix to the value.
//
// Next, is we need to parse out the individual values from the data blob anyway.
// So we probably can't cheekily re-use the BagState like it currently is, even
// after fixing the timestamp issue.
//
// Currently assuming the Value is length prefixed, which isn't always going to
// be true for ints, bools, bytes, and other "known" coders.s

// AppendOrderedListState appends the incoming timestamped data to the existing tentative data bundle.
// Assumes the data is TimestampedValue encoded, which has a BigEndian int64 suffixed to the data.
// This means we may always use the last 8 bytes to determine the value sorting.
//
// The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (d *TentativeData) AppendOrderedListState(stateID LinkID, wKey, uKey []byte, data []byte) {
kmap := d.appendState(stateID, wKey)
var datums [][]byte

// Lets assume two length prefixed things in the KV.
for i := 0; i < len(data); {
_, tn := protowire.ConsumeVarint(data[i:])

// We need to fix that value to get the correct width
// of the bytes.
n, vn := protowire.ConsumeVarint(data[i+tn:])
prev := i
i += tn + vn + int(n)
datums = append(datums, data[prev:i])
}

s := StateData{Bag: append(kmap[string(uKey)].Bag, datums...)}
sort.SliceStable(s.Bag, func(i, j int) bool {
vi := s.Bag[i]
vj := s.Bag[j]
return compareTimestampSuffixes(vi, vj)
})
kmap[string(uKey)] = s
slog.Debug("State() OrderedList.Append", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Any("NewData", s))
}

func compareTimestampSuffixes(vi, vj []byte) bool {
// TODO FIX TO EXTRACT VARINTS
ims, _ := protowire.ConsumeVarint(vi)
jms, _ := protowire.ConsumeVarint(vj)
return (int64(ims)) < (int64(jms))
}

// GetOrderedListState available state from the tentative bundle data.
// The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (d *TentativeData) GetOrderedListState(stateID LinkID, wKey, uKey []byte, start, end int64) [][]byte {
winMap := d.state[stateID]
w := d.toWindow(wKey)
data := winMap[w][string(uKey)]

lo, hi := findRange(data.Bag, start, end)
slog.Debug("State() OrderedList.Get", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Group("range", slog.Int64("start", start), slog.Int64("end", end)), slog.Group("outrange", slog.Int("lo", lo), slog.Int("hi", hi)), slog.Any("Data", data.Bag[lo:hi]))
return data.Bag[lo:hi]
}

func cmpSuffix(vs [][]byte, target int64) func(i int) int {
return func(i int) int {
v := vs[i]
ims, _ := protowire.ConsumeVarint(v)
tvsbi := cmp.Compare(target, int64(ims))
slog.Debug("cmpSuffix", "target", target, "bi", ims, "tvsbi", tvsbi)
return tvsbi
}
}

func findRange(bag [][]byte, start, end int64) (int, int) {
lo, _ := sort.Find(len(bag), cmpSuffix(bag, start))
hi, _ := sort.Find(len(bag), cmpSuffix(bag, end))
return lo, hi
}

func (d *TentativeData) ClearOrderedListState(stateID LinkID, wKey, uKey []byte, start, end int64) {
winMap := d.state[stateID]
w := d.toWindow(wKey)
kMap := winMap[w]
data := kMap[string(uKey)]

lo, hi := findRange(data.Bag, start, end)
slog.Debug("State() OrderedList.Clear", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Group("range", slog.Int64("start", start), slog.Int64("end", end)), "lo", lo, "hi", hi, slog.Any("PreClearData", data.Bag))

cleared := slices.Delete(data.Bag, lo, hi)
// Zero the current entry to clear.
// Delete makes it difficult to delete the persisted stage state for the key.
kMap[string(uKey)] = StateData{Bag: cleared}
}
36 changes: 36 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"encoding/binary"
"math"
"testing"
)

func TestCompareTimestampSuffixes(t *testing.T) {
t.Run("simple", func(t *testing.T) {
loI := int64(math.MinInt64)
hiI := int64(math.MaxInt64)

loB := binary.BigEndian.AppendUint64(nil, uint64(loI))
hiB := binary.BigEndian.AppendUint64(nil, uint64(hiI))

if compareTimestampSuffixes(loB, hiB) != (loI < hiI) {
t.Errorf("lo vs Hi%v < %v: bytes %v vs %v, %v %v", loI, hiI, loB, hiB, loI < hiI, compareTimestampSuffixes(loB, hiB))
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
// Validate all the state features
for _, spec := range pardo.GetStateSpecs() {
isStateful = true
check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap)
check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(),
urns.UserStateBag, urns.UserStateMultiMap, urns.UserStateOrderedList)
}
// Validate all the timer features
for _, spec := range pardo.GetTimerFamilySpecs() {
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ var (
SideInputMultiMap = siUrn(pipepb.StandardSideInputTypes_MULTIMAP)

// UserState kinds
UserStateBag = usUrn(pipepb.StandardUserStateTypes_BAG)
UserStateMultiMap = usUrn(pipepb.StandardUserStateTypes_MULTIMAP)
UserStateBag = usUrn(pipepb.StandardUserStateTypes_BAG)
UserStateMultiMap = usUrn(pipepb.StandardUserStateTypes_MULTIMAP)
UserStateOrderedList = usUrn(pipepb.StandardUserStateTypes_ORDERED_LIST)

// WindowsFns
WindowFnGlobal = quickUrn(pipepb.GlobalWindowsPayload_PROPERTIES)
Expand Down
14 changes: 14 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
case *fnpb.StateKey_MultimapKeysUserState_:
mmkey := key.GetMultimapKeysUserState()
data = b.OutputData.GetMultimapKeysState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey())
case *fnpb.StateKey_OrderedListUserState_:
olkey := key.GetOrderedListUserState()
data = b.OutputData.GetOrderedListState(
engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()},
olkey.GetWindow(), olkey.GetKey(), olkey.GetRange().GetStart(), olkey.GetRange().GetEnd())
default:
panic(fmt.Sprintf("unsupported StateKey Get type: %T: %v", key.GetType(), prototext.Format(key)))
}
Expand All @@ -578,6 +583,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
case *fnpb.StateKey_MultimapUserState_:
mmkey := key.GetMultimapUserState()
b.OutputData.AppendMultimapState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey(), mmkey.GetMapKey(), req.GetAppend().GetData())
case *fnpb.StateKey_OrderedListUserState_:
olkey := key.GetOrderedListUserState()
b.OutputData.AppendOrderedListState(
engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()},
olkey.GetWindow(), olkey.GetKey(), req.GetAppend().GetData())
default:
panic(fmt.Sprintf("unsupported StateKey Append type: %T: %v", key.GetType(), prototext.Format(key)))
}
Expand All @@ -601,6 +611,10 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
case *fnpb.StateKey_MultimapKeysUserState_:
mmkey := key.GetMultimapUserState()
b.OutputData.ClearMultimapKeysState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey())
case *fnpb.StateKey_OrderedListUserState_:
olkey := key.GetOrderedListUserState()
b.OutputData.ClearOrderedListState(engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()},
olkey.GetWindow(), olkey.GetKey(), olkey.GetRange().GetStart(), olkey.GetRange().GetEnd())
default:
panic(fmt.Sprintf("unsupported StateKey Clear type: %T: %v", key.GetType(), prototext.Format(key)))
}
Expand Down

0 comments on commit babd852

Please sign in to comment.