Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool Manager: limit, pause and cancel requests #96

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a40bd9b
Add PoolManager
vixez Oct 28, 2021
38c1152
Update changelog and readme
vixez Oct 28, 2021
c6ab04e
Cleanup
vixez Oct 28, 2021
01775b9
Remove kSkipPoolHeader from requests
vixez Oct 28, 2021
ed1a8f3
Move poolResource to _attemptRequest
vixez Nov 30, 2021
8ec1f6a
Expose tokenUpdateRequested
vixez Dec 3, 2021
ad57084
fix: _tokenUpdateRequested should not default to true
vixez Dec 9, 2021
bb762bb
feat: events + manual release + pause
vixez Dec 16, 2021
ab33bf8
merge
vixez Jan 10, 2022
55ff368
fix: add missing fields in copyWith
vixez Jan 10, 2022
7c8e25f
chore: cleanup
vixez Jan 10, 2022
c8d6e84
test: updated tests
vixez Jan 10, 2022
dc5fa8f
Merge branch 'fix/copywith-missing-fields' into feat/2.0-beta-update
vixez Jan 10, 2022
67ee2e7
ver: 2.0.0-beta.3
vixez Jan 10, 2022
291e9c7
Merge branch 'fix/copywith-missing-fields' into feat/2.0-beta-update
vixez Jan 10, 2022
9ef43ca
feat: _retryCount Map + catch exception in retry exception
vixez Jan 11, 2022
ecb67b3
feat: add request to shouldAttemptRetryOnException
vixez Jan 13, 2022
81d7fb1
test: update test
vixez Jan 13, 2022
05322fa
chore: cleanup
vixez Jan 13, 2022
18d2ca8
example: update shouldAttemptRetryOnException in example
vixez Jan 13, 2022
22480d8
Merge branch 'main' into improvement/retry-exception-add-request
vixez Jan 13, 2022
6ab940b
docs: updated changelog
vixez Jan 13, 2022
4f93fb8
ver: 2.0.0-beta.4
vixez Jan 13, 2022
03957a6
Merge branch 'improvement/retry-exception-add-request' into feature/p…
vixez Jan 13, 2022
3427f76
feat: cancel all active requests
vixez Feb 7, 2022
a68a001
Merge branch 'main' into feature/pool-manager
vixez Feb 7, 2022
9729c68
feat: throwCanceledException
vixez Feb 7, 2022
8bf3a1f
fix: release pool request when this had already been done
vixez Mar 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:developer';
import 'dart:io';

import 'package:flutter/material.dart';
import 'package:http/http.dart';
import 'package:http_interceptor/http_interceptor.dart';
import 'package:shared_preferences/shared_preferences.dart';

Expand Down
2 changes: 2 additions & 0 deletions example/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ dependencies:
http_interceptor:
path: ../
shared_preferences: ^2.0.6
http: ^0.13.4


dev_dependencies:
flutter_test:
Expand Down
1 change: 1 addition & 0 deletions lib/exceptions/exceptions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export 'package:http_interceptor/exceptions/request_exceptions.dart';
11 changes: 11 additions & 0 deletions lib/exceptions/request_exceptions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import 'package:http/http.dart';

class RequestCancelledException implements Exception {
final BaseRequest request;

RequestCancelledException(this.request);

String toString() {
return 'The request has been cancelled (${request.url})';
}
}
131 changes: 118 additions & 13 deletions lib/http/intercepted_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:http/http.dart';
import 'package:http_interceptor/extensions/extensions.dart';
import 'package:http_interceptor/managers/pool_manager.dart';
import 'package:http_interceptor/models/models.dart';
import 'package:pool/pool.dart';

import '../exceptions/exceptions.dart';
import 'http_methods.dart';
import 'interceptor_contract.dart';

