-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: return topic subscribe error immediately if subscription limit reached #62
Conversation
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName, | ||
{Int64? resumeAtTopicSequenceNumber}) async { | ||
var request = SubscriptionRequest_(); | ||
request.cacheName = cacheName; | ||
request.topic = topicName; | ||
request.resumeAtTopicSequenceNumber = | ||
resumeAtTopicSequenceNumber ?? Int64(0); | ||
try { | ||
var stream = _grpcManager.client.subscribe(request); | ||
return TopicSubscription(stream, request.resumeAtTopicSequenceNumber, | ||
this, cacheName, topicName); | ||
final subscription = TopicSubscription(stream, | ||
request.resumeAtTopicSequenceNumber, this, cacheName, topicName); | ||
|
||
try { | ||
await subscription.init(); | ||
return subscription; | ||
} catch (e) { | ||
rethrow; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
had to make subscribe an async function in order to run the init method that checks for the first message being a heartbeat
keepAlive: ClientKeepAliveOptions( | ||
pingInterval: Duration(seconds: 10), | ||
timeout: Duration(seconds: 5)))); | ||
_channel = ClientChannel(credentialProvider.cacheEndpoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the keepalive settings as it seemed to interfere with the default retry strategy dart-grpc already implements (seems to completely close the grpc channel without any possibility of reconnecting using the generated pubsub client)
|
||
TopicSubscription(this._stream, this.lastSequenceNumber, this._client, | ||
this.cacheName, this.topicName); | ||
this.cacheName, this.topicName) { | ||
_broadcastStream = _stream.asBroadcastStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
creating a broadcast stream allows for multiple subscribers to consume messages from the stream.
In this case, there's one consumer in the init method to check if the first message is a heartbeat and the other consumer is the end user
@@ -14,7 +14,7 @@ dependencies: | |||
logging: ^1.2.0 | |||
protobuf: ^3.1.0 | |||
string_validator: ^1.0.2 | |||
uuid: ^4.3.1 | |||
uuid: ^4.2.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flutter demo app couldn't import the momento package because flutter uses uuid < 4.3.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmmm, makes me question how dart handles transient deps, and if our other deps could cause a similar issue with other programs. I think is OK for now, but something to keep an eye out on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
```bash | ||
flutter devices | ||
flutter run -d <device_id> --dart-define="MOMENTO_API_KEY=<your-api-key>" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🦾
@@ -14,7 +14,7 @@ dependencies: | |||
logging: ^1.2.0 | |||
protobuf: ^3.1.0 | |||
string_validator: ^1.0.2 | |||
uuid: ^4.3.1 | |||
uuid: ^4.2.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmmm, makes me question how dart handles transient deps, and if our other deps could cause a similar issue with other programs. I think is OK for now, but something to keep an eye out on
addresses #55
Checks that the first message is a heartbeat before returning a TopicSubscription or TopicSubscribeError response. Now, if a user tries to subscribe >100 times, they will get a TopicSubscribeError saying they've reached the 100 subscriber limit.
Also did some testing and determined that the keepalive settings seemed to be interfering with the ability to reconnect. Keepalive settings were not playing well with the default backoff and retry strategy that dart-grpc uses, so removed them for now after confirming retries still work.
Before (with keepalive settings):
After (without keepalive settings):