From 3b6e08ab2647fc4ebd41a31f52bb56e2f41a9322 Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Mon, 25 Nov 2024 16:44:28 +1300 Subject: [PATCH] Subscription improvements (#2600) * WIP historical different trigger types * Update to support delete in historical subscriptions, add support for WS in the playground * Update changelog * Update types for subscriptions * Update changelog --- .eslintrc.js | 11 +- jest.config.js | 2 +- packages/node-core/CHANGELOG.md | 1 + packages/node-core/src/db/sync-helper.test.ts | 144 +++++++++++++++++- packages/node-core/src/db/sync-helper.ts | 39 ++++- packages/query/CHANGELOG.md | 3 + packages/query/package.json | 4 +- packages/query/src/graphql/graphql.module.ts | 22 ++- .../graphql/plugins/PgSubscriptionPlugin.ts | 33 +++- .../src/graphql/plugins/PlaygroundPlugin.ts | 2 +- yarn.lock | 39 ++--- 11 files changed, 256 insertions(+), 44 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 3779962f00..3a826d4a0a 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -32,6 +32,9 @@ module.exports = { 'plugin:jest/recommended', ], rules: { + // Note: you must disable the base rule as it can report incorrect errors + "require-await": "off", + "@typescript-eslint/require-await": "error", // rules turned off in upstream project (also required when recommended-requiring-type-checking is extended) '@typescript-eslint/explicit-function-return-type': 'off', '@typescript-eslint/no-unsafe-argument': 'off', @@ -62,11 +65,11 @@ module.exports = { accessibility: 'no-public', }, ], - '@typescript-eslint/no-namespace': ['error', {allowDeclarations: true}], + '@typescript-eslint/no-namespace': ['error', { allowDeclarations: true }], // "@typescript-eslint/member-ordering": "error", // "@typescript-eslint/naming-convention": "error", // "@typescript-eslint/no-param-reassign": "error", - '@typescript-eslint/promise-function-async': ['error', {checkArrowFunctions: false}], + '@typescript-eslint/promise-function-async': ['error', { checkArrowFunctions: false }], // "arrow-body-style": "error", complexity: ['error', 20], curly: ['error', 'multi-line'], @@ -87,7 +90,7 @@ module.exports = { 'line', [ //Copyright 2020-2024 SubQuery Pte Ltd authors & contributors - {pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors'}, + { pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors' }, ' SPDX-License-Identifier: GPL-3.0', ], 2, @@ -112,7 +115,7 @@ module.exports = { '@typescript-eslint/parser': ['.ts', '.tsx'], }, }, - overrides:[ + overrides: [ { files: ['*.test.ts', '*.spec.ts'], rules: { diff --git a/jest.config.js b/jest.config.js index 4bbaf6e5de..5eb4dd55b8 100644 --- a/jest.config.js +++ b/jest.config.js @@ -175,7 +175,7 @@ module.exports = { ], // The regexp pattern or array of patterns that Jest uses to detect test files - testRegex: '.*\\.spec\\.ts$', + testRegex: '.*\\.(spec)\\.ts$', // This option allows the use of a custom results processor // testResultsProcessor: undefined, diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index 463c9cfeb4..5cc8686db2 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - When using a GET query to retrieve an entity, it will include a “store” field. - Support for historical indexing by timestamp as well as block height (#2584) +- Subscriptions not emitting deleted mutation type with historical (#2600) ### Removed - Support for cockroach DB (#2584) diff --git a/packages/node-core/src/db/sync-helper.test.ts b/packages/node-core/src/db/sync-helper.test.ts index 588b33849a..e9a09c1943 100644 --- a/packages/node-core/src/db/sync-helper.test.ts +++ b/packages/node-core/src/db/sync-helper.test.ts @@ -3,10 +3,11 @@ import {INestApplication} from '@nestjs/common'; import {Test} from '@nestjs/testing'; -import {getDbSizeAndUpdateMetadata} from '@subql/node-core'; +import {delay} from '@subql/common'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../configure/NodeConfig'; import {DbModule} from './db.module'; +import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper'; const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'}); @@ -42,4 +43,145 @@ describe('sync helper test', () => { const dbSize = await getDbSizeAndUpdateMetadata(sequelize, schema); expect(dbSize).not.toBeUndefined(); }, 50000); + + describe('has the correct notification trigger payload', () => { + let client: unknown; + + afterEach(async () => { + if (client) { + await (client as any).query('UNLISTEN "0xc4e66f9e1358fa3c"'); + (client as any).removeAllListeners('notification'); + sequelize.connectionManager.releaseConnection(client); + } + }); + + it('without historical', async () => { + const module = await Test.createTestingModule({ + imports: [DbModule.forRootWithConfig(nodeConfig)], + }).compile(); + + app = module.createNestApplication(); + await app.init(); + sequelize = app.get(Sequelize); + schema = 'trigger-test'; + const tableName = 'test'; + await sequelize.createSchema(schema, {}); + // mock create metadata table + await sequelize.query(` + CREATE TABLE IF NOT EXISTS "${schema}".${tableName} ( + id text NOT NULL, + block_number int4 NOT NULL + )`); + + await sequelize.query(createSendNotificationTriggerFunction(schema)); + await sequelize.query(createNotifyTrigger(schema, tableName)); + + client = await sequelize.connectionManager.getConnection({ + type: 'read', + }); + + await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"'); + + const listener = jest.fn(); + (client as any).on('notification', (msg: any) => { + console.log('Payload:', msg.payload); + listener(msg.payload); + }); + + await sequelize.query(`INSERT INTO "${schema}".${tableName} (id, block_number) VALUES ('1', 1);`); + await sequelize.query(`UPDATE "${schema}".${tableName} SET block_number = 2 WHERE id = '1';`); + await sequelize.query(`DELETE FROM "${schema}".${tableName} WHERE id = '1';`); + await delay(1); + expect(listener).toHaveBeenNthCalledWith( + 1, + `{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "INSERT"}` + ); + expect(listener).toHaveBeenNthCalledWith( + 2, + `{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}` + ); + expect(listener).toHaveBeenNthCalledWith( + 3, + `{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}` + ); + }, 10_000); + + it('with historical', async () => { + const module = await Test.createTestingModule({ + imports: [DbModule.forRootWithConfig(nodeConfig)], + }).compile(); + + app = module.createNestApplication(); + await app.init(); + sequelize = app.get(Sequelize); + schema = 'trigger-test'; + const tableName = 'test'; + await sequelize.createSchema(schema, {}); + // mock create metadata table + await sequelize.query(` + CREATE TABLE IF NOT EXISTS "${schema}".${tableName} ( + id text NOT NULL, + block_number int4 NOT NULL, + "_id" uuid NOT NULL, + "_block_range" int8range NOT NULL, + CONSTRAINT test_pk PRIMARY KEY (_id) + )`); + + await sequelize.query(createSendNotificationTriggerFunction(schema)); + await sequelize.query(createNotifyTrigger(schema, tableName)); + + client = await sequelize.connectionManager.getConnection({ + type: 'read', + }); + + await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"'); + + const listener = jest.fn(); + + (client as any).on('notification', (msg: any) => { + console.log('Payload:', msg.payload); + listener(msg.payload); + }); + + // Insert + await sequelize.query( + `INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 1, 'adde2f8c-cb87-4e84-9600-77f434556e6d', int8range(1, NULL));` + ); + + // Simulate Update + const tx1 = await sequelize.transaction(); + await sequelize.query( + `INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 2, '9396aca4-cef2-4b52-98a7-c5f1ed3edb81', int8range(2, NULL));` + ); + await sequelize.query( + `UPDATE "${schema}".${tableName} SET block_number = 2, _block_range = int8range(1, 2) WHERE _id = 'adde2f8c-cb87-4e84-9600-77f434556e6d';`, + {transaction: tx1} + ); + await tx1.commit(); + + // Simulate delete + const tx2 = await sequelize.transaction(); + await sequelize.query( + `UPDATE "${schema}".${tableName} SET _block_range = int8range(2, 3) WHERE _id = '9396aca4-cef2-4b52-98a7-c5f1ed3edb81';`, + {transaction: tx2} + ); + await tx2.commit(); + + await delay(1); + // There is a limitation with our historical implementation where with the order or queries means we cant easily determine the difference between insert and update. + // For that reason the behaviour is kept the same as before delete was fixed. + expect(listener).toHaveBeenNthCalledWith( + 1, + `{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "UPDATE"}` + ); + expect(listener).toHaveBeenNthCalledWith( + 2, + `{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}` + ); + expect(listener).toHaveBeenNthCalledWith( + 3, + `{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}` + ); + }, 10_000); + }); }); diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index 6f5cd8dc89..4c15a3d816 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -176,10 +176,14 @@ CREATE OR REPLACE FUNCTION "${schema}".send_notification() RETURNS trigger AS $$ DECLARE channel TEXT; + table_name TEXT; row RECORD; payload JSONB; + prev_entity BOOLEAN; + next_entity BOOLEAN; BEGIN channel:= TG_ARGV[0]; + table_name:= TG_TABLE_NAME; IF (TG_OP = 'DELETE') THEN row = OLD; ELSE @@ -189,13 +193,38 @@ BEGIN 'id', row.id, 'mutation_type', TG_OP, '_entity', row); - IF payload -> '_entity' ? '_block_range' THEN - IF NOT upper_inf(row._block_range) THEN + IF payload -> '_entity' ? '_block_range' then + payload = payload #- '{"_entity","_id"}'; + payload = payload #- '{"_entity","_block_range"}'; + IF NOT upper_inf(row._block_range) then + -- Check if a newer version of the entity exists to determine operation + EXECUTE FORMAT( + 'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))', + TG_TABLE_NAME + ) + INTO next_entity + USING row.id, row._block_range; + + IF NOT next_entity THEN + payload = payload || '{"mutation_type": "DELETE"}'; + ELSE RETURN NULL; + END IF; + ELSE + -- Because of the oerder of operations with historical, we cannot determine if this is an update or insert + payload = payload || '{"mutation_type": "UPDATE"}'; + -- EXECUTE FORMAT( + -- 'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND upper(_block_range) = lower($2))', + -- TG_TABLE_NAME + -- ) + -- INTO prev_entity + -- USING row.id, row._block_range; + -- IF NOT prev_entity THEN + -- payload = payload || '{"mutation_type": "INSERT"}'; + -- ELSE + -- payload = payload || '{"mutation_type": "UPDATE"}'; + -- END IF; END IF; - payload = payload || '{"mutation_type": "UPDATE"}'; - payload = payload #- '{"_entity","_id"}'; - payload = payload #- '{"_entity","_block_range"}'; END IF; IF (octet_length(payload::text) >= 8000) THEN payload = payload || '{"_entity": null}'; diff --git a/packages/query/CHANGELOG.md b/packages/query/CHANGELOG.md index 2c213457c7..70f5ffd04a 100644 --- a/packages/query/CHANGELOG.md +++ b/packages/query/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- WS libarary with support for graphiql (#2600) +- Subscription type from JSON to the correct entity type (#2600) ## [2.16.0] - 2024-11-22 ### Changed diff --git a/packages/query/package.json b/packages/query/package.json index f44142173a..7a9da09920 100644 --- a/packages/query/package.json +++ b/packages/query/package.json @@ -48,13 +48,14 @@ "graphile-utils": "^4.12.2", "graphql": "^15.8.0", "graphql-query-complexity": "^0.11.0", + "graphql-ws": "^5.16.0", "lodash": "^4.17.21", "pg": "^8.12.0", "pg-tsquery": "^8.4.2", "postgraphile": "^4.13.0", "postgraphile-plugin-connection-filter": "^2.2.2", "rxjs": "^7.1.0", - "subscriptions-transport-ws": "^0.11.0", + "ws": "^8.18.0", "yargs": "^16.2.0" }, "devDependencies": { @@ -65,6 +66,7 @@ "@types/express-pino-logger": "^4.0.5", "@types/jest": "^27.5.2", "@types/lodash": "^4.17.7", + "@types/ws": "^8", "@types/yargs": "^16.0.9", "nodemon": "^3.1.4" } diff --git a/packages/query/src/graphql/graphql.module.ts b/packages/query/src/graphql/graphql.module.ts index e6be02e73c..53023f8629 100644 --- a/packages/query/src/graphql/graphql.module.ts +++ b/packages/query/src/graphql/graphql.module.ts @@ -14,11 +14,12 @@ import {ApolloServer, UserInputError} from 'apollo-server-express'; import compression from 'compression'; import {NextFunction, Request, Response} from 'express'; import PinoLogger from 'express-pino-logger'; -import {execute, GraphQLSchema, subscribe} from 'graphql'; +import {GraphQLSchema} from 'graphql'; +import {useServer} from 'graphql-ws/lib/use/ws'; import {set} from 'lodash'; import {Pool, PoolClient} from 'pg'; import {makePluginHook} from 'postgraphile'; -import {SubscriptionServer} from 'subscriptions-transport-ws'; +import {WebSocketServer} from 'ws'; import {Config} from '../configure'; import {queryExplainPlugin} from '../configure/x-postgraphile/debugClient'; import {getLogger, PinoConfig} from '../utils/logger'; @@ -36,6 +37,7 @@ const logger = getLogger('graphql-module'); const SCHEMA_RETRY_INTERVAL = 10; //seconds const SCHEMA_RETRY_NUMBER = 5; +const WS_ROUTE = '/'; class NoInitError extends Error { constructor() { @@ -47,6 +49,7 @@ class NoInitError extends Error { }) export class GraphqlModule implements OnModuleInit, OnModuleDestroy { private _apolloServer?: ApolloServer; + private wsCleanup?: ReturnType; constructor( private readonly httpAdapterHost: HttpAdapterHost, private readonly config: Config, @@ -88,8 +91,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy { // eslint-disable-next-line @typescript-eslint/require-await async onModuleDestroy(): Promise { - return this.apolloServer?.stop(); + await Promise.all([this.apolloServer?.stop(), this.wsCleanup?.dispose()]); } + private async buildSchema( dbSchema: string, options: PostGraphileCoreOptions, @@ -187,7 +191,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy { defaultMaxAge: 5, calculateHttpHeaders: true, }), - this.config.get('playground') ? playgroundPlugin({url: '/'}) : ApolloServerPluginLandingPageDisabled(), + this.config.get('playground') + ? playgroundPlugin({url: '/', subscriptionUrl: argv.subscription ? WS_ROUTE : undefined}) + : ApolloServerPluginLandingPageDisabled(), queryComplexityPlugin({schema, maxComplexity: argv['query-complexity']}), queryDepthLimitPlugin({schema, maxDepth: argv['query-depth-limit']}), queryAliasLimit({schema, limit: argv['query-alias-limit']}), @@ -207,8 +213,12 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy { }); if (argv.subscription) { - // TODO: Replace subscriptions-transport-ws with graphql-ws when support is added to graphql-playground - SubscriptionServer.create({schema, execute, subscribe}, {server: httpServer, path: '/'}); + const wsServer = new WebSocketServer({ + server: httpServer, + path: WS_ROUTE, + }); + + this.wsCleanup = useServer({schema}, wsServer); } app.use(PinoLogger(PinoConfig)); diff --git a/packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts b/packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts index 28c3f1efd0..3862dad304 100644 --- a/packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts +++ b/packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts @@ -4,6 +4,7 @@ import {hashName} from '@subql/utils'; import {PgIntrospectionResultsByKind} from '@subql/x-graphile-build-pg'; import {makeExtendSchemaPlugin, gql, embed} from 'graphile-utils'; +import {DocumentNode} from 'graphql'; const filter = (event, args) => { if (args.mutation && !args.mutation.includes(event.mutation_type)) { @@ -15,6 +16,19 @@ const filter = (event, args) => { return true; }; +function makePayload(entityType: string): {type: DocumentNode; name: string} { + const name = `${entityType}Payload`; + const type = gql` + type ${name} { + id: ID! + mutation_type: MutationType! + _entity: ${entityType} + } + `; + + return {name, type}; +} + export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => { const {inflection, pgIntrospectionResultsByKind} = build; @@ -25,26 +39,26 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => { UPDATE DELETE } - - type SubscriptionPayload { - id: ID! - mutation_type: MutationType! - _entity: JSON - } `, ]; + const resolvers: Record = {}; + // Generate subscription fields for all database tables (pgIntrospectionResultsByKind as PgIntrospectionResultsByKind).class.forEach((table) => { if (!table.namespace || table.name === '_metadata') return; const field = inflection.allRows(table); + const type = inflection.tableType(table); + + const {name: payloadName, type: payloadType} = makePayload(type); const topic = hashName(table.namespace.name, 'notify_channel', table.name); typeDefs.push( gql` + ${payloadType} extend type Subscription { - ${field}(id: [ID!], mutation: [MutationType!]): SubscriptionPayload + ${field}(id: [ID!], mutation: [MutationType!]): ${payloadName} @pgSubscription( topic: ${embed(topic)} filter: ${embed(filter)} @@ -53,5 +67,8 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => { ); }); - return {typeDefs}; + return { + typeDefs, + resolvers, + }; }); diff --git a/packages/query/src/graphql/plugins/PlaygroundPlugin.ts b/packages/query/src/graphql/plugins/PlaygroundPlugin.ts index 504de2d125..1487c44f61 100644 --- a/packages/query/src/graphql/plugins/PlaygroundPlugin.ts +++ b/packages/query/src/graphql/plugins/PlaygroundPlugin.ts @@ -3,7 +3,7 @@ import type {ApolloServerPlugin, GraphQLServerListener} from 'apollo-server-plugin-base'; -export function playgroundPlugin(options: {url: string}): ApolloServerPlugin { +export function playgroundPlugin(options: {url: string; subscriptionUrl?: string}): ApolloServerPlugin { return { // eslint-disable-next-line @typescript-eslint/require-await async serverWillStart(): Promise { diff --git a/yarn.lock b/yarn.lock index f3ec486cab..b411496078 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6867,6 +6867,7 @@ __metadata: "@types/express-pino-logger": ^4.0.5 "@types/jest": ^27.5.2 "@types/lodash": ^4.17.7 + "@types/ws": ^8 "@types/yargs": ^16.0.9 apollo-server-express: ^3.12.0 compression: ^1.7.4 @@ -6875,6 +6876,7 @@ __metadata: graphile-utils: ^4.12.2 graphql: ^15.8.0 graphql-query-complexity: ^0.11.0 + graphql-ws: ^5.16.0 lodash: ^4.17.21 nodemon: ^3.1.4 pg: ^8.12.0 @@ -6882,7 +6884,7 @@ __metadata: postgraphile: ^4.13.0 postgraphile-plugin-connection-filter: ^2.2.2 rxjs: ^7.1.0 - subscriptions-transport-ws: ^0.11.0 + ws: ^8.18.0 yargs: ^16.2.0 bin: subql-query: ./bin/run @@ -8058,6 +8060,15 @@ __metadata: languageName: node linkType: hard +"@types/ws@npm:^8": + version: 8.5.13 + resolution: "@types/ws@npm:8.5.13" + dependencies: + "@types/node": "*" + checksum: f17023ce7b89c6124249c90211803a4aaa02886e12bc2d0d2cd47fa665eeb058db4d871ce4397d8e423f6beea97dd56835dd3fdbb921030fe4d887601e37d609 + languageName: node + linkType: hard + "@types/yargs-parser@npm:*": version: 21.0.0 resolution: "@types/yargs-parser@npm:21.0.0" @@ -13571,6 +13582,15 @@ __metadata: languageName: node linkType: hard +"graphql-ws@npm:^5.16.0": + version: 5.16.0 + resolution: "graphql-ws@npm:5.16.0" + peerDependencies: + graphql: ">=0.11 <=16" + checksum: e3e077ec187a92be3fd5dfae49e23af11a82711d3537064384f6861c2b5ceb339f60dc1871d0026b47ff05e4ed3c941404812a8086347e454688e0e6ef0e69f3 + languageName: node + linkType: hard + "graphql-ws@npm:^5.6.2": version: 5.8.1 resolution: "graphql-ws@npm:5.8.1" @@ -20577,21 +20597,6 @@ __metadata: languageName: unknown linkType: soft -"subscriptions-transport-ws@npm:^0.11.0": - version: 0.11.0 - resolution: "subscriptions-transport-ws@npm:0.11.0" - dependencies: - backo2: ^1.0.2 - eventemitter3: ^3.1.0 - iterall: ^1.2.1 - symbol-observable: ^1.0.4 - ws: ^5.2.0 || ^6.0.0 || ^7.0.0 - peerDependencies: - graphql: ^15.7.2 || ^16.0.0 - checksum: cc2e98d5c9d89c44d2e15eca188781c6ebae13d1661c42a99cee9d2897aebe2a22bc118eefff83244a79c88ee4ea24d46973ebf26ae7cb47ac1857fb8ee2c947 - languageName: node - linkType: hard - "subscriptions-transport-ws@npm:^0.9.18": version: 0.9.19 resolution: "subscriptions-transport-ws@npm:0.9.19" @@ -22481,7 +22486,7 @@ __metadata: languageName: node linkType: hard -"ws@npm:^8.16.0": +"ws@npm:^8.16.0, ws@npm:^8.18.0": version: 8.18.0 resolution: "ws@npm:8.18.0" peerDependencies: