Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostgresSubscribableChannel - traceId and spanId are not added to log #612

Closed
piotrsiczek opened this issue Mar 1, 2024 · 9 comments
Closed

Comments

@piotrsiczek
Copy link

I have an application that uses PostgresSubscribableChannel to receive notifications from postgresql.
When a new message is received it contains traceId and spanId in message.getHeaders(), but are not visible in log message. What am I missing in my configuration?

Versions I am working with:

  • spring-boot:3.2.2
  • micrometer-tracing-bridge-brave:jar:1.2.2
  • spring-integration-jdbc:jar:6.2.1
  • micrometer-tracing:jar:1.2.2
  • micrometer-tracing-bridge-brave:jar:1.2.2
  • micrometer-observation:jar:1.12.2

Configuration

    @Bean
    public JdbcChannelMessageStore messageStore(DataSource dataSource) {
        var messageStore = new JdbcChannelMessageStore(dataSource);
        messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
        return messageStore;
    }

    @Bean
    public MessageChannel messageChannel(JdbcChannelMessageStore messageStore, ObservationRegistry observationRegistry) {
        var queueChannel = MessageChannels.queue(messageStore, "group-id")
                                          .getObject();
        queueChannel.registerObservationRegistry(observationRegistry);
        return queueChannel;
    }

    @Bean
    public PostgresChannelMessageTableSubscriber subscriber(DataSourceProperties dsp) {
        return new PostgresChannelMessageTableSubscriber(() -> DriverManager.getConnection(dsp.determineUrl(), dsp.determineUsername(), dsp.determinePassword())
                                                                            .unwrap(PgConnection.class));
    }

    @Bean
    public PostgresSubscribableChannel subscriptionChannel(PostgresChannelMessageTableSubscriber subscriber,
                                                           JdbcChannelMessageStore messageStore,
                                                           PlatformTransactionManager transactionManager,
                                                           ObservationRegistry observationRegistry) {
        var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, "group-id", subscriber);
        postgresSubscribableChannel.setTransactionManager(transactionManager);
        postgresSubscribableChannel.registerObservationRegistry(observationRegistry);
        return postgresSubscribableChannel;
    }

Message listener

    @Bean
    IntegrationFlow streamOutboxMessage(MessageChannel subscriptionChannel, ObjectMapper objectMapper) {
        return IntegrationFlow
                .from(subscriptionChannel)
                .handle(message -> {
                    //traceId and spanId is not added to this log 
                    log.info("Event received: {}", message.getPayload());
                    processMessage(message);
                })
                .get();
    }
@piotrsiczek piotrsiczek changed the title PostgresSubscribableChannel - traceId and spanId is not added to log PostgresSubscribableChannel - traceId and spanId are not added to log Mar 1, 2024
@jonatan-ivanov
Copy link
Member

This might be caused by the instrumentation in Spring Integration. Could you please open an issue against that repo?
They might need a minimal reproducer too to troubleshoot this.
Also, please check what happens if here:

.handle(message -> {
    //traceId and spanId is not added to this log 
    log.info("Event received: {}", message.getPayload());
    processMessage(message);
})

You try to get the current observation from the ObservationRegistry.

@jonatan-ivanov jonatan-ivanov closed this as not planned Won't fix, can't repro, duplicate, stale Mar 12, 2024
@jonatan-ivanov
Copy link
Member

/cc @artembilan

@artembilan
Copy link
Contributor

Let’s see if setDispatcherExecutor() on that channel wrapped with instrumentation helps somehow! On the other hand you can use a channel interceptor: https://docs.spring.io/spring-integration/reference/metrics.html#observation-propagation

@artembilan
Copy link
Contributor

Well, after looking into the code this morning, I realize that ContextExecutorService won't help.
And even ObservationPropagationChannelInterceptor won't work on that channel. The MessageWithThreadState is not Serializable to make it to be stored into DB when we send a message to this channel.
Since you say that traceId and spanId are present in the headers I suggest to enable observation on that handle():

                .handle(message -> {
                    //traceId and spanId is not added to this log 
                    log.info("Event received: {}", message.getPayload());
                    processMessage(message);
                }, e -> e.id("myHandler"))

@EnableIntegrationManagement(observationPatterns = "myHandler")

This way an observation is going to be restored from those headers before your code is called.

Please, raise an issue in https://github.com/spring-projects/spring-integration, so we will look into a proper propagation via that (and similar persistent) message channel.

Thanks

@piotrsiczek
Copy link
Author

Thank you for your answers.

@jonatan-ivanov when I added var currentObservation = observationRegistry.getCurrentObservation(); to handle method currentObservation is null

@artembilan I followed your advice but unfortunately it didn't work, traceId and spanId still are not logged. I will raise an issue as you suggested and create a reproducer.

@artembilan
Copy link
Contributor

Apparently the problem is not so simple, so created a respective issue for propagation revisiting: spring-projects/spring-integration#9001.

For now it should work as:

  1. Enable observation on the subscriptionChannel
  2. Enable observation on the handle()

@artembilan
Copy link
Contributor

So, apparently there is some race condition with Java DSL and management configurer bean post-processor (I'll investigate that separately).
And here is a working sample, but based on the messaging annotations instead: https://github.com/artembilan/sandbox/tree/master/postgres-channel-observation.

It sounds like we just need to mention in Spring Integration docs how to propagate an observation (tracing, essentially) over this kind of persistent channel.
And also that the mentioned ObservationPropagationChannelInterceptor is not suited for persistent channels, because an Observation is not Serializable.
On the other hand it might be deprecated altogether since sender observation is closed when the send() operation is done.
Therefore we need a new observation on the receiving side, which is done by the mentioned enabled observation on the channel subscriber.

@artembilan
Copy link
Contributor

OK. Figured out what is going on with Java DSL.
The .handle((m) -> ) produces a MessageHandler which is not a MessageHandlerSupport with an IntegrationManagement marker, but used as is for future subscription.
Therefore we cannot apply enableObservation on it.
How to mitigate that, I'll think over weekend...

I know: this feels like issue hijacking, but I wanted to close all the gaps before we move on 😄

@piotrsiczek
Copy link
Author

@artembilan thank you for solving this issue and sending a working example.

I tested it in my app. After sending the message to the messageChannel it was stored to postgres then the new handler was notified about the message. It works as expected the traceId and spanId are propagated and restored in log.

Thanks :)👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants