Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor state save best practices #854

Open
manuelserradev opened this issue Apr 27, 2023 · 8 comments
Open

Actor state save best practices #854

manuelserradev opened this issue Apr 27, 2023 · 8 comments
Labels
kind/bug Something isn't working P1

Comments

@manuelserradev
Copy link
Contributor

Expected Behavior

Saving actor state does not fail.

Actual Behavior

Saving an actor state fails with the following:

java.util.ConcurrentModificationException: null
	at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628)
	at io.dapr.actors.runtime.ActorStateManager.flush(ActorStateManager.java:286)
	at io.dapr.actors.runtime.ActorStateManager.lambda$save$15(ActorStateManager.java:272)
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:228)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840)
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:62)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:57)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:203)
	at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4475)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.Mono.block(Mono.java:1741)	

Steps to Reproduce the Problem

Implement an actor with two methods like the following:

  @Override
  public void enqueue(String taskId) {

    var mockPayload = "{}";

    this.getActorStateManager().set("payload_%s".formatted(taskId), mockPayload).block();

    System.out.println("Saved payload %s".formatted(taskId));

    this.saveState().block();
  }

  @Override
  public void ack(String taskId) {

    var mockPayload = this.getActorStateManager().get("payload_%s".formatted(taskId), String.class).block();

    System.out.println("Retrieved payload %s".formatted(taskId));
    System.out.println("Payload was %s".formatted(mockPayload));

    this.getActorStateManager().remove("payload_%s".formatted(taskId)).block();

    this.saveState().block();
  }

And play around, try to call:

  1. enqueue(1)
  2. enqueue(2)
  3. enqueue(3)
  4. ack(1)

Should fail.

Controller for the sake of completeness:

@RestController
@RequiredArgsConstructor
public class SerializerActorController {

  private final ActorProxyBuilder<SerializerActor> actorProxyBuilder;

  @GetMapping(value = "/enqueue/{taskId}")
  public ResponseEntity<Void> enqueue(@PathVariable("taskId") String taskId) {

    try {
      var actorId = new ActorId("actorId");
      var actor = actorProxyBuilder.build(actorId);

      actor.enqueue(taskId);

      return ResponseEntity.ok().build();
    } catch (Exception ex) {
      ex.printStackTrace();
      return ResponseEntity.internalServerError().build();
    }
  }

  @GetMapping(value = "/ack/{taskId}")
  public ResponseEntity<Void> ack(@PathVariable("taskId") String taskId) {

    try {
      var actorId = new ActorId("actorId");
      var actor = actorProxyBuilder.build(actorId);

      actor.ack(taskId);

      return ResponseEntity.ok().build();
    } catch (Exception ex) {
      ex.printStackTrace();
      return ResponseEntity.internalServerError().build();
    }
  }
}

Release Note

RELEASE NOTE:

FIX Solved concurrency on actor state saving stage.

@manuelserradev manuelserradev added the kind/bug Something isn't working label Apr 27, 2023
@mukundansundar
Copy link
Contributor

@nitroin What is the version of Dapr that you are using? Also what version of SDK are you using?

@manuelserradev
Copy link
Contributor Author

manuelserradev commented May 12, 2023

@mukundansundar

❯ dapr --version
CLI version: 1.9.1
Runtime version: 1.10.5

Latest SDK version (1.8.0).

@mukundansundar
Copy link
Contributor

mukundansundar commented May 18, 2023

@nitroin Can you check if this happens with 1.10.7 runtime?
And what actor state store is being used?

@artursouza
Copy link
Member

This error might be inside the ActorStateManager code in SDK and not in runtime.

@artursouza artursouza added the P1 label May 18, 2023
@manuelserradev
Copy link
Contributor Author

manuelserradev commented May 26, 2023

I, also, do think that is not runtime related.

What I can see is that in the actor-sdk the incorrect type of collection is used (for thread safety and such), but this contradicts with the expectation of the "actor" threading model - but again looks like to me the problem arise from underlying layer (sdk and state interaction).

That being said I'll do try on the latest runtime.

@manuelserradev
Copy link
Contributor Author

I can confirm the same issue with:

❯ dapr --version
CLI version: 1.9.1 
Runtime version: 1.10.7

And sdk version 1.8.0.

I will copy-paste the full actor:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.ActorRuntimeContext;
import io.dapr.client.DaprClient;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SerializerActorImpl extends AbstractActor implements SerializerActor {

  public SerializerActorImpl(ActorRuntimeContext<SerializerActorImpl> runtimeContext, ActorId id) {
    super(runtimeContext, id);

    log.info("Actor:%s Initializing...".formatted(id));
  }

  public static SerializerActorImpl createActor(
      ActorRuntimeContext<SerializerActorImpl> runtimeContext,
      ActorId id,
      DaprClient daprClient,
      String broker,
      String engineTopic,
      ObjectMapper objectMapper) {
    var actor = new SerializerActorImpl(runtimeContext, id);

    return actor;
  }

  @Override
  public void enqueue(String taskId) {

    var mockPayload = "{}";

    this.getActorStateManager().set("payload_%s".formatted(taskId), mockPayload).block();

    System.out.println("Saved payload %s".formatted(taskId));

    this.saveState().block();
  }

  @Override
  public void ack(String taskId) {

    var mockPayload =
        this.getActorStateManager().get("payload_%s".formatted(taskId), String.class).block();

    System.out.println("Retrieved payload %s".formatted(taskId));
    System.out.println("Payload was %s".formatted(mockPayload));

    this.getActorStateManager().remove("payload_%s".formatted(taskId)).block();

    this.saveState().block();
  }

  @Override
  public String state() {
    return "hello!";
  }
}

Controller:

import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxyBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class SerializerActorController {

  private final ActorProxyBuilder<SerializerActor> actorProxyBuilder;

  @GetMapping(value = "/enqueue/{taskId}")
  public ResponseEntity<Void> enqueue(@PathVariable("taskId") String taskId) {

    try {
      var actorId = new ActorId("actorId");
      var actor = actorProxyBuilder.build(actorId);

      actor.enqueue(taskId);

      return ResponseEntity.ok().build();
    } catch (Exception ex) {
      ex.printStackTrace();
      return ResponseEntity.internalServerError().build();
    }
  }

  @GetMapping(value = "/ack/{taskId}")
  public ResponseEntity<Void> ack(@PathVariable("taskId") String taskId) {

    try {
      var actorId = new ActorId("actorId");
      var actor = actorProxyBuilder.build(actorId);

      actor.ack(taskId);

      return ResponseEntity.ok().build();
    } catch (Exception ex) {
      ex.printStackTrace();
      return ResponseEntity.internalServerError().build();
    }
  }
}

App config:

import com.fasterxml.jackson.databind.Module;
import io.cloudevents.core.provider.ExtensionProvider;
import io.cloudevents.jackson.JsonFormat;
import io.dapr.actors.client.ActorClient;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ApplicationConfig {

  @Bean
  public Module cloudEventModule() {
    return JsonFormat.getCloudEventJacksonModule();
  }

  @Bean
  public ActorClient actorClient() {
    return new ActorClient();
  }

  @Bean
  public DaprClient daprClient(CloudEventSerializer asd) {
    return new DaprClientBuilder().withObjectSerializer(asd).build();
  }

  @Bean
  public ActorProxyBuilder<SerializerActor> actorProxyBuilder(ActorClient actorClient) {
    return new ActorProxyBuilder<SerializerActor>(SerializerActor.class, actorClient);
  }
}

And actor factory, needed to inject app configs into actors:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.ActorFactory;
import io.dapr.actors.runtime.ActorRuntimeContext;
import io.dapr.client.DaprClient;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class SerializerActorFactory implements ActorFactory<SerializerActorImpl> {

  private final DaprClient daprClient;

  private final ObjectMapper objectMapper;

  @Value("${dapr.components.pubsub.broker}")
  private String daprBroker;

  @Value("${dapr.components.pubsub.topics.out.engine}")
  private String daprOutTopicEngine;

  @Override
  public SerializerActorImpl createActor(
      ActorRuntimeContext<SerializerActorImpl> actorRuntimeContext, ActorId actorId) {
    return SerializerActorImpl.createActor(
        actorRuntimeContext, actorId, daprClient, daprBroker, daprOutTopicEngine, objectMapper);
  }
}

@fnurglewitz
Copy link

fnurglewitz commented May 26, 2023

I'm having the same issue with dapr 1.10

$ dapr --version
CLI version: 1.10.0
Runtime version: 1.10.5

@manuelserradev
Copy link
Contributor Author

This error might be inside the ActorStateManager code in SDK and not in runtime.

@artursouza care to explain a little further? This error still happens and I'd like to understand any actionable on our side.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/bug Something isn't working P1
Projects
None yet
Development

No branches or pull requests

4 participants