-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-13402] Simplify PubsubLiteSink #16215
Conversation
This also makes it not afflicted by futures never terminating by avoiding indefinite waits
|
||
/** A map of working publishers by PublisherOptions. */ | ||
class PublisherCache implements AutoCloseable { | ||
private final Logger logger = LoggerFactory.getLogger(PublisherCache.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we usually make loggers static and call them LOG:
private final Logger logger = LoggerFactory.getLogger(PublisherCache.class); | |
private static final Logger LOG = LoggerFactory.getLogger(PublisherCache.class); |
❯ grep -iIrn LoggerFactory\.getLogger sdks/java | grep private | cut -d' ' -f2- | cut -d = -f-1 | awk '{$1=$1};1' | sort | uniq -c | sort -n
1 private final Logger log
1 private static final org.slf4j.Logger LOG
263 private static final Logger LOG
} | ||
}, | ||
SystemExecutors.getFuturesExecutor()); | ||
runState.publish(message); | ||
} | ||
|
||
// Intentionally don't flush on bundle finish to allow multi-sink client reuse. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment accurate? It looks like both the old and new implementation are flushing on bundle finish
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neither is explicitly calling flush() on the client since that would also block on the completion of other SDF instances' messages.
listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying); | ||
future.set(MessageMetadata.of(Partition.of(1), Offset.of(2))); | ||
executorFuture.get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this testing a case that we need to be concerned about? Why don't we need the Publisher.addListener logic in PubsubLiteSink anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the publisher is managed by PublisherCache. When the publisher is failed, the publish() call will just throw, causing the bundle to be retried.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the publisher is managed by PublisherCache. When the publisher is failed, the publish() call will just throw, causing the bundle to be retried.
Can we modify the test to mock PublisherCache etc. and check that behavior then, instead of deleting it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, this behavior is already tested elsewhere.
futures.add(publisher.publish(Message.fromProto(message))); | ||
} | ||
|
||
void waitForDone() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might consider adding some logging or wrapping the exceptions thrown here with more helpful messages, rather than bubbling up all exceptions, e.g. "timeout waiting for PubsubLite messages to be published".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is important that if any exception is thrown, the bundle closure fails, since the publishes need to be retried. I could add logging, but the exception still needs to be rethrown.
run java postcommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying); | ||
future.set(MessageMetadata.of(Partition.of(1), Offset.of(2))); | ||
executorFuture.get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, this behavior is already tested elsewhere.
This also makes it not afflicted by futures never terminating by avoiding indefinite waits
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
ValidatesRunner
compliance status (on master branch)Examples testing status on various runners
Post-Commit SDK/Transform Integration Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.