Expand Down Expand Up @@ -38,6 +42,8 @@ import 'interceptor_contract.dart';
/// Don't forget to close the client once you are done, as a client keeps
/// the connection alive with the server by default.
class InterceptedClient extends BaseClient {
static const String kSkipPoolHeader = 'Intercepted-Client-Skip-Pool-Manager';

/// List of interceptors that will be applied to the requests and responses.
final List<InterceptorContract> interceptors;

Expand All @@ -48,13 +54,17 @@ class InterceptedClient extends BaseClient {
/// retry. This is useful for implementing JWT token expiration
final RetryPolicy? retryPolicy;

int _retryCount = 0;
/// Manage the requests in a Pool.
PoolManager? poolManager;

Map<BaseRequest, int> _retryCount = {};
late Client _inner;

InterceptedClient._internal({
required this.interceptors,
this.requestTimeout,
this.retryPolicy,
this.poolManager,
Client? client,
}) : _inner = client ?? Client();

Expand All @@ -80,12 +90,14 @@ class InterceptedClient extends BaseClient {
Duration? requestTimeout,
RetryPolicy? retryPolicy,
Client? client,
PoolManager? poolManager,
}) =>
InterceptedClient._internal(
interceptors: interceptors,
requestTimeout: requestTimeout,
retryPolicy: retryPolicy,
client: client,
poolManager: poolManager,
);

@override
Expand Down Expand Up @@ -205,11 +217,25 @@ class InterceptedClient extends BaseClient {

@override
Future<StreamedResponse> send(BaseRequest request) async {
final response = await _attemptRequest(request);
PoolResource? poolResource = await _addToRequestPool(request.headers);
request.headers.remove(kSkipPoolHeader);

final interceptedResponse = await _interceptResponse(response);
try {
final interceptedRequest = await _interceptRequest(request);

return interceptedResponse as StreamedResponse;
final response = await _cancellableSendRequest(
interceptedRequest,
poolResource,
);

final interceptedResponse = await _interceptResponse(response);

await _releasePoolRequest(poolResource);
return interceptedResponse as StreamedResponse;
} catch (_) {
await _releasePoolRequest(poolResource);
rethrow;
}
}

Future<BaseResponse> _sendUnstreamed({
Expand Down Expand Up @@ -237,7 +263,8 @@ class InterceptedClient extends BaseClient {
}
}

var response = await _attemptRequest(request);
PoolResource? poolResource = await _addToRequestPool(request.headers);
var response = await _cancellableAttemptRequest(request, poolResource);

// Intercept response
response = await _interceptResponse(response);
Expand All @@ -256,7 +283,14 @@ class InterceptedClient extends BaseClient {

/// Attempts to perform the request and intercept the data
/// of the response
Future<BaseResponse> _attemptRequest(BaseRequest request) async {
Future<BaseResponse> _attemptRequest(
BaseRequest request, PoolResource? poolResource) async {
request.headers.remove(kSkipPoolHeader);

if (!_retryCount.containsKey(request)) {
_retryCount[request] = 0;
}

BaseResponse response;
try {
// Intercept request
Expand All @@ -270,26 +304,75 @@ class InterceptedClient extends BaseClient {
request is Request ? await Response.fromStream(stream) : stream;

if (retryPolicy != null &&
retryPolicy!.maxRetryAttempts > _retryCount &&
retryPolicy!.maxRetryAttempts > _retryCount[request]! &&
await retryPolicy!.shouldAttemptRetryOnResponse(response)) {
_retryCount += 1;
return _attemptRequest(request);
_retryCount[request] = _retryCount[request]! + 1;
response = await _attemptRequest(request, poolResource);
return response;
}
} on Exception catch (error) {
if (error is RequestCancelledException) {
_retryCount.remove(request);
await _releasePoolRequest(poolResource);
rethrow;
}
if (retryPolicy != null &&
retryPolicy!.maxRetryAttempts > _retryCount &&
retryPolicy!.maxRetryAttempts > _retryCount[request]! &&
retryPolicy!.shouldAttemptRetryOnException(error, request)) {
_retryCount += 1;
return _attemptRequest(request);
_retryCount[request] = _retryCount[request]! + 1;
try {
response = await _attemptRequest(request, poolResource);
return response;
} on Exception catch (_) {
_retryCount.remove(request);
rethrow;
}
} else {
_retryCount.remove(request);
await _releasePoolRequest(poolResource);
rethrow;
}
}

_retryCount = 0;
_retryCount.remove(request);
await _releasePoolRequest(poolResource);
return response;
}

/// Attempt a request, but allow it to be cancelled.
Future<BaseResponse> _cancellableAttemptRequest(
BaseRequest request, PoolResource? poolResource) async {
CancelableCompleter completer = CancelableCompleter();

poolManager?.addCancelableRequest(completer.operation);

completer.complete(_attemptRequest(request, poolResource));
return await completer.operation.valueOrCancellation().then((value) {
poolManager?.removeCancelableRequest(completer.operation);
if (completer.isCanceled && poolManager?.throwCanceledException == true) {
throw RequestCancelledException(request);
}
return value;
});
}

/// Attempt a send request, but allow it to be cancelled.
Future<StreamedResponse> _cancellableSendRequest(
BaseRequest request, PoolResource? poolResource) async {
CancelableCompleter completer = CancelableCompleter();

poolManager?.addCancelableRequest(completer.operation);

completer.complete(_inner.send(request));
return await completer.operation.valueOrCancellation().then((value) {
poolManager?.removeCancelableRequest(completer.operation);
if (completer.isCanceled && poolManager?.throwCanceledException == true) {
throw RequestCancelledException(request);
}
return value;
});
}

/// This internal function intercepts the request.
Future<BaseRequest> _interceptRequest(BaseRequest request) async {
BaseRequest interceptedRequest = request.copyWith();
Expand Down Expand Up @@ -317,4 +400,26 @@ class InterceptedClient extends BaseClient {
void close() {
_inner.close();
}

/// Add a new request to the pool.
/// If [kSkipPoolHeader] is found in the headers, the pool is skipped so
/// the request is executed immediately.
Future<PoolResource?> _addToRequestPool(Map<String, String>? headers) async {
if (headers?.keys.contains(kSkipPoolHeader) == true) {
return null;
}

if (poolManager == null) {
return null;
}
return await poolManager?.request();
}

/// Release a [PoolRequest]
Future<void> _releasePoolRequest(PoolResource? poolResource) async {
if (poolResource == null) {
return;
}
await poolManager?.release(poolResource);
}
}
1 change: 1 addition & 0 deletions lib/http/intercepted_http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:http/http.dart';
import 'package:http_interceptor/http_interceptor.dart';

/// Class to be used by the user as a replacement for 'http' with interceptor
Expand Down
3 changes: 2 additions & 1 deletion lib/http_interceptor.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
library http_interceptor;

export 'package:http/http.dart';
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best if we expose http package along with this package so we can avoid having to install manually http when using http_interceptor.

export 'package:http_interceptor/exceptions/exceptions.dart';
export 'package:http_interceptor/extensions/extensions.dart';
export 'package:http_interceptor/http/http.dart';
export 'package:http_interceptor/managers/managers.dart';
export 'package:http_interceptor/models/models.dart';
1 change: 1 addition & 0 deletions lib/managers/managers.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export 'pool_manager.dart';
Loading