-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: return topic subscribe error immediately if subscription limit reached #62
Changes from all commits
ae5d983
2875cef
7fa5434
3907044
9c4ebc4
81931bc
81e6ea6
278577c
5ac6dfd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ abstract class AbstractPubsubClient { | |
Future<TopicPublishResponse> publish( | ||
String cacheName, String topicName, Value value); | ||
|
||
TopicSubscribeResponse subscribe(String cacheName, String topicName); | ||
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName); | ||
|
||
void close(); | ||
} | ||
|
@@ -61,20 +61,29 @@ class ClientPubsub implements AbstractPubsubClient { | |
} | ||
|
||
@override | ||
TopicSubscribeResponse subscribe(String cacheName, String topicName, | ||
{Int64? resumeAtTopicSequenceNumber}) { | ||
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName, | ||
{Int64? resumeAtTopicSequenceNumber}) async { | ||
var request = SubscriptionRequest_(); | ||
request.cacheName = cacheName; | ||
request.topic = topicName; | ||
request.resumeAtTopicSequenceNumber = | ||
resumeAtTopicSequenceNumber ?? Int64(0); | ||
try { | ||
var stream = _grpcManager.client.subscribe(request); | ||
return TopicSubscription(stream, request.resumeAtTopicSequenceNumber, | ||
this, cacheName, topicName); | ||
final subscription = TopicSubscription(stream, | ||
request.resumeAtTopicSequenceNumber, this, cacheName, topicName); | ||
|
||
try { | ||
await subscription.init(); | ||
return subscription; | ||
} catch (e) { | ||
rethrow; | ||
} | ||
Comment on lines
+64
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. had to make subscribe an async function in order to run the init method that checks for the first message being a heartbeat |
||
} catch (e) { | ||
if (e is GrpcError) { | ||
return TopicSubscribeError(grpcStatusToSdkException(e)); | ||
} else if (e is SdkException) { | ||
return TopicSubscribeError(e); | ||
} | ||
return TopicSubscribeError( | ||
UnknownException("Unexpected error: $e", null, null)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,11 +9,7 @@ class TopicGrpcManager { | |
final _logger = Logger('MomentoTopicClient'); | ||
|
||
TopicGrpcManager(CredentialProvider credentialProvider) { | ||
_channel = ClientChannel(credentialProvider.cacheEndpoint, | ||
options: ChannelOptions( | ||
keepAlive: ClientKeepAliveOptions( | ||
pingInterval: Duration(seconds: 10), | ||
timeout: Duration(seconds: 5)))); | ||
_channel = ClientChannel(credentialProvider.cacheEndpoint); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed the keepalive settings as it seemed to interfere with the default retry strategy dart-grpc already implements (seems to completely close the grpc channel without any possibility of reconnecting using the generated pubsub client) |
||
_client = PubsubClient(_channel, | ||
options: CallOptions(metadata: { | ||
'authorization': credentialProvider.apiKey, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,24 @@ class TopicSubscription implements TopicSubscribeResponse { | |
String topicName; | ||
bool retry = true; | ||
final logger = Logger("TopicSubscribeResponse"); | ||
late Stream _broadcastStream; | ||
|
||
TopicSubscription(this._stream, this.lastSequenceNumber, this._client, | ||
this.cacheName, this.topicName); | ||
this.cacheName, this.topicName) { | ||
_broadcastStream = _stream.asBroadcastStream(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. creating a broadcast stream allows for multiple subscribers to consume messages from the stream. |
||
} | ||
|
||
Future<void> init() async { | ||
await for (final firstItem in _broadcastStream) { | ||
if (firstItem.whichKind() != SubscriptionItem__Kind.heartbeat) { | ||
throw InternalServerException( | ||
"Expected heartbeat message for topic $topicName on cache $cacheName, got ${firstItem.whichKind()}", | ||
null, | ||
null); | ||
} | ||
break; | ||
} | ||
} | ||
|
||
Stream<TopicSubscriptionItemResponse> get stream { | ||
return _handleStream(); | ||
|
@@ -29,7 +44,7 @@ class TopicSubscription implements TopicSubscribeResponse { | |
Stream<TopicSubscriptionItemResponse> _handleStream() async* { | ||
while (retry) { | ||
try { | ||
await for (final msg in _stream) { | ||
await for (final msg in _broadcastStream) { | ||
var result = _processResult(msg); | ||
if (result != null) { | ||
yield result; | ||
|
@@ -39,20 +54,41 @@ class TopicSubscription implements TopicSubscribeResponse { | |
if (e is CancelledException || | ||
(e is GrpcError && e.codeName == "CANCELLED")) { | ||
logger.fine("Subscription was cancelled, not reconnecting"); | ||
await _stream.cancel(); | ||
retry = false; | ||
} else if (e is ClientResourceExhaustedException || | ||
(e is GrpcError && e.codeName == "RESOURCE_EXHAUSTED")) { | ||
logger.fine("Subscription limit reached, not reconnecting"); | ||
await _stream.cancel(); | ||
retry = false; | ||
} else { | ||
logger.fine("Attempting to reconnect after receiving error: $e"); | ||
var result = _client.subscribe(cacheName, topicName, | ||
resumeAtTopicSequenceNumber: lastSequenceNumber); | ||
if (result is TopicSubscription) { | ||
_stream = result._stream; | ||
lastSequenceNumber = result.lastSequenceNumber; | ||
} else if (result is TopicSubscribeError) { | ||
logger.fine("Error reconnecting: ${result.message}"); | ||
} | ||
} | ||
} | ||
|
||
if (retry) { | ||
logger.fine("retry is still true"); | ||
retry = await attemptReconnect(); | ||
} | ||
} | ||
} | ||
|
||
Future<bool> attemptReconnect() async { | ||
await _stream.cancel(); | ||
var result = await _client.subscribe(cacheName, topicName, | ||
resumeAtTopicSequenceNumber: lastSequenceNumber); | ||
if (result is TopicSubscription) { | ||
_stream = result._stream; | ||
_broadcastStream = result._broadcastStream; | ||
lastSequenceNumber = result.lastSequenceNumber; | ||
} else if (result is TopicSubscribeError) { | ||
logger.fine("Error reconnecting: ${result.message}"); | ||
if (result.errorCode == MomentoErrorCode.limitExceededError || | ||
result.errorCode == MomentoErrorCode.cancelledError) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
||
TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ dependencies: | |
logging: ^1.2.0 | ||
protobuf: ^3.1.0 | ||
string_validator: ^1.0.2 | ||
uuid: ^4.3.1 | ||
uuid: ^4.2.2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flutter demo app couldn't import the momento package because flutter uses uuid < 4.3.0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmmm, makes me question how dart handles transient deps, and if our other deps could cause a similar issue with other programs. I think is OK for now, but something to keep an eye out on |
||
# path: ^1.8.0 | ||
|
||
dev_dependencies: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🦾