-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add configurable initial position for orphaned stream #853
Conversation
* | ||
* <p>Default value: {@link InitialPositionInStream#LATEST}</p> | ||
*/ | ||
private InitialPositionInStreamExtended orphanedStreamInitialPositionInStream = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking this can be part of the MultiStreamTracker interface, so that multistream specific overrides can be in a single place. Do you have any concerns? If we do that, then you can directly get the default initial position at line 230 of Scheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on placement of config
6c371c2
to
f468b04
Compare
@@ -152,8 +157,8 @@ | |||
private Checkpointer checkpoint; | |||
@Mock | |||
private WorkerStateChangeListener workerStateChangeListener; | |||
@Mock | |||
private MultiStreamTracker multiStreamTracker; | |||
@Spy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't this be a pure mock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to test the default implementation of the orphanedStreamInitialPositionInStream
method we should use Spy
so we can mock out other methods (as needed by other tests) but call orphanedStreamInitialPositionInStream
directly?
The alternative would be to use Mock
and stub out the orphanedStreamInitialPositionInStream
method to return LATEST
by default. But then the test would be decoupled from the default implementation.
|
||
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) | ||
.retrievalFactory(retrievalFactory); | ||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do we need to spy this? It doesn't look like we're stubbing anything here.
- It's a bad practice to be mocking/spying the class under test - the point of unit tests is to test the class as a black box using its public methods; if we have to stub it then it's a sign we need to refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, removed.
@@ -588,13 +590,13 @@ private boolean isLeader() { | |||
(streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); | |||
} | |||
|
|||
private void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) { | |||
@VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This annotation is a hack/usually a sign that we should refactor. If we can avoid introducing more methods w/ this annotation I would prefer it; is it possible to write a unit test just using .run()
?
* | ||
* <p>Default value: {@link InitialPositionInStream#LATEST}</p> | ||
*/ | ||
default InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should return Optional<InitialPositionInStreamExtended>
here instead? In the case that a lease table is populated by stale streams and customer no longer has access to them and customer doesn't specify it in their list of streams, behavior will change from not reading at all to Scheduler
wasting a bunch of cycles getting 400's from auth.
It's a corner-corner case that I doubt many people will run into, but it's worth considering making this an opt-in feature instead of an opt-out feature, as most customers will not hesitate to upgrade to the latest version of KCL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t think we are changing the behavior here? We used to assign a default LATEST
position for any stream that is in lease table but not specified by client, and I think this default behavior is still the same.
For this refactoring we are just moving the default position from Scheduler
into MultiStreamTracker
interface.
Since orphanedStreamInitialPositionInStream
is a default method, customers doesn’t have to change existing implementation when upgrading, but now they can specify a custom position to start for any orphaned stream by implementing the method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, yeah this is correct. I left this comment before seeing the changes in Scheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left comments
* Add default `orphanedStreamInitialPositionInStream` in `MultiStreamTracker`, so a custom initial position can be passed to Scheduler to initialize the orphaned stream's config * Renamed `getDefaultStreamConfig` to `getOrphanedStreamConfig` * Refactored `SchedulerTest` setup and implement `TestMultiStreamTracker` to test `MultiStreamTracker` interface default methods. Note that this is a workaround for using mockito 1.x to test default interface methods. mockito >= 2.7.13 supports Spy on interface directly, which can be used to test default methods without implementing a concrete class. However, mockito 2.x has a number of breaking changes, so future work will be needed to refactor unit tests and upgrade to mockito >= 2.7.13 Signed-off-by: Rex Chen <[email protected]>
f468b04
to
1feaebc
Compare
orphanedStreamInitialPositionInStream
inMultiStreamTracker
, so a custom initial position can be passed to Scheduler to initialize the orphaned stream's configgetDefaultStreamConfig
togetOrphanedStreamConfig
SchedulerTest
setup and implementTestMultiStreamTracker
to testMultiStreamTracker
interface default methods. Note that this is a workaround for using mockito 1.x to test default interface methods. mockito >= 2.7.13 supports Spy on interface directly, which can be used to test default methods without implementing a concrete class. However, mockito 2.x has a number of breaking changes, so future work will be needed to refactor unit tests and upgrade to mockito >= 2.7.13Signed-off-by: Rex Chen [email protected]
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.