diff --git a/app-shell/src/usb.ts b/app-shell/src/usb.ts index 81d1afdade1..ee402093770 100644 --- a/app-shell/src/usb.ts +++ b/app-shell/src/usb.ts @@ -34,25 +34,50 @@ let usbFetchInterval: NodeJS.Timeout export function getSerialPortHttpAgent(): SerialPortHttpAgent | undefined { return usbHttpAgent } -export function createSerialPortHttpAgent(path: string): void { - const serialPortHttpAgent = new SerialPortHttpAgent({ - maxFreeSockets: 1, - maxSockets: 1, - maxTotalSockets: 1, - keepAlive: true, - keepAliveMsecs: Infinity, - path, - logger: usbLog, - timeout: 100000, - }) - usbHttpAgent = serialPortHttpAgent +export function createSerialPortHttpAgent( + path: string, + onComplete: (err: Error | null, agent?: SerialPortHttpAgent) => void +): void { + if (usbHttpAgent != null) { + onComplete( + new Error('Tried to make a USB http agent when one already existed') + ) + } else { + usbHttpAgent = new SerialPortHttpAgent( + { + maxFreeSockets: 1, + maxSockets: 1, + maxTotalSockets: 1, + keepAlive: true, + keepAliveMsecs: Infinity, + path, + logger: usbLog, + timeout: 100000, + }, + (err, agent?) => { + if (err != null) { + usbHttpAgent = undefined + } + onComplete(err, agent) + } + ) + } } -export function destroyUsbHttpAgent(): void { +export function destroyAndStopUsbHttpRequests(dispatch: Dispatch): void { if (usbHttpAgent != null) { usbHttpAgent.destroy() } usbHttpAgent = undefined + ipcMain.removeHandler('usb:request') + dispatch(usbRequestsStop()) + // handle any additional invocations of usb:request + ipcMain.handle('usb:request', () => + Promise.resolve({ + status: 400, + statusText: 'USB robot disconnected', + }) + ) } function isUsbDeviceOt3(device: UsbDevice): boolean { @@ -115,42 +140,11 @@ function pollSerialPortAndCreateAgent(dispatch: Dispatch): void { } usbFetchInterval = setInterval(() => { // already connected to an Opentrons robot via USB - if (getSerialPortHttpAgent() != null) { - return - } - usbLog.debug('fetching serialport list') - fetchSerialPortList() - .then((list: PortInfo[]) => { - const ot3UsbSerialPort = list.find( - port => - port.productId?.localeCompare(DEFAULT_PRODUCT_ID, 'en-US', { - sensitivity: 'base', - }) === 0 && - port.vendorId?.localeCompare(DEFAULT_VENDOR_ID, 'en-US', { - sensitivity: 'base', - }) === 0 - ) - - if (ot3UsbSerialPort == null) { - usbLog.debug('no OT-3 serial port found') - return - } - - createSerialPortHttpAgent(ot3UsbSerialPort.path) - // remove any existing handler - ipcMain.removeHandler('usb:request') - ipcMain.handle('usb:request', usbListener) - - dispatch(usbRequestsStart()) - }) - .catch(e => - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - usbLog.debug(`fetchSerialPortList error ${e?.message ?? 'unknown'}`) - ) + tryCreateAndStartUsbHttpRequests(dispatch) }, 10000) } -function startUsbHttpRequests(dispatch: Dispatch): void { +function tryCreateAndStartUsbHttpRequests(dispatch: Dispatch): void { fetchSerialPortList() .then((list: PortInfo[]) => { const ot3UsbSerialPort = list.find( @@ -165,17 +159,22 @@ function startUsbHttpRequests(dispatch: Dispatch): void { // retry if no OT-3 serial port found - usb-detection and serialport packages have race condition if (ot3UsbSerialPort == null) { - usbLog.debug('no OT-3 serial port found, retrying') - setTimeout(() => startUsbHttpRequests(dispatch), 1000) + usbLog.debug('no OT-3 serial port found') return } - - createSerialPortHttpAgent(ot3UsbSerialPort.path) - // remove any existing handler - ipcMain.removeHandler('usb:request') - ipcMain.handle('usb:request', usbListener) - - dispatch(usbRequestsStart()) + if (usbHttpAgent == null) { + createSerialPortHttpAgent(ot3UsbSerialPort.path, (err, agent?) => { + if (err != null) { + const message = err?.message ?? err + usbLog.error(`Failed to create serial port: ${message}`) + } + if (agent) { + ipcMain.removeHandler('usb:request') + ipcMain.handle('usb:request', usbListener) + dispatch(usbRequestsStart()) + } + }) + } }) .catch(e => // eslint-disable-next-line @typescript-eslint/restrict-template-expressions @@ -188,27 +187,18 @@ export function registerUsb(dispatch: Dispatch): (action: Action) => unknown { switch (action.type) { case SYSTEM_INFO_INITIALIZED: if (action.payload.usbDevices.find(isUsbDeviceOt3) != null) { - startUsbHttpRequests(dispatch) + tryCreateAndStartUsbHttpRequests(dispatch) } pollSerialPortAndCreateAgent(dispatch) break case USB_DEVICE_ADDED: if (isUsbDeviceOt3(action.payload.usbDevice)) { - startUsbHttpRequests(dispatch) + tryCreateAndStartUsbHttpRequests(dispatch) } break case USB_DEVICE_REMOVED: if (isUsbDeviceOt3(action.payload.usbDevice)) { - destroyUsbHttpAgent() - ipcMain.removeHandler('usb:request') - dispatch(usbRequestsStop()) - // handle any additional invocations of usb:request - ipcMain.handle('usb:request', () => - Promise.resolve({ - status: 400, - statusText: 'USB robot disconnected', - }) - ) + destroyAndStopUsbHttpRequests(dispatch) } break } diff --git a/usb-bridge/node-client/src/usb-agent.ts b/usb-bridge/node-client/src/usb-agent.ts index 62639f23796..b4a2bf933e2 100644 --- a/usb-bridge/node-client/src/usb-agent.ts +++ b/usb-bridge/node-client/src/usb-agent.ts @@ -1,6 +1,6 @@ import * as http from 'http' import agent from 'agent-base' -import type { Duplex } from 'stream' +import { Duplex } from 'stream' import { SerialPort } from 'serialport' @@ -110,38 +110,95 @@ export function createSerialPortListMonitor( return { start, stop } } -class SerialPortSocket extends SerialPort { - // added these to squash keepAlive errors - setKeepAlive(): void {} +interface SerialPortHttpAgentOptions extends AgentOptions { + path: string + logger: Logger +} - unref(): SerialPortSocket { - return this +function socketEmulatorFromPort(port: SerialPort): Socket { + // build a duplex stream to act as a socket that we can give to node https internals, linked + // to an open usb serial port. + // + // this is a separate stream rather than just passing in the port so that we can sever the + // lifetimes and lifecycles of the socket and the port. sockets want to be closed and opened all + // the time by node http internals, and we don't want that for the port since opening and closing it + // can take a while. this lets us open and close and create and destroy sockets at will while not + // affecting the port. + + // unfortunately, because we need to sever the lifecycles, we can't use node stream pipelining + // since half the point of node stream pipelining is to link stream lifecycles. instead, we do a + // custom duplex implementation whose lower interface talks to the upper interface of the port... + // which is something that's really annoying without using pipelining, which we can't use. so + // this closed-over mutable doRead has to stand in for the pause event propagating down; we have to + // add or remove data listeners to the port stream to propagate read backpressure. + let doRead = false + const socket = new Duplex({ + write(chunk, encoding, cb) { + return port.write(chunk, encoding, cb) + }, + read() { + if (!doRead) { + port.on('data', dataForwarder) + doRead = true + } + }, + }) as Socket + + const dataForwarder = (chunk: any): void => { + if (doRead) { + doRead = socket.push(chunk) + if (!doRead) { + port.removeListener('data', dataForwarder) + } + } } - setTimeout(): void {} - - ref(): SerialPortSocket { - return this + // since this socket is independent from the port, we can do stuff like "have an activity timeout" + // without worrying that it will kill the socket + let currentTimeout: NodeJS.Timeout | null = null + const refreshTimeout = (): void => { + currentTimeout?.refresh() } - - // We never actually really want to destroy our serial port sockets, but - // the abort logic (at least) in node http client actually has a call stack - // that requires the socket close event to happen (???) so this is for that. - // We only really seem to abort when there's a 3xx return because we use - // npm follow-redirects and that aborts on a 3xx - destroy(): void { - if (!!!this.destroyed) { - this.destroyed = true - this.close() + socket.on('data', refreshTimeout) + socket.setTimeout = (timeout, callable?) => { + currentTimeout !== null && clearTimeout(currentTimeout) + if (timeout === 0 && currentTimeout !== null) { + currentTimeout = null + } else if (timeout !== 0) { + currentTimeout = setTimeout(() => { + console.log('socket timed out') + socket.emit('timeout') + }, timeout) + if (callable != null) { + socket.once('timeout', callable) + } } + + return socket } + // important: without this we'll leak sockets since the port event emitter will hold a ref to dataForwarder which + // closes over the socket + socket.on('close', () => { + port.removeListener('data', dataForwarder) + }) - _httpMessage: { shouldKeepAlive: boolean } | undefined = undefined -} + // some little functions to have the right shape for the http internals + socket.ref = () => socket + socket.unref = () => socket + socket.setKeepAlive = () => { + return socket + } + socket.setNoDelay = () => { + return socket + } -interface SerialPortHttpAgentOptions extends AgentOptions { - path: string - logger: Logger + socket.on('finish', () => { + socket.emit('close') + }) + socket.on('close', () => { + currentTimeout && clearTimeout(currentTimeout) + }) + return socket } const kOnKeylog = Symbol.for('onkeylog') @@ -151,24 +208,75 @@ class SerialPortHttpAgent extends http.Agent { declare sockets: NodeJS.Dict declare emit: ( event: string, - socket: SerialPortSocket, + socket: Socket, options: NodeJS.Dict ) => void declare getName: (options: NodeJS.Dict) => string - declare removeSocket: ( - socket: SerialPortSocket, - options: NodeJS.Dict - ) => void; + declare removeSocket: (socket: Socket, options: NodeJS.Dict) => void; // node can assign a keylogger to the agent for debugging, this allows adding the keylog listener to the event declare [kOnKeylog]: (...args: unknown[]) => void - constructor(options: SerialPortHttpAgentOptions) { + constructor( + options: SerialPortHttpAgentOptions, + onComplete: (err: Error | null, agent?: SerialPortHttpAgent) => void + ) { super(options) this.options = options + const openRetryer: (err: Error | null) => void = err => { + if (err != null) { + if (this.remainingRetries > 0 && !this.destroyed) { + const message = err?.message ?? err + this.log( + 'info', + `Failed to open port: ${message} , retrying ${this.remainingRetries} more times` + ) + this.remainingRetries-- + setTimeout( + () => this.port.open(openRetryer), + SOCKET_OPEN_RETRY_TIME_MS + ) + } else if (!this.destroyed) { + const message = err?.message ?? err + this.log( + 'info', + `Failed to open port after ${this.remainingRetries} attempts: ${message}` + ) + this.destroy() + onComplete(err) + } else { + this.log( + 'info', + `Cancelling open attempts because the agent was destroyed` + ) + onComplete(new Error('Agent destroyed while opening')) + } + } else if (!this.destroyed) { + this.log('info', `Port ${this.options.path} now open`) + onComplete(null, this) + } else { + this.log('info', `Port was opened but agent is now destroyed, closing`) + if (this.port.isOpen) { + this.port.close() + } + onComplete(new Error('Agent destroyed while opening')) + } + } + this.log( + 'info', + `creating and opening serial port for ${this.options.path}` + ) + this.port = new SerialPort( + { path: this.options.path, baudRate: 1152000, autoOpen: true }, + openRetryer + ) } + port: SerialPort + remainingRetries: number = MAX_SOCKET_CREATE_RETRIES + destroyed: boolean = false + // TODO: add method to close port (or destroy agent) options: { @@ -185,77 +293,49 @@ class SerialPortHttpAgent extends http.Agent { this.options.logger[level](msg, meta) } + destroy(): void { + this.destroyed = true + this.port.destroy(new Error('Agent was destroyed')) + } + createSocket( req: http.ClientRequest, options: NodeJS.Dict, - cb: Function + cb: (err: Error | string | null, stream?: Duplex) => void ): void { // copied from _http_agent.js, replacing this.createConnection - this.log('info', `creating usb socket at ${this.options.path}`) + this.log('info', `creating usb socket wrapper to ${this.options.path}`) options = { __proto__: null, ...options, ...this.options } const name = this.getName(options) options._agentKey = name options.encoding = null - // We preemptively increase the socket count and then reduce it if we - // actually failed because more requests will come in as soon as this function - // function finishes and if we don't increment it here those messages will also - // try and make new sockets - this.totalSocketCount++ - const oncreate = (err: any | null, s?: SerialPortSocket): void => { - if (err != null) { - this.totalSocketCount-- - return cb(err) - } - if (this.sockets[name] == null) { - this.sockets[name] = [] - } - this.sockets[name]?.push((s as unknown) as Socket) - this.log( - 'debug', - `sockets ${name} ${this.sockets[name]?.length ?? ''} ${ - this.totalSocketCount - }` - ) - installListeners(this, s as SerialPortSocket, options) - cb(null, s) + if (this.totalSocketCount >= 1) { + this.log('error', `tried to create more than one socket wrapper`) + cb(new Error('Cannot create more than one USB port wrapper')) + return } - // we do retries via recursion because this is all callback based anyway - const createSocketInner: ( - req: http.ClientRequest, - options: NodeJS.Dict, - cb: Function, - remainingRetries: number - ) => void = (req, options, cb, remainingRetries) => { - const socket: SerialPortSocket = new SerialPortSocket({ - path: this.options.path, - baudRate: 1152000, - // setting autoOpen false makes the rest of the logic a little easier because - // we always go through the "open-after-constructor" codepath - autoOpen: false, - }) - socket.open(err => { - if (err) { - if (remainingRetries > 0) { - setTimeout( - () => createSocketInner(req, options, cb, remainingRetries - 1), - SOCKET_OPEN_RETRY_TIME_MS - ) - } else { - oncreate(err) - } - } else { - oncreate(err, socket) - } - }) + if (!this.port.isOpen) { + this.log('error', `tried to create usb socket wrapper with closed port`) + cb(new Error('Underlying USB port is closed')) + return } - createSocketInner(req, options, cb, MAX_SOCKET_CREATE_RETRIES) + + const wrapper = socketEmulatorFromPort(this.port) + this.totalSocketCount++ + installListeners(this, wrapper, options) + this.log('info', `created usb socket wrapper writable: ${wrapper.writable}`) + cb(null, wrapper) + setImmediate(() => { + wrapper.emit('connect') + wrapper.emit('ready') + }) } } // most copied from _http_agent.js; onData and onFinish listeners added to log and close serial port function installListeners( agent: SerialPortHttpAgent, - s: SerialPortSocket, + s: Socket, options: { [k: string]: unknown } ): void { const onFree: () => void = () => { @@ -267,19 +347,10 @@ function installListeners( // the function, but we need the entire thing except like one conditional so we do this. agent.log('debug', 'CLIENT socket onFree') - // need to emit free to attach listeners to serialport - if (s._httpMessage) { - s._httpMessage.shouldKeepAlive = true - } agent.emit('free', s, options) } s.on('free', onFree) - s.on('open', () => { - s.emit('connect') - s.emit('ready') - }) - function onError(err: Error): void { agent.log('error', `CLIENT socket onError: ${err?.message}`) } @@ -287,25 +358,13 @@ function installListeners( function onClose(): void { agent.log('debug', 'CLIENT socket onClose') - // the 'close' event is emitted both by the serial port stream when it closes - // the serial port (yay) and by both the readable and writable streams that the - // serial port inherits from when they close which has nothing to do with the serial - // port (boo!) so if we get a close event we need to check if we're actually closed - // and if we're not do a real close (and also only remove the socket from the agent - // if it's real) - - if (s.isOpen) { - s.close() - } else { - agent.totalSocketCount-- - agent.removeSocket(s, options) - } + agent.totalSocketCount-- + agent.removeSocket(s, options) } s.on('close', onClose) function onFinish(): void { - agent.log('info', 'socket finishing: closing serialport') - s.close() + agent.log('info', 'socket finishing') } s.on('finish', onFinish)