From 8c4f1d38cb63654c5dec7ba69fd6eb445c53f920 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 15 Dec 2023 12:56:10 -0500 Subject: [PATCH] fix(app-shell): use a wrapping stream for usb (#14214) Implement the tcp socket emulator by creating a separate stream that is piped through to the usb connection via its downward interface rather than using the same stream for both the port and the socket or pipelining the two streams together. This implementation allows the lifecycles of the USB port connection, which we want to be the same as the physical robot connection; and the tcp socket emulators, which we want to be more like tcp sockets; to be separate, which really increases reliability because we don't have the port going up and down all the time anymore. This reduces spurious disconnects. It will also allow us to add socket activity timeouts to help with the windows dropping data problem, though we can't really do that until the http requests that cause synchronous actions that we make get keep alive streaming responses. --- app-shell/src/usb.ts | 124 +++++------ usb-bridge/node-client/src/usb-agent.ts | 277 ++++++++++++++---------- 2 files changed, 225 insertions(+), 176 deletions(-) diff --git a/app-shell/src/usb.ts b/app-shell/src/usb.ts index 81d1afdade1..ee402093770 100644 --- a/app-shell/src/usb.ts +++ b/app-shell/src/usb.ts @@ -34,25 +34,50 @@ let usbFetchInterval: NodeJS.Timeout export function getSerialPortHttpAgent(): SerialPortHttpAgent | undefined { return usbHttpAgent } -export function createSerialPortHttpAgent(path: string): void { - const serialPortHttpAgent = new SerialPortHttpAgent({ - maxFreeSockets: 1, - maxSockets: 1, - maxTotalSockets: 1, - keepAlive: true, - keepAliveMsecs: Infinity, - path, - logger: usbLog, - timeout: 100000, - }) - usbHttpAgent = serialPortHttpAgent +export function createSerialPortHttpAgent( + path: string, + onComplete: (err: Error | null, agent?: SerialPortHttpAgent) => void +): void { + if (usbHttpAgent != null) { + onComplete( + new Error('Tried to make a USB http agent when one already existed') + ) + } else { + usbHttpAgent = new SerialPortHttpAgent( + { + maxFreeSockets: 1, + maxSockets: 1, + maxTotalSockets: 1, + keepAlive: true, + keepAliveMsecs: Infinity, + path, + logger: usbLog, + timeout: 100000, + }, + (err, agent?) => { + if (err != null) { + usbHttpAgent = undefined + } + onComplete(err, agent) + } + ) + } } -export function destroyUsbHttpAgent(): void { +export function destroyAndStopUsbHttpRequests(dispatch: Dispatch): void { if (usbHttpAgent != null) { usbHttpAgent.destroy() } usbHttpAgent = undefined + ipcMain.removeHandler('usb:request') + dispatch(usbRequestsStop()) + // handle any additional invocations of usb:request + ipcMain.handle('usb:request', () => + Promise.resolve({ + status: 400, + statusText: 'USB robot disconnected', + }) + ) } function isUsbDeviceOt3(device: UsbDevice): boolean { @@ -115,42 +140,11 @@ function pollSerialPortAndCreateAgent(dispatch: Dispatch): void { } usbFetchInterval = setInterval(() => { // already connected to an Opentrons robot via USB - if (getSerialPortHttpAgent() != null) { - return - } - usbLog.debug('fetching serialport list') - fetchSerialPortList() - .then((list: PortInfo[]) => { - const ot3UsbSerialPort = list.find( - port => - port.productId?.localeCompare(DEFAULT_PRODUCT_ID, 'en-US', { - sensitivity: 'base', - }) === 0 && - port.vendorId?.localeCompare(DEFAULT_VENDOR_ID, 'en-US', { - sensitivity: 'base', - }) === 0 - ) - - if (ot3UsbSerialPort == null) { - usbLog.debug('no OT-3 serial port found') - return - } - - createSerialPortHttpAgent(ot3UsbSerialPort.path) - // remove any existing handler - ipcMain.removeHandler('usb:request') - ipcMain.handle('usb:request', usbListener) - - dispatch(usbRequestsStart()) - }) - .catch(e => - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - usbLog.debug(`fetchSerialPortList error ${e?.message ?? 'unknown'}`) - ) + tryCreateAndStartUsbHttpRequests(dispatch) }, 10000) } -function startUsbHttpRequests(dispatch: Dispatch): void { +function tryCreateAndStartUsbHttpRequests(dispatch: Dispatch): void { fetchSerialPortList() .then((list: PortInfo[]) => { const ot3UsbSerialPort = list.find( @@ -165,17 +159,22 @@ function startUsbHttpRequests(dispatch: Dispatch): void { // retry if no OT-3 serial port found - usb-detection and serialport packages have race condition if (ot3UsbSerialPort == null) { - usbLog.debug('no OT-3 serial port found, retrying') - setTimeout(() => startUsbHttpRequests(dispatch), 1000) + usbLog.debug('no OT-3 serial port found') return } - - createSerialPortHttpAgent(ot3UsbSerialPort.path) - // remove any existing handler - ipcMain.removeHandler('usb:request') - ipcMain.handle('usb:request', usbListener) - - dispatch(usbRequestsStart()) + if (usbHttpAgent == null) { + createSerialPortHttpAgent(ot3UsbSerialPort.path, (err, agent?) => { + if (err != null) { + const message = err?.message ?? err + usbLog.error(`Failed to create serial port: ${message}`) + } + if (agent) { + ipcMain.removeHandler('usb:request') + ipcMain.handle('usb:request', usbListener) + dispatch(usbRequestsStart()) + } + }) + } }) .catch(e => // eslint-disable-next-line @typescript-eslint/restrict-template-expressions @@ -188,27 +187,18 @@ export function registerUsb(dispatch: Dispatch): (action: Action) => unknown { switch (action.type) { case SYSTEM_INFO_INITIALIZED: if (action.payload.usbDevices.find(isUsbDeviceOt3) != null) { - startUsbHttpRequests(dispatch) + tryCreateAndStartUsbHttpRequests(dispatch) } pollSerialPortAndCreateAgent(dispatch) break case USB_DEVICE_ADDED: if (isUsbDeviceOt3(action.payload.usbDevice)) { - startUsbHttpRequests(dispatch) + tryCreateAndStartUsbHttpRequests(dispatch) } break case USB_DEVICE_REMOVED: if (isUsbDeviceOt3(action.payload.usbDevice)) { - destroyUsbHttpAgent() - ipcMain.removeHandler('usb:request') - dispatch(usbRequestsStop()) - // handle any additional invocations of usb:request - ipcMain.handle('usb:request', () => - Promise.resolve({ - status: 400, - statusText: 'USB robot disconnected', - }) - ) + destroyAndStopUsbHttpRequests(dispatch) } break } diff --git a/usb-bridge/node-client/src/usb-agent.ts b/usb-bridge/node-client/src/usb-agent.ts index 62639f23796..b4a2bf933e2 100644 --- a/usb-bridge/node-client/src/usb-agent.ts +++ b/usb-bridge/node-client/src/usb-agent.ts @@ -1,6 +1,6 @@ import * as http from 'http' import agent from 'agent-base' -import type { Duplex } from 'stream' +import { Duplex } from 'stream' import { SerialPort } from 'serialport' @@ -110,38 +110,95 @@ export function createSerialPortListMonitor( return { start, stop } } -class SerialPortSocket extends SerialPort { - // added these to squash keepAlive errors - setKeepAlive(): void {} +interface SerialPortHttpAgentOptions extends AgentOptions { + path: string + logger: Logger +} - unref(): SerialPortSocket { - return this +function socketEmulatorFromPort(port: SerialPort): Socket { + // build a duplex stream to act as a socket that we can give to node https internals, linked + // to an open usb serial port. + // + // this is a separate stream rather than just passing in the port so that we can sever the + // lifetimes and lifecycles of the socket and the port. sockets want to be closed and opened all + // the time by node http internals, and we don't want that for the port since opening and closing it + // can take a while. this lets us open and close and create and destroy sockets at will while not + // affecting the port. + + // unfortunately, because we need to sever the lifecycles, we can't use node stream pipelining + // since half the point of node stream pipelining is to link stream lifecycles. instead, we do a + // custom duplex implementation whose lower interface talks to the upper interface of the port... + // which is something that's really annoying without using pipelining, which we can't use. so + // this closed-over mutable doRead has to stand in for the pause event propagating down; we have to + // add or remove data listeners to the port stream to propagate read backpressure. + let doRead = false + const socket = new Duplex({ + write(chunk, encoding, cb) { + return port.write(chunk, encoding, cb) + }, + read() { + if (!doRead) { + port.on('data', dataForwarder) + doRead = true + } + }, + }) as Socket + + const dataForwarder = (chunk: any): void => { + if (doRead) { + doRead = socket.push(chunk) + if (!doRead) { + port.removeListener('data', dataForwarder) + } + } } - setTimeout(): void {} - - ref(): SerialPortSocket { - return this + // since this socket is independent from the port, we can do stuff like "have an activity timeout" + // without worrying that it will kill the socket + let currentTimeout: NodeJS.Timeout | null = null + const refreshTimeout = (): void => { + currentTimeout?.refresh() } - - // We never actually really want to destroy our serial port sockets, but - // the abort logic (at least) in node http client actually has a call stack - // that requires the socket close event to happen (???) so this is for that. - // We only really seem to abort when there's a 3xx return because we use - // npm follow-redirects and that aborts on a 3xx - destroy(): void { - if (!!!this.destroyed) { - this.destroyed = true - this.close() + socket.on('data', refreshTimeout) + socket.setTimeout = (timeout, callable?) => { + currentTimeout !== null && clearTimeout(currentTimeout) + if (timeout === 0 && currentTimeout !== null) { + currentTimeout = null + } else if (timeout !== 0) { + currentTimeout = setTimeout(() => { + console.log('socket timed out') + socket.emit('timeout') + }, timeout) + if (callable != null) { + socket.once('timeout', callable) + } } + + return socket } + // important: without this we'll leak sockets since the port event emitter will hold a ref to dataForwarder which + // closes over the socket + socket.on('close', () => { + port.removeListener('data', dataForwarder) + }) - _httpMessage: { shouldKeepAlive: boolean } | undefined = undefined -} + // some little functions to have the right shape for the http internals + socket.ref = () => socket + socket.unref = () => socket + socket.setKeepAlive = () => { + return socket + } + socket.setNoDelay = () => { + return socket + } -interface SerialPortHttpAgentOptions extends AgentOptions { - path: string - logger: Logger + socket.on('finish', () => { + socket.emit('close') + }) + socket.on('close', () => { + currentTimeout && clearTimeout(currentTimeout) + }) + return socket } const kOnKeylog = Symbol.for('onkeylog') @@ -151,24 +208,75 @@ class SerialPortHttpAgent extends http.Agent { declare sockets: NodeJS.Dict declare emit: ( event: string, - socket: SerialPortSocket, + socket: Socket, options: NodeJS.Dict ) => void declare getName: (options: NodeJS.Dict) => string - declare removeSocket: ( - socket: SerialPortSocket, - options: NodeJS.Dict - ) => void; + declare removeSocket: (socket: Socket, options: NodeJS.Dict) => void; // node can assign a keylogger to the agent for debugging, this allows adding the keylog listener to the event declare [kOnKeylog]: (...args: unknown[]) => void - constructor(options: SerialPortHttpAgentOptions) { + constructor( + options: SerialPortHttpAgentOptions, + onComplete: (err: Error | null, agent?: SerialPortHttpAgent) => void + ) { super(options) this.options = options + const openRetryer: (err: Error | null) => void = err => { + if (err != null) { + if (this.remainingRetries > 0 && !this.destroyed) { + const message = err?.message ?? err + this.log( + 'info', + `Failed to open port: ${message} , retrying ${this.remainingRetries} more times` + ) + this.remainingRetries-- + setTimeout( + () => this.port.open(openRetryer), + SOCKET_OPEN_RETRY_TIME_MS + ) + } else if (!this.destroyed) { + const message = err?.message ?? err + this.log( + 'info', + `Failed to open port after ${this.remainingRetries} attempts: ${message}` + ) + this.destroy() + onComplete(err) + } else { + this.log( + 'info', + `Cancelling open attempts because the agent was destroyed` + ) + onComplete(new Error('Agent destroyed while opening')) + } + } else if (!this.destroyed) { + this.log('info', `Port ${this.options.path} now open`) + onComplete(null, this) + } else { + this.log('info', `Port was opened but agent is now destroyed, closing`) + if (this.port.isOpen) { + this.port.close() + } + onComplete(new Error('Agent destroyed while opening')) + } + } + this.log( + 'info', + `creating and opening serial port for ${this.options.path}` + ) + this.port = new SerialPort( + { path: this.options.path, baudRate: 1152000, autoOpen: true }, + openRetryer + ) } + port: SerialPort + remainingRetries: number = MAX_SOCKET_CREATE_RETRIES + destroyed: boolean = false + // TODO: add method to close port (or destroy agent) options: { @@ -185,77 +293,49 @@ class SerialPortHttpAgent extends http.Agent { this.options.logger[level](msg, meta) } + destroy(): void { + this.destroyed = true + this.port.destroy(new Error('Agent was destroyed')) + } + createSocket( req: http.ClientRequest, options: NodeJS.Dict, - cb: Function + cb: (err: Error | string | null, stream?: Duplex) => void ): void { // copied from _http_agent.js, replacing this.createConnection - this.log('info', `creating usb socket at ${this.options.path}`) + this.log('info', `creating usb socket wrapper to ${this.options.path}`) options = { __proto__: null, ...options, ...this.options } const name = this.getName(options) options._agentKey = name options.encoding = null - // We preemptively increase the socket count and then reduce it if we - // actually failed because more requests will come in as soon as this function - // function finishes and if we don't increment it here those messages will also - // try and make new sockets - this.totalSocketCount++ - const oncreate = (err: any | null, s?: SerialPortSocket): void => { - if (err != null) { - this.totalSocketCount-- - return cb(err) - } - if (this.sockets[name] == null) { - this.sockets[name] = [] - } - this.sockets[name]?.push((s as unknown) as Socket) - this.log( - 'debug', - `sockets ${name} ${this.sockets[name]?.length ?? ''} ${ - this.totalSocketCount - }` - ) - installListeners(this, s as SerialPortSocket, options) - cb(null, s) + if (this.totalSocketCount >= 1) { + this.log('error', `tried to create more than one socket wrapper`) + cb(new Error('Cannot create more than one USB port wrapper')) + return } - // we do retries via recursion because this is all callback based anyway - const createSocketInner: ( - req: http.ClientRequest, - options: NodeJS.Dict, - cb: Function, - remainingRetries: number - ) => void = (req, options, cb, remainingRetries) => { - const socket: SerialPortSocket = new SerialPortSocket({ - path: this.options.path, - baudRate: 1152000, - // setting autoOpen false makes the rest of the logic a little easier because - // we always go through the "open-after-constructor" codepath - autoOpen: false, - }) - socket.open(err => { - if (err) { - if (remainingRetries > 0) { - setTimeout( - () => createSocketInner(req, options, cb, remainingRetries - 1), - SOCKET_OPEN_RETRY_TIME_MS - ) - } else { - oncreate(err) - } - } else { - oncreate(err, socket) - } - }) + if (!this.port.isOpen) { + this.log('error', `tried to create usb socket wrapper with closed port`) + cb(new Error('Underlying USB port is closed')) + return } - createSocketInner(req, options, cb, MAX_SOCKET_CREATE_RETRIES) + + const wrapper = socketEmulatorFromPort(this.port) + this.totalSocketCount++ + installListeners(this, wrapper, options) + this.log('info', `created usb socket wrapper writable: ${wrapper.writable}`) + cb(null, wrapper) + setImmediate(() => { + wrapper.emit('connect') + wrapper.emit('ready') + }) } } // most copied from _http_agent.js; onData and onFinish listeners added to log and close serial port function installListeners( agent: SerialPortHttpAgent, - s: SerialPortSocket, + s: Socket, options: { [k: string]: unknown } ): void { const onFree: () => void = () => { @@ -267,19 +347,10 @@ function installListeners( // the function, but we need the entire thing except like one conditional so we do this. agent.log('debug', 'CLIENT socket onFree') - // need to emit free to attach listeners to serialport - if (s._httpMessage) { - s._httpMessage.shouldKeepAlive = true - } agent.emit('free', s, options) } s.on('free', onFree) - s.on('open', () => { - s.emit('connect') - s.emit('ready') - }) - function onError(err: Error): void { agent.log('error', `CLIENT socket onError: ${err?.message}`) } @@ -287,25 +358,13 @@ function installListeners( function onClose(): void { agent.log('debug', 'CLIENT socket onClose') - // the 'close' event is emitted both by the serial port stream when it closes - // the serial port (yay) and by both the readable and writable streams that the - // serial port inherits from when they close which has nothing to do with the serial - // port (boo!) so if we get a close event we need to check if we're actually closed - // and if we're not do a real close (and also only remove the socket from the agent - // if it's real) - - if (s.isOpen) { - s.close() - } else { - agent.totalSocketCount-- - agent.removeSocket(s, options) - } + agent.totalSocketCount-- + agent.removeSocket(s, options) } s.on('close', onClose) function onFinish(): void { - agent.log('info', 'socket finishing: closing serialport') - s.close() + agent.log('info', 'socket finishing') } s.on('finish', onFinish)