Skip to content

Commit

Permalink
Merge branch 'jribbink/subscription-manager' into jribbink/sdk-subscr…
Browse files Browse the repository at this point in the history
…iptions
  • Loading branch information
jribbink committed Nov 26, 2024
2 parents 6ef5343 + 206fa5d commit f2c9f79
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 48 deletions.
67 changes: 44 additions & 23 deletions packages/transport-http/src/subscribe/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "./models"
import type {SdkTransport} from "@onflow/typedefs"
import {WebSocket} from "./websocket"
import * as logger from "@onflow/util-logger"

const WS_OPEN = 1

Expand All @@ -35,13 +36,13 @@ interface WsTransportConfig {
*/
node: string
/**
* The interval in milliseconds to wait before reconnecting
* @default 1000
* Starting interval for reconnection attempts in milliseconds, exponential backoff is applied
* @default 500
*/
reconnectInterval?: number
/**
* The number of reconnection attempts before giving up
* @default 10
* @default 5
*/
reconnectAttempts?: number
}
Expand All @@ -55,15 +56,15 @@ export class SubscriptionManager {

constructor(config: WsTransportConfig) {
this.config = {
reconnectInterval: 1000,
reconnectAttempts: 10,
reconnectInterval: 500,
reconnectAttempts: 5,
...config,
}
}

// Lazy connect to the socket when the first subscription is made
private async connect() {
return new Promise<void>(resolve => {
return new Promise<void>((resolve, reject) => {
// If the socket is already open, do nothing
if (this.socket?.readyState === WS_OPEN) {
return
Expand Down Expand Up @@ -92,8 +93,7 @@ export class SubscriptionManager {
void this.reconnect()
}
this.socket.onerror = e => {
console.error(`WebSocket error: ${e}`)
this.reconnect()
this.reconnect(e)
}

this.socket.onopen = () => {
Expand All @@ -103,14 +103,18 @@ export class SubscriptionManager {
const response = await this.sendSubscribe(sub)
sub.remoteId = response.id
})
).then(() => {
resolve()
})
)
.then(() => {
resolve()
})
.catch(e => {
reject(new Error(`Failed to restore subscriptions: ${e}`))
})
}
})
}

private async reconnect() {
private async reconnect(error?: any) {
// Clear the socket
this.socket = null

Expand All @@ -126,27 +130,36 @@ export class SubscriptionManager {

// Validate the number of reconnection attempts
if (this.reconnectAttempts >= this.config.reconnectAttempts) {
logger.log({
level: logger.LEVELS.error,
title: "WebSocket Error",
message: `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}`,
})

this.subscriptions.forEach(sub => {
sub.onError(
new Error(
`Failed to reconnect to the server after ${this.reconnectAttempts} attempts`
`Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}`
)
)
})
this.subscriptions = []
this.reconnectAttempts = 0
return
}
} else {
logger.log({
level: logger.LEVELS.warn,
title: "WebSocket Error",
message: `WebSocket error, reconnecting in ${this.backoffInterval}ms: ${error}`,
})

// Delay the reconnection
await new Promise(resolve =>
setTimeout(resolve, this.config.reconnectInterval)
)
// Delay the reconnection
await new Promise(resolve => setTimeout(resolve, this.backoffInterval))

// Try to reconnect
this.reconnectAttempts++
await this.connect()
this.reconnectAttempts = 0
// Try to reconnect
this.reconnectAttempts++
await this.connect()
this.reconnectAttempts = 0
}
}

async subscribe<T extends SdkTransport.SubscriptionTopic>(opts: {
Expand Down Expand Up @@ -269,4 +282,12 @@ export class SubscriptionManager {
>(sub: SubscriptionInfo<T>, message: SubscriptionDataMessage) {
// TODO: Will be implemented with each subscription topic
}

/**
* Calculate the backoff interval for reconnection attempts
* @returns The backoff interval in milliseconds
*/
private get backoffInterval() {
return this.config.reconnectInterval * (this.reconnectAttempts ^ 2)
}
}
6 changes: 0 additions & 6 deletions packages/typedefs/src/sdk-transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,5 @@ export type Transport = {
subscribe: SubscribeFn
}

export type TransportConfig = {
node: string
}

export type TransportFactory = (config: TransportConfig) => Transport

export * from "./subscriptions"
export * from "./requests"
24 changes: 5 additions & 19 deletions packages/typedefs/src/sdk-transport/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,16 @@ type SchemaItem<TArgs, TData> = {
data: TData
}

// TODO: PLACEHOLDER - Replace with actual subscription topics
export enum SubscriptionTopic {
EVENTS = "events",
BLOCKS = "blocks",
PLACEHOLDER = "PLACEHOLDER",
}

export type SubscriptionSchema = {
[SubscriptionTopic.EVENTS]: SchemaItem<
[SubscriptionTopic.PLACEHOLDER]: SchemaItem<
{},
{
startBlock: number
endBlock: number
},
{
type: string
data: any
}
>
[SubscriptionTopic.BLOCKS]: SchemaItem<
{
startBlock: number
endBlock: number
},
{
type: string
data: any
placeholder: string
}
>
}
Expand Down

0 comments on commit f2c9f79

Please sign in to comment.