Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

WIP-CRUD on top of Event Sourcing #220

Merged

Conversation

ralphlaude
Copy link
Contributor

@ralphlaude ralphlaude commented Mar 30, 2020

This is just a first step to investigate if the CRUD state model could be done on top of Event Sourcing. If it is the case it can be a good candidate for CRQS and also Projection. So it is about this issue #50 (CRUD) and #81 (CRQS).

For CRUD the User Function should not deal with events and only with the snapshot of the whole object graph. For this i just add code to deal with snapshot and event in the EventSourcedEntity and comment code related to events in the language support.

// Nothing to do
// TODO: is it he right place delete the oldest snapshots and events?
// TODO ideally we want a one to one mapping between event and snapshot.
deleteSnapshots(SnapshotSelectionCriteria().copy(maxSequenceNr = metadata.sequenceNr - 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you want to do this? (esp. for all EventSourcedEntities?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang I did it this way just to show how it should work for the the eventually new CrudEntity. I would like to have an one to one mapping between snapshot and event. in this case snapshot and event are almost the same.

@viktorklang
Copy link
Contributor

@ralphlaude I think you should have a look at how I did it for KVS, also built on top of EventSourcing: https://github.com/cloudstateio/cloudstate/pull/143/files

@ralphlaude
Copy link
Contributor Author

@viktorklang i will do

@ralphlaude
Copy link
Contributor Author

ralphlaude commented Mar 31, 2020

@viktorklang,
annotation for CrudEntity is added and the registration of such entity in cloudstate is now possible. I am looking forward to deal with the following questions:

  • how to deal with the compression of events because and it can be very big?
  • how to deal with the numbers of events for projection? in this case only the youngest event is the most important for projections
  • how to externalize a tiny abstraction for dealing with modification/reset on the CrudState?

Do we need all this?

@ralphlaude
Copy link
Contributor Author

@viktorklang, @jroper,

i want to know if my proposal regarding how to deal with the CRUD entity state in the User Function is the right way to go.

Here is the content of the crud_entity.proto file:

syntax = "proto3";

import "google/protobuf/any.proto";

package cloudstate;

option java_package = "io.cloudstate.crud";

// The CRUD state.
message CrudEntityState {
  bytes state = 1;
}

// The CRUD change events.
message CrudEntityStateModification {
  bytes state = 1;
}

Here is the class which deals with the CrudEntity state and will be in the java support:

package io.cloudstate.samples.shoppingcart;

import com.example.shoppingcart.Shoppingcart;
import com.example.shoppingcart.crud.persistence.Domain;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.protobuf.Message;
import io.cloudstate.crud.CrudState;
import io.cloudstate.javasupport.EntityId;
import io.cloudstate.javasupport.crud.CrudEntity;
import io.cloudstate.javasupport.eventsourced.CommandContext;
import io.cloudstate.javasupport.eventsourced.CommandHandler;
import io.cloudstate.javasupport.eventsourced.EventHandler;
import io.cloudstate.javasupport.eventsourced.Snapshot;
import io.cloudstate.javasupport.eventsourced.SnapshotHandler;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// This class will be in the java support
public abstract class AbstractCrudEntityState<T extends Message> {

  // CrudState.CrudEntityState represents the persistent state when snapshot is done
  // CrudState.CrudEntityStateModification represents the persistent events
  // CrudState.CrudEntityState and CrudState.CrudEntityStateModification will likely be renamed


  // How to handle get rid of the type url?
  private String typeUrl;

  // How to deal with specific type here? I will try to use generics here but i am sure it will help.
  private T state;

  public final void state(T state) {
    // do some check here
    this.typeUrl = state.getDescriptorForType().getFullName();
    this.state = state;
  }

  public final T state() {
    // How to deal with reflection or grpc here?
    return state;
  }

  public final CrudState.CrudEntityState toEntityState() {
    return CrudState.CrudEntityState.newBuilder().setState(state.toByteString()).build();
  }

  public final CrudState.CrudEntityStateModification toEvent() {
    return CrudState.CrudEntityStateModification.newBuilder().setState(state.toByteString()).build();
  }

  // transform the state (domain state) in CrudState.CrudEntityStateModification
  public final CrudState.CrudEntityStateModification toEvent(T modifiedState) {
    return CrudState.CrudEntityStateModification.newBuilder().setState(modifiedState.toByteString()).build();
  }

  public final AbstractCrudEntityState<T> resetTo(CrudState.CrudEntityState entityState) {
    // How to get rid of the type url?
    this.state = (T) Any.newBuilder().setTypeUrl(typeUrl).setValue(entityState.getState()).build();
    return this;
  }

  public final AbstractCrudEntityState<T> applyEvent(CrudState.CrudEntityStateModification modification) {
    // How to get rid of the type url?
    this.state = (T) Any.newBuilder().setTypeUrl(typeUrl).setValue(modification.getState()).build();
    return this;
  }



  @CrudEntity
  public static final class ShoppingCart extends AbstractCrudEntityState<Domain.Cart> {
    private final String entityId;

    private Domain.Cart cart = Domain.Cart.newBuilder().build();

    public ShoppingCart(@EntityId String entityId) {
      this.entityId = entityId;
      this.state(cart);
    }

    @Snapshot
    public CrudState.CrudEntityState snapshot() {
      return toEntityState();
    }

    @SnapshotHandler
    public void handleSnapshot(CrudState.CrudEntityState state) {
      cart = resetTo(state).state();
    }

    @EventHandler
    public void handleEvent(CrudState.CrudEntityStateModification event) {
      cart = applyEvent(event).state();
    }

    @CommandHandler
    public Shoppingcart.Cart getCart() {
      Collection<Shoppingcart.LineItem> lineItems =
              cart.getItemsList().stream().map(this::convert).collect(Collectors.toList());

      return Shoppingcart.Cart.newBuilder().addAllItems(lineItems).build();
    }

    @CommandHandler
    public Empty addItem(Shoppingcart.AddLineItem item, CommandContext ctx) {
      if (item.getQuantity() <= 0) {
        ctx.fail("Cannot add negative quantity of to lineItem" + item.getProductId());
      }

      Domain.Cart modifiedCart = applyCommand(item); // create new modified state for shopping cart
      CrudState.CrudEntityStateModification modification = toEvent(modifiedCart); // create event sourced event of the state

      ctx.emit(modification);

      return Empty.getDefaultInstance();
    }

    private Domain.Cart applyCommand(Shoppingcart.AddLineItem item) {
      Domain.LineItem lineItem =
              Domain.LineItem.newBuilder()
                      .setProductId(item.getProductId())
                      .setName(item.getName())
                      .setQuantity(quantity(item))
                      .build();

      List<Domain.LineItem> items = cart.getItemsList().stream()
              .filter(i -> !i.getProductId().equals(item.getProductId()))
              .collect(Collectors.toList());

      return Domain.Cart.newBuilder()
              .addAllItems(items)
              .addItems(lineItem)
              .build();
    }

    private Shoppingcart.LineItem convert(Domain.LineItem item) {
      return Shoppingcart.LineItem.newBuilder()
              .setProductId(item.getProductId())
              .setName(item.getName())
              .setQuantity(item.getQuantity())
              .build();
    }

    private int quantity(Shoppingcart.AddLineItem item) {
      return cart.getItemsList().stream()
              .filter(lineItem -> lineItem.getProductId().equals(item.getProductId()))
              .findFirst()
              .map(lineItem -> lineItem.getQuantity() + item.getQuantity())
              .orElse(item.getQuantity());
    }
  }
}

@jroper
Copy link
Member

jroper commented Apr 5, 2020

@ralphlaude This isn't really CRUD (there's no delete), and it's pushing the definition of retrieve as well (since you can't arbitrarily retrieve any entity from a function, you're given an entity to work with), so I wouldn't call this CRUD. It's a key value, and we already have user function level support for key value storage.

What we'd like to do is support non eventsourced support for key value storage, so that things can be stored optimised (ie, without all the history). It could be accompanied by a log for CQRS, but for most use cases, that log need only be an index of when the values last changed, again stored optimised for the given store.

@ralphlaude
Copy link
Contributor Author

Hi @jroper,
thanks for the insights and i was thinking of CRUD just for an entity and not for the whole entity type. I will rethink this.

@viktorklang
Copy link
Contributor

@jroper Loading any instance in the stateful function will completely mess up the entity_key routing, will it not? Or you are thinkin about storing multiple sub-aggregates under the same entity_key?

@ralphlaude
Copy link
Contributor Author

@viktorklang, @jroper,

CRUD will leverage key value store and will be built on top of event sourcing.
For building the change logs for CQRS one option could be to have CRUD chnage log containing last updated values with index and also all removed values.
I am not sure if it makes sense to save the change logs as additional part of the state (the change logs could grow and could be very big). I think it also stored optimised.

@viktorklang i was thinking to use an entity_key for the whole aggregate graph. I am not sure here but perhaps there are another options.

…loudState java language support. Add KeyValue entity that the CRUD entity use. The KeyValue entity has support for last updates and removes.
@ralphlaude
Copy link
Contributor Author

ralphlaude commented Apr 14, 2020

@viktorklang, @jroper,
This is an extented implementation i propose based on (https://github.com/cloudstateio/cloudstate/pull/143/files of @viktorklang).

What i did:

  • Introduce a new class io.cloudstate.javasupport.crud.KeyValue.ChangedMap for dealing with last changed (updates and remove) in the CRUD entity. This is useful for CQRS
  • I change the definition of the io.cloudstate.javasupport.crud.CrudEntity annotation and also the logic for CloudState.registerCrudEntity

What i would like to do:

  • Who should register the key value proto file descriptor io.cloudstate.keyvalue.KeyValue.getDescriptor()?
    i would propose to do in CloudState.registerCrudEntity so it is implicit to the user
  • How to define persistent actor id for the CRUD entity? The CRUD entity is backed by a persistent actor and this actor should receive all request
    for the CRUD entity. Unfortunately the id (entity_key) for this actor cannot be extract from the GRPC request paylod, the entity_key should be created upfront.
    I would suggest to introduce a new entity type that can be passed to io.cloudstate.javasupport.impl.eventsourced.EventSourcedStatefulService and
    further down to io.cloudstate.proxy.EntityUserFunctionTypeSupport which can decide how to create the entity_key of persistent actor.

What are the next steps:

  • better naming, better documentation and better code design
  • how to deal with the CRUD entity state in the io.cloudstate.javasupport.eventsourced.CommandContext to be able to ctx.setState like @viktorklang proposed?
  • how to deal with sub-aggregates regarding the key? Use the same key or not for saving sub-aggregates?

Any comments are welcome.

@ralphlaude
Copy link
Contributor Author

@pvlugter the renaming is done, please take a look and give feeback.

Copy link
Member

@pvlugter pvlugter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ralphlaude, cool, looks good with the Entity name in the user API.

So this initial work is basically all done now? Needs to be based on latest master.

}

/**
* Register an value based entity factory.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Register an value based entity factory.
* Register a value based entity factory.

import java.lang.annotation.Target;

/**
* Marks a method on an value based entity as a command handler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Marks a method on an value based entity as a command handler.
* Marks a method on a value based entity as a command handler.

import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** An value based entity. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** An value based entity. */
/** A value based entity. */

/**
* Creation context for {@link Entity} annotated entities.
*
* <p>This may be accepted as an argument to the constructor of an value based entity.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* <p>This may be accepted as an argument to the constructor of an value based entity.
* <p>This may be accepted as an argument to the constructor of a value based entity.

package io.cloudstate.javasupport.entity;

/**
* Low level interface for handling commands on an value based entity.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Low level interface for handling commands on an value based entity.
* Low level interface for handling commands on a value based entity.

import java.util.Optional;

/**
* Low level interface for handling commands on an value based entity.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Low level interface for handling commands on an value based entity.
* Low level interface for handling commands on a value based entity.

import java.util.stream.Collectors;

/** An event sourced entity. */
@EventSourcedEntity
/** An value based entity. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** An value based entity. */
/** A value based entity. */

/** An event sourced entity. */
@EventSourcedEntity
/** An value based entity. */
@Entity(persistenceId = "value-entity-shopping-cart")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the persistence id be just shopping-cart?

@ralphlaude
Copy link
Contributor Author

So this initial work is basically all done now? Needs to be based on latest master.

@pvlugter. thanks for the feedback. Yes, the main work is done here. The only thing left is to make the value entity as default and adapt the the health-checks/readiness-checks in the accordingly in the proxy JDBC module. This means change the way the JDBC tables creation are done.

@ralphlaude
Copy link
Contributor Author

@pvlugter now the initial work is all done. This can be changed to a PR and reviewed again if needed.

Copy link
Member

@pvlugter pvlugter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work, @ralphlaude

Great to see all the testing adapted and extended from other entities 👍

I think the main change that would be good to get in before merging is to not have slick dependencies and slick-based jdbc store in proxy core, but only in the jdbc proxy.

Can then be merged and worked on incrementally, for the things described in other issues.

build.sbt Outdated
//"ch.qos.logback" % "logback-classic" % "1.2.3", // Doesn't work well with SubstrateVM: https://github.com/vmencik/akka-graal-native/blob/master/README.md#logging
"com.typesafe.slick" %% "slick" % SlickVersion,
"com.typesafe.slick" %% "slick-hikaricp" % SlickHikariVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are slick dependencies being added to proxy-core? Seems strange to add here and then exclude in other proxies, rather than have all the slick-based backends together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the slick-dependencies only in proxy-jdbc. I am looking for the best option to have it separated.
The in-memory and the jdbc should be somehow pluggable and the in-memory should live in the core. @pvlugter perhaps you have some options here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The common approach for Lightbend projects is to have a setting in config, that includes a fully qualified class name that can be created dynamically. So that there's something like:

cloudstate.proxy.entity {
  store = in-memory

  in-memory {
    store-class = "fully.qualified.class.name.for.InMemoryStore"
    another-setting-for-in-memory = something
  }
}
# in proxy-jdbc, override setting

cloudstate.proxy.entity {
  store = jdbc

  jdbc {
    store-class = "fully.qualified.class.name.for.JdbcStore"
  }
}

I can take a look at adding this.

// limitations under the License.

// This is the public API offered by the shopping cart entity.
syntax = "proto3";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, would be good to have both value and event-sourced entities implement the same shopping cart protocol. Can add the remove cart method. It would demonstrate how the same service can be implemented with different state models.

cloudstate.proxy {
journal-enabled = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#484 has been merged, so config can be updated to match, with eventsourced-entity.journal-enabled

@@ -137,32 +126,6 @@ class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)(
.withChannelBuilderOverrides(_.maxInboundMessageSize(config.maxInboundMessageSize.toInt))
.withTls(false)
private[this] final val entityDiscoveryClient = EntityDiscoveryClient(clientSettings)
private[this] final val autoscaler = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was already removed, but yes, should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still have at a lot of places in the code

}

private def createJdbcStore(config: Config)(implicit ec: ExecutionContext): Store[Key, ByteString] = {
val slickDatabase = JdbcSlickDatabase(config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is why slick dependencies are on proxy core, we can look at making it possible to create stores dynamically from config, or have a store factory passed through, or similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is why slick dependencies are in the proxy core. Yes agreed with the proposed options. Let talk about it. I was thinking for a couples of days now and i don't have an option yet.


"not deserialize state" in {
val wrongSerializedEntity = ByteString("p.cloudstate.io/string_state")
deserialize(wrongSerializedEntity) should not be entity
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the deserialized state in this case? Should it be empty or something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should entity serializer have a check for not finding the separator (index = -1)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question just means something is confusing here. The serializes state should be p.cloudstate.io/string|state (the pipe sign is used as separator) and not p.cloudstate.io/string_state. Yes, the serializer should have somehow a check for not finding the separator and throw an exception. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, realise that it's checking deserialization of an incorrect byte string, but it only checks that it's not the expected result. So it leads to wondering about the failure case. Throwing an exception when it fails to deserialize sounds good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed the store-api and remove the serializer. now the typeurl is also persisted along with the state. A Value is introduced in the store-api which represents the typeurl and the state of the entity. The test is removed.

@pvlugter pvlugter marked this pull request as ready for review November 10, 2020 04:10
@ralphlaude
Copy link
Contributor Author

ralphlaude commented Nov 10, 2020

@pvlugter thanks 👍

The main issue here now is how to make the creation of the store pluggable or dynamic. Using the config or passing the factory around sounds good. I need more insights here.

@pvlugter
Copy link
Member

@ralphlaude, I've added some minimal changes to dynamically create stores, and moved the JDBC store under the JDBC proxy.

@ralphlaude
Copy link
Contributor Author

@pvlugter the end goal is achieved here. Please take also a look.

Copy link
Member

@pvlugter pvlugter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ralphlaude 👍 Ok, great. Thanks for adapting to all the changes, and the persistence on seeing this through. Started in February! I think we can merge this now and then continue incrementally with the remaining work 🎉

@pvlugter pvlugter merged commit b10f133 into cloudstateio:master Nov 12, 2020
@marcellanz
Copy link
Contributor

cool. @ralphlaude awesome work 🎉

@ralphlaude
Copy link
Contributor Author

It was a long and interesting journey with all the problems and insights on the way. Thanks @pvlugter, @jroper, @viktorklang, @sleipnir and @marcellanz for the support, the discussions and the feedback. It was all fun :). CRUD was very CRAZY! :).

@pvlugter yes, let continue with the remaining work.

@viktorklang
Copy link
Contributor

Wooohooo! Thanks @ralphlaude for your extreme perseverance! Awesomely done :)

@sleipnir
Copy link

We who thank you for your commitment. Now let's take advantage of what you have done and build lots of examples and friendly documentation for our users. Thanks for the work @ralphlaude !

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants