-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cache refresh for container recreate with same id #12747
Changes from 4 commits
6ec58af
ce4b6f1
638745a
6d609c6
98495c2
6386495
88d6569
e2a20f9
e89fed9
5600b94
cd39ceb
a92506d
306a151
4dc3ccc
5519d06
3a6fe71
472abc7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,4 +44,21 @@ static public <T> Mono<T> inlineIfPossibleAsObs(Callable<Mono<T>> function, IRet | |
return BackoffRetryUtility.executeRetry(() -> function.call(), retryPolicy); | ||
} | ||
} | ||
|
||
static public <T> Flux<T> fluxInlineIfPossibleAsObs(Callable<Flux<T>> function, IRetryPolicy retryPolicy) { | ||
|
||
if (retryPolicy == null) { | ||
// shortcut | ||
return Flux.defer(() -> { | ||
try { | ||
return function.call(); | ||
} catch (Exception e) { | ||
return Mono.error(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh haha, yea you are right, I just pushed a fix for it, thanks~ copy paste error |
||
} | ||
}); | ||
|
||
} else { | ||
return BackoffRetryUtility.fluxExecuteRetry(() -> function.call(), retryPolicy); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -560,18 +560,39 @@ private String parentResourceLinkToQueryLink(String parentResouceLink, ResourceT | |
} | ||
|
||
private <T extends Resource> Flux<FeedResponse<T>> createQuery( | ||
String parentResourceLink, | ||
String parentResourceLink, | ||
SqlQuerySpec sqlQuery, | ||
CosmosQueryRequestOptions options, | ||
Class<T> klass, | ||
ResourceType resourceTypeEnum) { | ||
|
||
String resourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum); | ||
UUID activityId = Utils.randomUUID(); | ||
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this); | ||
|
||
InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I picked up the best places to wire up the invalid partitionExceptionRetryPolicy. Several thoughts and questions:
|
||
this.collectionCache, | ||
null, | ||
resourceLink, | ||
options); | ||
|
||
return ObservableHelper.fluxInlineIfPossibleAsObs( | ||
() -> createQueryInternal(resourceLink, sqlQuery, options, klass, resourceTypeEnum, queryClient, activityId), | ||
invalidPartitionExceptionRetryPolicy); | ||
} | ||
|
||
private <T extends Resource> Flux<FeedResponse<T>> createQueryInternal( | ||
String resourceLink, | ||
SqlQuerySpec sqlQuery, | ||
CosmosQueryRequestOptions options, | ||
Class<T> klass, | ||
ResourceType resourceTypeEnum) { | ||
ResourceType resourceTypeEnum, | ||
IDocumentQueryClient queryClient, | ||
UUID activityId) { | ||
|
||
String queryResourceLink = parentResourceLinkToQueryLink(parentResourceLink, resourceTypeEnum); | ||
|
||
UUID activityId = Utils.randomUUID(); | ||
IDocumentQueryClient queryClient = documentQueryClientImpl(RxDocumentClientImpl.this); | ||
Flux<? extends IDocumentQueryExecutionContext<T>> executionContext = | ||
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, queryResourceLink, false, activityId); | ||
DocumentQueryExecutionContextFactory.createDocumentQueryExecutionContextAsync(queryClient, resourceTypeEnum, klass, sqlQuery , options, resourceLink, false, activityId); | ||
|
||
return executionContext.flatMap(iDocumentQueryExecutionContext -> { | ||
QueryInfo queryInfo = null; | ||
if (iDocumentQueryExecutionContext instanceof PipelinedDocumentQueryExecutionContext) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WOW such a compact code. Equivalent .NET one is reasonably verbose.