Skip to content

Commit

Permalink
Merge commit '0773a8a' into jribbink/sdk-subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 26, 2024
2 parents f2c9f79 + 0773a8a commit a341371
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 28 deletions.
21 changes: 8 additions & 13 deletions packages/transport-http/src/subscribe/subscription-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import {
SubscriptionDataMessage,
UnsubscribeMessageRequest,
} from "./models"
import {SubscriptionManager} from "./subscription-manager"
import {
SubscriptionManager,
SubscriptionManagerConfig,
} from "./subscription-manager"
import {SdkTransport} from "@onflow/typedefs"

jest.mock("./websocket", () => ({
Expand All @@ -25,10 +28,8 @@ describe("WsSubscriptionTransport", () => {
})

test("does not connect to the socket when no subscriptions are made", async () => {
const config = {
const config: SubscriptionManagerConfig = {
node: "wss://localhost:8080",
reconnectInterval: 1000,
reconnectAttempts: 10,
}

new SubscriptionManager(config)
Expand All @@ -38,10 +39,8 @@ describe("WsSubscriptionTransport", () => {
})

test("disconnects from the socket when the last subscription is removed", async () => {
const config = {
const config: SubscriptionManagerConfig = {
node: "wss://localhost:8080",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new SubscriptionManager(config)
const topic = "topic" as SdkTransport.SubscriptionTopic
Expand Down Expand Up @@ -90,10 +89,8 @@ describe("WsSubscriptionTransport", () => {
})

test("subscribes, receives data, and unsubscribes", async () => {
const config = {
const config: SubscriptionManagerConfig = {
node: "wss://localhost:8080",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new SubscriptionManager(config)
const topic = "topic" as SdkTransport.SubscriptionTopic
Expand Down Expand Up @@ -162,10 +159,8 @@ describe("WsSubscriptionTransport", () => {
})

test("reconnects to stream on close", async () => {
const config = {
const config: SubscriptionManagerConfig = {
node: "wss://localhost:8080",
reconnectInterval: 1000,
reconnectAttempts: 1,
}
const streamController = new SubscriptionManager(config)
const topic = "topic" as SdkTransport.SubscriptionTopic
Expand Down
54 changes: 39 additions & 15 deletions packages/transport-http/src/subscribe/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import * as logger from "@onflow/util-logger"

const WS_OPEN = 1

type DeepRequired<T> = Required<{
[K in keyof T]: DeepRequired<T[K]>
}>

interface SubscriptionInfo<T extends SdkTransport.SubscriptionTopic> {
// Internal ID for the subscription
id: number
Expand All @@ -30,35 +34,49 @@ interface SubscriptionInfo<T extends SdkTransport.SubscriptionTopic> {
onError: (error: Error) => void
}

interface WsTransportConfig {
export interface SubscriptionManagerConfig {
/**
* The URL of the node to connect to
*/
node: string
/**
* Starting interval for reconnection attempts in milliseconds, exponential backoff is applied
* @default 500
*/
reconnectInterval?: number
/**
* The number of reconnection attempts before giving up
* @default 5
* Options for reconnecting to the server
*/
reconnectAttempts?: number
reconnectOptions?: {
/**
* The initial delay in milliseconds before reconnecting
* @default 500
*/
initialReconnectDelay?: number
/**
* The maximum number of reconnection attempts
* @default 5
*/
reconnectAttempts?: number
/**
* The maximum delay in milliseconds between reconnection attempts
* @default 5000
*/
maxReconnectDelay?: number
}
}

export class SubscriptionManager {
private counter = 0
private subscriptions: SubscriptionInfo<SdkTransport.SubscriptionTopic>[] = []
private socket: WebSocket | null = null
private config: Required<WsTransportConfig>
private config: DeepRequired<SubscriptionManagerConfig>
private reconnectAttempts = 0

constructor(config: WsTransportConfig) {
constructor(config: SubscriptionManagerConfig) {
this.config = {
reconnectInterval: 500,
reconnectAttempts: 5,
...config,
reconnectOptions: {
initialReconnectDelay: 500,
reconnectAttempts: 5,
maxReconnectDelay: 5000,
...config.reconnectOptions,
},
}
}

Expand Down Expand Up @@ -129,7 +147,9 @@ export class SubscriptionManager {
})

// Validate the number of reconnection attempts
if (this.reconnectAttempts >= this.config.reconnectAttempts) {
if (
this.reconnectAttempts >= this.config.reconnectOptions.reconnectAttempts
) {
logger.log({
level: logger.LEVELS.error,
title: "WebSocket Error",
Expand Down Expand Up @@ -288,6 +308,10 @@ export class SubscriptionManager {
* @returns The backoff interval in milliseconds
*/
private get backoffInterval() {
return this.config.reconnectInterval * (this.reconnectAttempts ^ 2)
return Math.min(
this.config.reconnectOptions.maxReconnectDelay,
this.config.reconnectOptions.initialReconnectDelay *
2 ** this.reconnectAttempts
)
}
}

0 comments on commit a341371

Please sign in to comment.