-
Notifications
You must be signed in to change notification settings - Fork 97
Some food-for-thought for supporting Key-Value API, this shows we can… #143
The head ref may contain hidden characters: "wip-kv-api-\u221A"
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
package io.cloudstate.javasupport; | ||
|
||
import java.util.Optional; | ||
import java.lang.Comparable; | ||
import java.util.function.Function; | ||
import static java.util.Objects.requireNonNull; | ||
import static java.util.Collections.singletonMap; | ||
import akka.util.ByteString; | ||
|
||
import io.cloudstate.keyvalue.KeyValue.KVEntityOrBuilder; | ||
import io.cloudstate.keyvalue.KeyValue.KVEntity; | ||
import io.cloudstate.keyvalue.KeyValue.KVModification; | ||
import io.cloudstate.keyvalue.KeyValue.KVModificationOrBuilder; | ||
|
||
import io.cloudstate.javasupport.EntityId; | ||
import io.cloudstate.javasupport.eventsourced.*; | ||
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
|
||
public final class KeyValue { | ||
public static final class Key<T> implements Comparable<Key<?>> { | ||
private final String _name; | ||
private final Function<ByteString, T> _reader; | ||
private final Function<T, ByteString> _writer; | ||
|
||
Key( | ||
final String name, | ||
final Function<ByteString, T> reader, | ||
final Function<T, ByteString> writer) { | ||
this._name = name; | ||
this._reader = reader; | ||
this._writer = writer; | ||
} | ||
|
||
public final String name() { | ||
return this._name; | ||
} | ||
|
||
public final Function<ByteString, T> reader() { | ||
return this._reader; | ||
} | ||
|
||
public final Function<T, ByteString> writer() { | ||
return this._writer; | ||
} | ||
|
||
@Override | ||
public final boolean equals(final Object that) { | ||
if (this == that) return true; | ||
else if (that instanceof Key) return ((Key<T>) that).name().equals(this.name()); | ||
else return false; | ||
} | ||
|
||
@Override | ||
public final int hashCode() { | ||
return 403 + name().hashCode(); | ||
} | ||
|
||
@Override | ||
public final int compareTo(final Key<?> other) { | ||
return name().compareTo(other.name()); | ||
} | ||
} | ||
|
||
public static final <T> Key<T> keyOf( | ||
final String name, | ||
final Function<ByteString, T> reader, | ||
final Function<T, ByteString> writer) { | ||
return new Key<T>( | ||
requireNonNull(name, "Key name cannot be null"), | ||
requireNonNull(reader, "Key reader cannot be null"), | ||
requireNonNull(writer, "Key writer cannot be null")); | ||
} | ||
|
||
public static final class Map { | ||
/* | ||
TODO document invariants | ||
*/ | ||
private final java.util.Map<String, ByteString> unparsed; | ||
private final java.util.Map<Key<?>, Object> updated = new java.util.TreeMap<Key<?>, Object>(); | ||
private final java.util.Set<String> removed = new java.util.TreeSet<String>(); | ||
|
||
Map(final java.util.Map<String, ByteString> initial) { | ||
this.unparsed = requireNonNull(initial, "Map initial values must not be null"); | ||
} | ||
|
||
public Map() { | ||
this.unparsed = new java.util.TreeMap<String, ByteString>(); | ||
} | ||
|
||
public final <T> Optional<T> get(final Key<T> key) { | ||
final T value = (T) updated.get(key); | ||
if (value != null) { | ||
return Optional.of(value); | ||
} else { | ||
final ByteString bytes = unparsed.get(key.name()); | ||
if (bytes != null) { | ||
final T parsed = key.reader().apply(bytes); | ||
requireNonNull(parsed, "Key reader not allowed to read `null`"); | ||
updated.put((Key<Object>) key, parsed); | ||
return Optional.of(parsed); | ||
} else { | ||
return Optional.empty(); | ||
} | ||
} | ||
} | ||
|
||
public final <T> void set(final Key<T> key, final T value) { | ||
requireNonNull(key, "Map key must not be null"); | ||
requireNonNull(value, "Map value must not be null"); | ||
updated.put(key, value); | ||
unparsed.remove(key.name()); | ||
removed.remove(key.name()); | ||
} | ||
|
||
public final <T> boolean remove(final Key<T> key) { | ||
requireNonNull(key, "Map key must not be null"); | ||
if (!removed.contains(key.name()) | ||
&& (updated.remove(key) != null | ||
| unparsed.remove(key.name()) != null)) { // Yes, you read that strict-or right | ||
removed.add(key.name()); | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
} | ||
|
||
final KVEntity toProto() { | ||
final KVEntity.Builder builder = KVEntity.newBuilder(); | ||
unparsed.forEach( | ||
(k, v) -> { | ||
builder.putEntries(k, com.google.protobuf.ByteString.copyFrom(v.asByteBuffer())); | ||
}); | ||
updated.forEach( | ||
(k, v) -> { | ||
// Skip as this item is only a parsed un-changed item, and we already added those in the | ||
// previous step | ||
if (!unparsed.containsKey(k.name())) { | ||
builder.putEntries( | ||
k.name(), | ||
com.google.protobuf.ByteString.copyFrom( | ||
((Key<Object>) k).writer().apply(v).asByteBuffer())); | ||
} | ||
}); | ||
return builder.build(); | ||
} | ||
|
||
final KVModification toProtoModification() { | ||
final KVModification.Builder builder = KVModification.newBuilder(); | ||
updated.forEach( | ||
(k, v) -> { | ||
// Skip those which remain as unparsed, as they have not been changed | ||
if (!unparsed.containsKey(k.name())) { | ||
builder.putUpdatedEntries( | ||
k.name(), | ||
com.google.protobuf.ByteString.copyFrom( | ||
((Key<Object>) k).writer().apply(v).asByteBuffer())); | ||
} | ||
}); | ||
removed.forEach(builder::addRemovedKeys); | ||
return builder.build(); | ||
} | ||
|
||
final void resetTo(KVEntity entityState) { | ||
unparsed.clear(); | ||
updated.clear(); | ||
removed.clear(); | ||
entityState | ||
.getEntriesMap() | ||
.forEach( | ||
(k, v) -> | ||
unparsed.put( | ||
k, | ||
v.isEmpty() | ||
? ByteString.empty() | ||
: akka.util.ByteString.fromArrayUnsafe(v.toByteArray()))); | ||
} | ||
|
||
final void applyModification(KVModificationOrBuilder modification) { | ||
// Apply new modifications to the base unparsed values | ||
modification | ||
.getUpdatedEntriesMap() | ||
.forEach( | ||
(k, v) -> | ||
unparsed.put( | ||
k, | ||
v.isEmpty() | ||
? ByteString.empty() | ||
: akka.util.ByteString.fromArrayUnsafe(v.toByteArray()))); | ||
|
||
modification.getRemovedKeysList().forEach(unparsed::remove); | ||
} | ||
} | ||
|
||
@EventSourcedEntity | ||
public abstract static class KeyValueEntity { | ||
private final Map state = new Map(); | ||
|
||
protected Map state() { | ||
return state; | ||
} | ||
|
||
@Snapshot | ||
public KVEntityOrBuilder snapshot() { | ||
return state.toProto(); | ||
} | ||
|
||
@SnapshotHandler | ||
public void handleSnapshot(final KVEntity entityState) { | ||
state.resetTo(entityState); | ||
} | ||
|
||
@EventHandler | ||
public void kVModification(final KVModification modification) { | ||
state.applyModification(modification); | ||
} | ||
} | ||
|
||
// Placeholder command type taken by the example command handler in YourClass. | ||
public static class YourCommandType {} | ||
|
||
// We'll most likely want to add a KeyValueEntity annotation instead | ||
public static class YourClass extends KeyValueEntity { | ||
private static final Key<String> name = | ||
keyOf("name", ByteString::utf8String, ByteString::fromString); | ||
|
||
// We'll most likely want to add a specific CommandContext for KeyValueEntity | ||
// so you'd instead of state().set(…) would do ctx.setState(…), and then it'd be automatically | ||
// flushing ctx.emit(state.toProtoModification()) if there are any changes | ||
@CommandHandler | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jroper What do you think about the comments above, James? Do you think this is something we could put together rather quickly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like that idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jroper ping :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viktorklang Any progress here? This seems to be stopped for a long time There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sleipnir No, unfortunately not. I guess the question is: Should we go for an API-only approach or support KV natively in the proxy as a different way of storing data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The CRUD support will provide the underlying native KV support discussed in this thread, and @ralphlaude is making progress there. (Side note: not sure if "CRUD" entities is the best name, maybe we can improve the naming there too.) Until that's complete, language supports could also experiment with including a Also agree that it would be good to try out some use cases, to see how useful a KV API like in this PR might be, since this would need to be added to all the language supports. This currently gives a more generic Map-like API in place of defining protobuf message objects directly (which can also be map-like as needed). 
The shopping cart samples also use a map for state — would that example be improved with a generic key-value API, compared with the types it uses now for event sourcing? In these kinds of examples, would just setting a map as the value (and using CRUD support) already be clean enough? Entity services themselves provide a KV-style store, keyed by entity id, and can be backed by event sourced, or the upcoming CRUD for a simple set-value approach, or using CRDTs for distributed, in-memory, eventually consistent storage. Within an entity the state can be a map (be key-value-based) like in this PR. And then there's the concept of a KV store that's available across Entity or Action instances — like the Cloudflare Workers KV, which is an eventually-consistent LWW (last writer wins) key-value store distributed across the worker nodes. For Cloudstate, the pattern that we've demonstrated that's similar to this is having multiple entity types. Like the "hot items" CRDT entity used alongside the shopping cart. We have forwarding and side effects to combine entities. Maybe we should look at supporting an "ask pattern", an async get on another entity type, where values can be fetched from other entity types and then continue processing with the responses. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jroper and I discussed this further. Some things we discussed: We don't currently see clear use cases for a key-value API like this in the language supports. Especially when compared with simply using an object or map as the value (with CRUD or event sourcing support). For persisting deltas/changes only, there's the event sourcing already, and having an additional KV API increases the surface area that language supports need to implement. 
For CRUD support naming, we're suggesting to refer to this as a value-based state model, and it could be the default state model for entities. From the user perspective, we could simply call these I think the main thing to do here is to complete the basic entities with value-based state model (CRUD support). In terms of roadmap, we also discussed other forward-looking things, like replicated and active-active event-sourcing, durable actions (as in long-lasting, that can fetch asynchronously, implement workflows), durable CRDTs, and product CRDTs. So suggesting that we close this PR and put the idea of KV APIs in language supports on hold, and focus on finishing the value entities (CRUD support) and making those the default starting point. Interested to hear more thoughts on this, and it can be a topic for the next contributors' call. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pvlugter thanks for summing everything up. It is a clear roadmap and the proposed names are fine for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that this is the best path forward. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having a good "CRUD" solution might obviate the need for a KV solution, especially if the proxy can diff. |
||
public com.google.protobuf.Empty yourCommand(YourCommandType command, CommandContext ctx) { | ||
final boolean someValidation = command.hashCode() % 2 == 0; // Or whatever | ||
|
||
if (someValidation) { | ||
ctx.fail("Invalid command as foo was baz"); | ||
} | ||
|
||
state().set(name, "YourAwesomeName"); | ||
|
||
ctx.emit(state().toProtoModification()); | ||
return com.google.protobuf.Empty.getDefaultInstance(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
// Copyright 2019 Lightbend Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
syntax = "proto3"; | ||
|
||
import "google/protobuf/any.proto"; | ||
|
||
package cloudstate; | ||
|
||
option java_package = "io.cloudstate.keyvalue"; | ||
|
||
// Complete state of a key-value entity: every entry, keyed by entry name, with its value as serialized bytes. | ||
message KVEntity { | ||
map<string, bytes> entries = 1; | ||
} | ||
|
||
// A change to a key-value entity: entries that were set, and names of entries that were removed. | ||
message KVModification { | ||
// Entry name -> new value as serialized bytes. | ||
map<string, bytes> updated_entries = 1; | ||
// Names of the entries that were removed. | ||
repeated string removed_keys = 2; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viktorklang
Is this the serialization the user has chosen for their values in the KVS?
Does this conflict with, or is it compatible with, Cloudstate's stable serialization rules? Is that relevant here too? I'm thinking about how a separate user library, written in a different language, would parse the values back if they are marshalled as ByteStrings here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this needs work. I am not sure how to integrate it though—I'd need to go through AnySupport I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be a good example how to use AnySupport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. I'll have a look at what that could look like.