Skip to content

Commit

Permalink
extracted subscription code
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Oct 27, 2020
1 parent f4886bd commit 39f885b
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,7 @@ export class TaskPollingLifecycle {
elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
if (areESAndSOAvailable && !this.isStarted) {
// start polling for work
this.pollingSubscription = poller$
.pipe(
tap(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
)
)
.subscribe((event: Result<FillPoolResult, PollingError<string>>) => {
this.emitEvent(asTaskPollingCycleEvent<string>(event));
});
this.pollingSubscription = this.subscribeToPoller(poller$);
} else if (!areESAndSOAvailable && this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
Expand Down Expand Up @@ -206,6 +190,26 @@ export class TaskPollingLifecycle {
return !this.pollingSubscription.closed;
}

private subscribeToPoller(poller$: Observable<Result<FillPoolResult, PollingError<string>>>) {
return poller$
.pipe(
tap(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
)
)
.subscribe((event: Result<FillPoolResult, PollingError<string>>) => {
this.emitEvent(asTaskPollingCycleEvent<string>(event));
});
}

private pollForWork = async (...tasksToClaim: string[]): Promise<FillPoolResult> => {
return fillPool(
// claim available tasks
Expand Down

0 comments on commit 39f885b

Please sign in to comment.