SolaceIO write connector #32060
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers.
Assigning reviewers (R: @m-trieu for label java). If you would like to opt out of this review, comment with one of the bot's available commands.
The PR bot will only process comments in the main thread (not review comments).
Assigning a new set of reviewers because the PR has gone too long without review (R: @Abacn for label java). If you would like to opt out of this review, comment with one of the bot's available commands.
Thanks, I had a first pass and left a couple of comments, mostly about:
- the dead letter queue pattern
- possibly excessive dependency usage
- minor package/class structure adjustments
```java
            getNumberOfClientsPerWorker(),
            getPublishLatencyMetrics()));

    return writer.withOutputTags(FAILED_PUBLISH_TAG, TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
```
Using PCollectionTags for failed/succeeded elements is no longer a recommended pattern. Consider using a DLQ instead (if possible). Ref: https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit#heading=h.a5fbpllgc16m, and a couple of IOs have already implemented it.
From the user's end, the invocation will look like https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html
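A minimal sketch of the ErrorHandler-based DLQ wiring referenced above, not the connector's actual API: the identity dead-letter sink, the `DlqWiringSketch` class, and the `withErrorHandler(...)` setter on `SolaceIO.write()` are assumptions for illustration only.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.PCollection;

class DlqWiringSketch {
  static void write(Pipeline pipeline, PCollection<Solace.Record> records) throws Exception {
    // Placeholder sink: a real pipeline would write the BadRecords somewhere durable.
    PTransform<PCollection<BadRecord>, PCollection<BadRecord>> deadLetterSink =
        new PTransform<PCollection<BadRecord>, PCollection<BadRecord>>() {
          @Override
          public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
            return input;
          }
        };
    // The handler is registered on the pipeline and must be closed before run(),
    // which is when the error outputs get wired to the dead-letter sink.
    try (BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
        pipeline.registerBadRecordErrorHandler(deadLetterSink)) {
      records.apply(
          "Write to Solace",
          SolaceIO.write().withErrorHandler(errorHandler)); // hypothetical setter name
    }
  }
}
```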
Ah, nice, I was not aware, let me work on those changes.
Included in one of the latest commits.
```java
}

Preconditions.checkArgument(
    getMaxNumOfUsedWorkers() > 0,
```
If I understand correctly, maxNumOfUsedWorkers and numberOfClientsPerWorker currently have a constant default, or are supplied as a constant parameter. Is there a concern regarding the scalability and flexibility to scale?
Yes, the concern is related to Solace quotas. They are normally much more limited than the typical number of threads and workers in a pipeline (which is in the order of hundreds or thousands, while the quotas are much lower).
Most sinks would name this numShards, would it make sense to do the same here and merge maxNumOfUsedWorkers and numberOfClientsPerWorker into numShards instead? As far as I understand from the Solace documentation, the broker supports a global connection limit and a per-username connection limit, but not a per-"machine" (source IP?) connection limit.
If during testing of this sink the output parallelism was always set to the maximum number of workers, then I understand why that may have led to issues with connection limits, but I'm hoping that a change in wording may help avoid those issues.
In any case, if I'm reading this right, the sink will always open numberOfClientsPerWorker clients and randomly use one of those clients for a given bundle (keyed on maxNumOfUsedWorkers), but if a machine is assigned fewer shards than that, it will never utilize those connections to their fullest, which seems like a waste of open network connections.
I changed the name to numShards.
```java
  return 0;
};

retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class));
```
Took a look at RetryCallableManager; it is based on com.google.cloud.RetryHelper. Solace isn't a GCP product, but SolaceIO currently depends on various GCP artifacts for non-GCP usages (like here). We probably want to clean up these usages in the future. For example, retry management would preferably use FluentBackoff (in the Java SDK core).
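A rough sketch of what a FluentBackoff-based retry could look like, not the connector's current code; `RetrySketch` and `publishWithJcsmp()` are stand-ins for the actual publish path.

```java
import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

class RetrySketch {
  void publishWithRetries() throws IOException, InterruptedException {
    // Bounded exponential backoff built with the SDK-core utility.
    BackOff backoff =
        FluentBackoff.DEFAULT
            .withMaxRetries(4)
            .withInitialBackoff(Duration.standardSeconds(1))
            .backoff();
    while (true) {
      try {
        publishWithJcsmp(); // placeholder for the actual JCSMP publish call
        return;
      } catch (Exception e) {
        // BackOffUtils.next returns false once the retry budget is exhausted.
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          throw new IOException("Retries exhausted while publishing to Solace", e);
        }
      }
    }
  }

  private void publishWithJcsmp() {}
}
```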
There's also an existing dependency on Failsafe (https://github.com/failsafe-lib/failsafe) in Beam, which I'd probably recommend using.
The retry library does not pull in any other Google Cloud dependency; it is a generic library that happens to be distributed by Google Cloud.
Partially reviewed, I'll give this another look tomorrow.
```java
 * connector.
 */
@Internal
public final class PublishResultsReceiver {
```
This is likely not working as you intended.
The singleton in this class receives the results for all publishers, meaning that any publish result may end up in the output PCollection of any SolaceIO.Write transform.
My guess is you'd want a PublishResultReceiver per instance of the transform.
You are right, this is actually a bug. To fix it, there should be a receiver per instance of the transform. I am working on fixing it.
Fixed in 976e89e (commit message "Make the PublishResult class for handling callbacks non-static to handle pipelines with multiple write transforms." in case the hash changes with a force push)
SGTM, is there any need for this class at all if it's just a wrapper around a queue?
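A minimal sketch of the per-transform-instance receiver discussed here, essentially just a queue wrapper; the class and method names are illustrative, not the connector's actual implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.io.solace.data.Solace;

final class PerInstancePublishResultsReceiver {
  // One queue per SolaceIO.Write instance instead of a shared static singleton,
  // so results from one transform can never leak into another transform's output.
  private final Queue<Solace.PublishResult> results = new ConcurrentLinkedQueue<>();

  void add(Solace.PublishResult result) {
    results.add(result);
  }

  Solace.PublishResult poll() {
    return results.poll();
  }
}
```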
```java
// Store the current window used by the input
PCollection<Solace.PublishResult> captureWindow =
    records.apply(
        "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));

@SuppressWarnings("unchecked")
WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
    (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
        captureWindow.getWindowingStrategy();
```
I'm not sure if transplanting the windowing strategy is a valid operation. Given that most connectors do not preserve the input windowing strategy it might not be expected behavior either.
Some connectors do. "Transplanting" should work because what I do here is grab the settings of the input window and apply a window with the same settings to the output. I need to apply a global window "internally" in the connector to control parallelization; I thought it was nice to "respect" the input window and reapply a window with the same settings to the output, instead of producing the output with the global window that is applied internally (which would leak an implementation detail to the user).
I think the issue here is that publishSingleMessage() or publishBatch() don't return publish results immediately; they return a status through an async callback. The rewindow operation (in SolaceIO.Write#expand()) is applied on a collection of asynchronously collected message responses, which means they might not belong to the same window as the original message sent. Therefore, I would also be inclined not to 'transplant' the window, but to convert it and output to a global window.
After discussing with @stankiewicz we concluded that we should transplant the window but we should also make sure that each bundle waits for a publish response for the messages it originally published.
Scratch that, we've decided to output the results to a GlobalWindow. This change is necessary because the output PCollection contains asynchronously received acknowledgments. Since it's not practical to trace back to the original message's timestamp, we can't apply the same windowing strategy.
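A sketch of the agreed approach: re-window the asynchronously received publish results into the global window instead of transplanting the input strategy. Only the Beam classes are real; the `RewindowSketch` wrapper is illustrative.

```java
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

class RewindowSketch {
  static PCollection<Solace.PublishResult> toGlobalWindow(
      PCollection<Solace.PublishResult> results) {
    // Acknowledgments arrive asynchronously, so they are emitted in the global
    // window rather than the window of the originating record.
    return results.apply(
        "Publish results to global window",
        Window.<Solace.PublishResult>into(new GlobalWindows()));
  }
}
```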
```java
PCollection<KV<Integer, Solace.Record>> withShardKeys =
    withGlobalWindow.apply(
        "Add shard key",
        ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));

String label =
    getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";

PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
```
The sharding that's applied here couples Beam's internal parallelism to the number of clients you intend to use (1 per transform instance per worker). Using GroupIntoBatches.withShardedKey() instead makes more sense, since Solace mentions their producer is thread safe for some producer patterns.
On that topic, and feel free to correct me if I'm wrong, since this transform is producing session-dependent messages the producer may only be accessed by multiple threads for direct messages.
The transform switches the producer index on a per bundle basis, but it does not exclusively claim the producer, so multiple threads can access a producer regardless of the delivery mode.
To uncomplicate the thread safety conditions of the producer you could complicate access to the producer. By putting the producer on its own thread and exposing an intermediate channel for message passing (e.g. a concurrent queue) you can decouple Beam's concurrency from the number of producers (1 per distinct configuration). The producer thread polls the channel to receive sendable work from multiple producers and sets up callbacks per the message's configuration (callbacks should be per instance, see other comment).
This carries minor overhead for session-dependent direct messages, but simplifies the rest of the setup. You could optionally use a Guava cache to expire and close producers after a period of inactivity. My remaining question here is why isn't this using session-independent messages?
Back to GroupIntoBatches, it covers all of the state and timer stuff that's happening in the writer fn and applying it would simplify the writer a great deal. It looks like it might not even need state and timers at all.
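A rough sketch of the GroupIntoBatches.withShardedKey() batching suggested above, assuming shard-keyed Solace.Record elements; the batch size, key type, and `BatchingSketch` wrapper are illustrative, not values from this PR.

```java
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class BatchingSketch {
  static PCollection<KV<ShardedKey<Integer>, Iterable<Solace.Record>>> batch(
      PCollection<KV<Integer, Solace.Record>> withShardKeys) {
    // The runner decides how to subdivide each key into shards, which decouples
    // Beam's internal parallelism from the number of Solace clients and removes
    // the need for a custom state-and-timers implementation in the writer.
    return withShardKeys.apply(
        "Batch records",
        GroupIntoBatches.<Integer, Solace.Record>ofSize(50).withShardedKey());
  }
}
```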
Yes, I think that GroupIntoBatches would be a more elegant solution here, and it would not need a custom state & timers implementation.
About session-independent messages, that's actually what I am using :). I think my implementation for the producer does exactly that. There is a concurrent map of producers, and each thread/worker polls for a producer to be used to do some work. The clients are closed automatically after inactivity (by the client libraries) and the producer map takes care of making sure the producer reconnects when needed.
I've confused the docs and the classes introduced by this connector. The docs state that session-dependent messages are created by calling create*Message on a Producer and session-independent messages are created by calling create*Message on the JCSMPFactory. The messages produced by this connector are created by the MessageProducer class of this connector. It's calling out to JCSMPFactory, so I see that this connector is indeed creating session-independent messages.
Going back over my comment then, the bit about GroupIntoBatches is still relevant, although likely best addressed in a follow-up PR. As an improvement to this PR I think we can still remove the state variables that limit the DoFn's parallelism, since producer instances are thread-safe and manage the connection to Solace on a separate I/O thread, so Beam's internal parallelism should have no external effect as long as the producer is shared between instances of a unique transform. Did you by chance experiment with sharing a producer per unique transform configuration or per unique connection configuration instead of the fixed pool of 100 producers? If multiple producers improve throughput, then would it make sense to allow users to configure the size of the producer pool?
Agreed, let me finish the changes for this PR, and I will switch to GroupIntoBatches in an upcoming PR.
```java
SolaceOutput output;
if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
  PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
  PCollection<Solace.PublishResult> successfulPublish =
      solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
  output =
      rewindow(
          SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
          windowingStrategy);
} else {
  LOG.info(
      String.format(
          "Solace.Write: omitting writer output because delivery mode is %s",
          getDeliveryMode()));
  output = SolaceOutput.in(input.getPipeline(), null, null);
}

return output;
```
Do both of these need to be the same output type? PublishResult contains a boolean for success or failure, metrics, and an error message. It seems to me like you would want to provide the user a PCollection<Solace.Record> for successes, a PCollection for failures, and a PCollection for metrics.
See the note about using the new DLQ pattern as well.
But that's not possible, unless we accumulate the processed Solace.Record elements and then check all the responses, match each response to each incoming Solace.Record and put the right record in the right place. I think that approach would be too heavyweight. The replies are received via "callbacks" (actually polling by the client for responses, as implemented in the Solace client libraries) by any thread in any worker (potentially a worker different from the one that sent the original message). So doing this would require shuffling too.
The reply from Solace only contains some metadata (id, timestamp), and it is this info that the connector returns, which is a much simpler process.
My bad, I thought the record was propagated on the result. The writer should not be polling for any responses other than those for its own requests. Allowing any publish result to be emitted as output means the output could contain results from elements that belong to different bundles or windows. Reapplying the input windowing strategy is effectively useless if the input elements and output elements can appear in different bundles and windows.
I'm not convinced you're receiving responses to requests initiated on other machines though, that would be incredibly unhelpful behavior of a message broker and it makes distributed processing impractical. It also renders the latency metrics of this connector meaningless because the result of System.nanoTime() is not related to synchronized system or wall-clock time and can thus only be used to measure the duration between two results that were produced on the same machine.
Scratch my previous comment, you are right. The callbacks are received in the same client that sent the initial request, so I could keep the full record and emit it when the callback arrives. For instance, currently we actually keep a timestamp of when the message is sent.
Let me think about this, and I will send more changes in an upcoming PR.
So I did some digging and found a few undocumented JCSMP methods which are used by the new Java API to make the resource ownership flow a little more like what I was expecting.
The methods are marked as @SolReserved on JCSMPSession and are named createProducer. If I understand correctly, a session owns the outbound connection to Solace and a producer uses that resource to produce messages. However, the documented getProducer method specifies that it will replace a previously returned producer, but I'd have to validate that.
The ownership and lifetime model that you probably want here is one where there's a shared, long-lived session per unique transform in a static variable, some shared map keyed on the transform configuration, and a producer per instance of the transform, all multiplexed on the shared session. Every writer instance needs its own producer since the callbacks are passed as construction arguments to the producer; that's the only way to ensure that responses are routed to the originating requestor.
Could you check if repeated calls to getProducer invalidate prior return values like the docs say? If they do, then we'll likely need to use createProducer and verify with Solace that it's OK to call undocumented public methods on their classes.
The connector guarantees that if there is already a producer, it is reused. If the producer is closed, a new one is created. The connector only closes producers when a worker is destroyed.
The receiver that gets all the callback calls is the same for all the producers (a static variable, which I actually have to change), so even if there is a new producer, as long as it is in the same worker, it will receive the same callbacks.
The static variable approach is actually buggy: two instances of the Write connector will use the same instance. I am changing it to a single instance receiving the callbacks per instance of the Write transform. But the same principle applies: all the callbacks are always received in the same object, and the producers are always reused for as long as Solace permits (and closed if the worker is destroyed).
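An illustrative sketch of the reuse principle described above: one producer per configuration, created lazily and reused until the worker is torn down. The `ProducerPoolSketch` class, the string configuration key, and the factory supplier are placeholders, not the connector's actual classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

class ProducerPoolSketch<P> {
  // One shared map per worker; keys identify a unique session/transform configuration.
  private final ConcurrentMap<String, P> producersByConfig = new ConcurrentHashMap<>();

  P getOrCreate(String configKey, Supplier<P> factory) {
    // computeIfAbsent guarantees a single producer per configuration key even when
    // multiple bundle threads race to create it; the producer is then reused until
    // the worker is destroyed (or the client library closes it for inactivity).
    return producersByConfig.computeIfAbsent(configKey, key -> factory.get());
  }
}
```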
```java
if (queue != null) {
  builder = builder.queueName(queue.getName());
}
```
Why did this change from throwing to checking?
Also, given that AutoValue implementations are immutable, deriving from a class that allows you to mutate the queue name seems to run counter to what the framework provides. SessionServiceFactory should probably be an interface declaring create, and optionally an abstract class that implements the interface and declares equals and hashCode abstract, if your intention is to have all implementors explicitly override those methods for value equality.
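A sketch of the structure suggested above, not the current code: an interface that declares create(), plus an optional abstract base that forces implementors to define value equality. The names `SessionServiceFactoryInterface` and `ValueEqualSessionServiceFactory` are hypothetical; only SessionService is taken from the connector.

```java
import java.io.Serializable;
import org.apache.beam.sdk.io.solace.broker.SessionService;

interface SessionServiceFactoryInterface extends Serializable {
  // The only behavior the connector needs from a factory.
  SessionService create();
}

abstract class ValueEqualSessionServiceFactory implements SessionServiceFactoryInterface {
  // Declaring these abstract forces every implementor to provide explicit value
  // equality, as suggested above.
  @Override
  public abstract boolean equals(Object other);

  @Override
  public abstract int hashCode();
}
```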
This class was initially used for the Read connector alone, which requires the queue property to be set and should fail if it is not set.
The Write connector does not need a pre-existing queue. I changed the queueName parameter to @Nullable and this to a check, since the queueName value is now optional.
Initially, there were different SessionService-ish classes for the two connectors, with different properties. Now we use a single class, for consistency across the Read and Write connectors.
The conversion into millis is done at the presentation moment, when the metric is reported to Beam.
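A sketch of reporting publish latency in milliseconds at presentation time; the metric name, the `LatencyMetricSketch` class, and the nanosecond source are illustrative, not the connector's exact code.

```java
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;

class LatencyMetricSketch {
  private static final Distribution PUBLISH_LATENCY_MS =
      Metrics.distribution(LatencyMetricSketch.class, "publish_latency_ms");

  static void report(long latencyNanos) {
    // Nanoseconds are kept internally; the conversion to millis happens only here,
    // when the value is handed to Beam's metrics system.
    PUBLISH_LATENCY_MS.update(TimeUnit.NANOSECONDS.toMillis(latencyNanos));
  }
}
```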
A couple of DoFns are moved to their own files too, as the abstract class for the UnboundedSolaceWriter was in practice a "package". This commit addresses a few comments about the structure of UnboundedSolaceWriter and some base classes of that abstract class.
This DoFn is a stateful DoFn used to force a shuffle with a given input key set cardinality.
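An illustrative stateful DoFn of the kind described in that commit: declaring state on a keyed input forces the runner to shuffle elements by key, bounding parallelism by the key cardinality. The class name and element type are assumptions, not the connector's actual DoFn.

```java
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class ForceShuffleDoFn extends DoFn<KV<Integer, Solace.Record>, KV<Integer, Solace.Record>> {
  // The state is never read back; its only purpose is to make the DoFn stateful so
  // the runner keys and shuffles the input before processing.
  @StateId("unused")
  private final StateSpec<ValueState<Integer>> unused = StateSpecs.value();

  @ProcessElement
  public void processElement(
      @Element KV<Integer, Solace.Record> element,
      @StateId("unused") ValueState<Integer> state,
      OutputReceiver<KV<Integer, Solace.Record>> out) {
    out.output(element);
  }
}
```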
The warnings are only shown if the user decided to set the properties that are overridden by the connector. This was changed in one of the previous commits, but it is actually a bug. I am reverting that change and changing this to a switch block, to make it clearer that the properties always need to be set by the connector.
This lets the user fully control all the properties used by the connector, instead of the connector making sensible choices on their behalf. This also adds some logging to be more explicit about what the connector is doing. It does not add too much logging pressure; logging is only added at producer creation time.
I forgot to pass the submission mode when the write session is created, and I called the wrong method in the base class because it was defined as public. This makes sure that the submission mode is passed to the session when the session is created for writing messages.
Hi @Abacn, would you mind merging this PR? It was reviewed in detail by @sjvanrossum :)
This is a follow-up PR to #31953, and part of the issue #31905.
This PR adds the actual writer functionality, and some additional testing, including integration testing.
This should be the final PR for the SolaceIO write connector to be complete.
This PR fixes #31905.