@ConsumeEvent does not work in native when returning a Uni<Set<String>> #36172

Closed
tnmtechnologies opened this issue Sep 26, 2023 · 21 comments · Fixed by #36374

@tnmtechnologies
Contributor

tnmtechnologies commented Sep 26, 2023

Describe the bug

Quarkus 3.4.1
Redis 7.2.1

I use an event bus consumer to run the Redis client logic.
In JVM mode the following code works fine, but in a native executable it fails with a timeout (30 s, presumably the default) on the event bus sender side.
Using the Redis MONITOR command, I can see that Redis does process the command.

I also tested with Quarkus 3.2.6.Final and see the same behavior.

@Inject
private ReactiveRedisDataSource rds;

  return Optional.ofNullable(nfType)
                .map(t -> {
                    final ReactiveSetCommands<String,String> setCommands = rds.set(String.class);
                    return setCommands.smembers(toNfTypesKey(t));
                }).orElseGet(() -> {
                    final ReactiveKeyCommands<String> keyCommands = rds.key(String.class);
                    final ReactiveKeyScanCursor<String> scanCursor = keyCommands.scan(new KeyScanArgs().match(NF_PROFILES_NAMESPACE + '*').count(Long.MAX_VALUE));
                    final boolean hasNext = scanCursor.hasNext();
                    return hasNext ? scanCursor.next() : Uni.createFrom().item(() -> Collections.emptySet());
                }).onItem().transformToMulti(s -> Multi.createFrom().items(s::stream))
                .select().first(limit)
                .onItem().transform(s -> nfType != null ? s : s.substring(NF_PROFILES_NAMESPACE.length()))
                .collect().asSet()
                ;

Expected behavior

The code should work in a native executable as it does in JVM mode.

Actual behavior

The code works in JVM mode but fails in a native executable.

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

java -version
openjdk version "17.0.5" 2022-10-18
OpenJDK Runtime Environment GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.0 (build 17.0.5+8-jvmci-22.3-b08, mixed mode, sharing)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.4.1

Build tool (ie. output of mvnw --version or gradlew --version)

./mvnw --version
Apache Maven 3.9.1 (2e178502fcdbffc201671fb2537d0cb4b4cc58f8)
Maven home: C:\Users\mvsz7559\.m2\wrapper\dists\apache-maven-3.9.1-bin\320285b4\apache-maven-3.9.1
Java version: 17.0.5, vendor: GraalVM Community, runtime: D:\Liberkey\MyApps\graalvm\graalvm-ce-java17-22.3.0
Default locale: en_GB, platform encoding: Cp1252
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

Additional information

No response

@quarkus-bot

quarkus-bot bot commented Sep 26, 2023

/cc @cescoffier (redis), @gsmet (redis), @machi1990 (redis)

@cescoffier
Member

So, you say that the commands are correctly executed by Redis?

@tnmtechnologies
Contributor Author

tnmtechnologies commented Sep 27, 2023

Here is the redis monitor output (smembers is for the mutiny map and scan for the orElseGet) for the native app:

> MONITOR
OK
1695817743.024683 [0 192.168.65.1:48609] ping
1695817753.017798 [0 192.168.65.1:48609] ping
1695817753.683625 [0 192.168.65.1:48613] hello 3
1695817753.686222 [0 192.168.65.1:48613] smembers nfTypes:NRF
1695817763.019689 [0 192.168.65.1:48609] ping
1695817773.013909 [0 192.168.65.1:48609] ping
1695817783.009533 [0 192.168.65.1:48609] ping
1695817793.021823 [0 192.168.65.1:48609] ping
1695817796.545971 [0 192.168.65.1:48614] hello 3
1695817796.548048 [0 192.168.65.1:48614] scan 0 COUNT 9223372036854775807 MATCH nfProfiles:*
1695817803.011999 [0 192.168.65.1:48609] ping
1695817813.014244 [0 192.168.65.1:48609] ping
1695817823.009855 [0 192.168.65.1:48609] ping

And here is the exception caught on the event bus producer side (not sure it will help):

 (vert.x-eventloop-thread-0)  eventBus request e=: (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.2, repliedAddress: nnrf.nfm.GetNFInstances
        at io.vertx.core.eventbus.impl.ReplyHandler.handle(ReplyHandler.java:76)
        at io.vertx.core.eventbus.impl.ReplyHandler.handle(ReplyHandler.java:24)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:948)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:919)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
        at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:179)
        at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:207)
        at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:937)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base@17.0.5/java.lang.Thread.run(Thread.java:833)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:807)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:210)

The Redis MONITOR output is the same in JVM mode, but there the code works fine: it serves the expected HTTP response, which is not the case with the native app.

@geoand
Contributor

geoand commented Sep 27, 2023

So just to be 100% clear, everything works properly in JVM mode?

@tnmtechnologies
Contributor Author

Yes everything works properly in JVM mode.

@cescoffier cescoffier self-assigned this Oct 2, 2023
@cescoffier
Member

I would need a reproducer as a simple test (added to our ITs) works.

Also, the code can be simplified:

ReactiveKeyScanCursor<String> scanned = keys.scan(new KeyScanArgs().match("people:*"));
return scanned.toMulti().collect().asSet();

@cescoffier cescoffier added the triage/needs-reproducer label Oct 2, 2023
@tnmtechnologies
Contributor Author

Here is the reproducer project https://github.com/tnmtechnologies/quarkusRedisReactiveKeyScanCursorReproducer .

Thanks for the KEYS command. I agree it simplifies but there is a warning in the documentation:
Warning: consider KEYS as a command that should only be used in production environments with extreme care. It may ruin performance when it is executed against large databases.

@cescoffier
Member

Thanks, I will check the reproducer.

Yes, the key command and the key scan command should be used with caution.

@cescoffier
Member

Your reproducer does not mention the Redis version or how many keys you have.

@tnmtechnologies
Contributor Author

I have updated the README file.
Redis version is 7.2.1-alpine.
I expect a few hundred keys.

@cescoffier
Member

cescoffier commented Oct 8, 2023

The issue seems to be in the event bus, not in Redis. The redis code produces the expected output, but it looks like the event is lost on the event bus.

@cescoffier
Member

The issue is the usage of Set as reply. The local codec does not seem to work in this case in native. We need to have a look (probably with the help of @mkouba ).

If you use something like:

public static class MySet extends HashSet<String> {
    public MySet() {
        super();
    }

    public MySet(Set<String> s) {
        super(s);
    }
}

/**
 * Replies with the concrete MySet type instead of the Set interface.
 */
@ConsumeEvent("getIds")
public Uni<MySet> getIds(final JsonObject message) {
    Log.infov("getIds(message={0})", message);

    final Long limit = Optional.ofNullable(message.getLong("limit")).orElse(Long.MAX_VALUE);
    final String type = message.getString("type");

    return Optional.ofNullable(type)
            .map(t -> {
                final ReactiveSetCommands<String,String> setCommands = rds.set(String.class);
                return setCommands.smembers(TYPE_NAMESPACE + t);
            }).orElseGet(() -> {
                final ReactiveKeyCommands<String> keyCommands = rds.key(String.class);
                final ReactiveKeyScanCursor<String> scanCursor = keyCommands.scan(new KeyScanArgs().match(ID_NAMESPACE + '*').count(Long.MAX_VALUE));
                final boolean hasNext = scanCursor.hasNext();
                System.out.println("Has next...");
                return hasNext ? scanCursor.next() : Uni.createFrom().item(() -> Collections.emptySet());
            }).onItem().transformToMulti(s -> Multi.createFrom().items(s::stream))
            .select().first(limit)
            .onItem().transform(s -> type != null ? s : s.substring(ID_NAMESPACE.length()))
            .collect().asSet()
            // Wrap the result so the runtime type matches the declared return type.
            .map(s -> new MySet(s))
            .log("user code");
}

It works

@cescoffier cescoffier changed the title from "Redis ReactiveKeyScanCursor doesn't work with native executable" to "@ConsumeEvent does not work in native when returning a Uni<Set<String>>" Oct 8, 2023
@cescoffier cescoffier added area/vertx and removed triage/needs-reproducer, area/redis labels Oct 8, 2023
@tnmtechnologies
Contributor Author

Thanks a lot @cescoffier, the workaround works.

@mkouba
Contributor

mkouba commented Oct 9, 2023

It seems that we only register the LocalEventBusCodec for the return type of a consumer method, i.e. for java.util.Set in this particular case. However, when Vert.x attempts to lookup the codec for the message it's using the actual type of the message body; i.e. java.util.HashSet in this reproducer. As a result, in the JVM mode the SerializableCodec is used as the fallback (because HashSet implements java.io.Serializable) but this obviously does not work in native mode.

@tnmtechnologies In theory, you don't need to create the MySet class but merely change the return type of the consumer method to Uni<HashSet<String>>.
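
To make the mismatch described above concrete, here is a minimal plain-Java model of a codec registry keyed by exact class. It is not Vert.x or Quarkus code: `CodecLookupSketch`, its methods, and the codec name `"local"` are hypothetical and only illustrate why a codec registered for `java.util.Set` is not found when the reply is a `java.util.HashSet`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of a default-codec registry; an assumption, not the Vert.x implementation.
final class CodecLookupSketch {
    // Hypothetical registry: exact class -> codec name.
    private final Map<Class<?>, String> defaultCodecs = new HashMap<>();

    void registerDefaultCodec(Class<?> type, String codecName) {
        defaultCodecs.put(type, codecName);
    }

    // The lookup uses the runtime class of the body, not the declared return type.
    // Returns null when nothing is registered for that exact class.
    String lookup(Object body) {
        return defaultCodecs.get(body.getClass());
    }

    public static void main(String[] args) {
        CodecLookupSketch bus = new CodecLookupSketch();
        Set<String> reply = new HashSet<>(Set.of("a", "b"));

        // Registered for the declared (interface) type only: the lookup misses.
        bus.registerDefaultCodec(Set.class, "local");
        System.out.println(bus.lookup(reply)); // null

        // Registered for the concrete type: the lookup succeeds.
        bus.registerDefaultCodec(HashSet.class, "local");
        System.out.println(bus.lookup(reply)); // local
    }
}
```

In this model, declaring the concrete type corresponds to the second registration, so the runtime class of the reply and the registered class coincide.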

@cescoffier I'm not quite sure if we could do something to improve the UX in similar cases...

@cescoffier
Member

@mkouba thanks for the analysis.

In terms of UX, should we detect when a @ConsumeEvent method returns an interface and log a warning in this case (if no codec for that type is found)?

@cescoffier cescoffier removed their assignment Oct 9, 2023
@mkouba
Contributor

mkouba commented Oct 9, 2023

In terms of UX, should we detect when a @ConsumeEvent method returns an interface and log a warning in this case (if no codec for that type is found)?

Hm, we could log a warning when a consumer method returns an interface, or a Uni/CompletionStage with an interface type parameter, but it could also be a false positive because we don't know the actual type of the message returned from the method.
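
A check of this kind could be sketched with plain reflection, as below. This is only an illustration, not the Quarkus build-time processor: the class and method names are hypothetical, and `CompletionStage` stands in for the async wrapper so the example needs nothing beyond the JDK.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class InterfaceReturnCheckSketch {

    // Hypothetical consumer methods, used only as inspection targets.
    static CompletionStage<Set<String>> declaresInterface() {
        return CompletableFuture.completedFuture(new HashSet<>());
    }

    static CompletionStage<HashSet<String>> declaresConcreteClass() {
        return CompletableFuture.completedFuture(new HashSet<>());
    }

    // True when the declared reply type, after unwrapping one CompletionStage level, is an interface.
    static boolean replyTypeIsInterface(Method method) {
        Type type = method.getGenericReturnType();
        if (type instanceof ParameterizedType
                && CompletionStage.class.isAssignableFrom(method.getReturnType())) {
            type = ((ParameterizedType) type).getActualTypeArguments()[0];
        }
        if (type instanceof ParameterizedType) {
            type = ((ParameterizedType) type).getRawType();
        }
        return type instanceof Class && ((Class<?>) type).isInterface();
    }

    // Convenience overload that resolves a no-arg method of this class by name.
    static boolean replyTypeIsInterface(String methodName) {
        try {
            return replyTypeIsInterface(InterfaceReturnCheckSketch.class.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(replyTypeIsInterface("declaresInterface"));     // true
        System.out.println(replyTypeIsInterface("declaresConcreteClass")); // false
    }
}
```

The check only sees declared types. Both sample methods return a `HashSet` at runtime, which is exactly why a warning based on it can be a false positive.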

@tnmtechnologies
Contributor Author

@mkouba Thank you, it works.

@cescoffier
Member

@mkouba ah yes.... So what about just documenting it as a limitation for now?

@mkouba
Contributor

mkouba commented Oct 9, 2023

@mkouba ah yes.... So what about just documenting it as a limitation for now?

Yes, we should extend the Use codecs section and explain how the default codec is registered for a return type. And make it clear that registering a default codec for an interface is of no use. And maybe add how to register a default codec for a type + register for reflection for native image.

I will send a PR tomorrow.

@mkouba mkouba self-assigned this Oct 9, 2023
@mkouba
Contributor

mkouba commented Oct 9, 2023

Hm, I think that we could actually improve the UX a little bit and register a default codec selector with EventBus.codecSelector(Function<Object, String>) and use the LocalEventBusCodec by default for anything... WDYT?
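
The selector idea can be modelled in plain Java as a fallback that runs when no codec is registered for the exact class of the body. The sketch below is an assumption-laden model, not the Vert.x implementation: `CodecSelectorSketch`, the lookup order, and the codec names are hypothetical. Only the `Function<Object, String>` shape of the selector comes from the comment above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Simplified model: exact-class lookup first, then a selector fallback (assumed order).
final class CodecSelectorSketch {
    private final Map<Class<?>, String> defaultCodecs = new HashMap<>();
    // By default the selector picks nothing.
    private Function<Object, String> selector = body -> null;

    void registerDefaultCodec(Class<?> type, String codecName) {
        defaultCodecs.put(type, codecName);
    }

    void codecSelector(Function<Object, String> selector) {
        this.selector = selector;
    }

    // Returns the codec name for the body, or null when neither step finds one.
    String lookup(Object body) {
        String exact = defaultCodecs.get(body.getClass());
        return exact != null ? exact : selector.apply(body);
    }

    public static void main(String[] args) {
        CodecSelectorSketch bus = new CodecSelectorSketch();
        bus.registerDefaultCodec(Set.class, "local-set");
        Object reply = new HashSet<String>();

        System.out.println(bus.lookup(reply)); // null

        // Select a local codec for any body that has no exact match.
        bus.codecSelector(body -> "local");
        System.out.println(bus.lookup(reply)); // local
    }
}
```

With a catch-all selector in place, the declared return type of the consumer no longer has to match the runtime type of the reply.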

@cescoffier
Member

Ah yes! Good idea! I forgot about the selector.
