Skip to content

Commit

Permalink
Subscription improvements (#2600)
Browse files Browse the repository at this point in the history
* WIP historical different trigger types

* Update to support delete in historical subscriptions, add support for WS in the playground

* Update changelog

* Update types for subscriptions

* Update changelog
  • Loading branch information
stwiname authored Nov 25, 2024
1 parent 9debdf0 commit 3b6e08a
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 44 deletions.
11 changes: 7 additions & 4 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ module.exports = {
'plugin:jest/recommended',
],
rules: {
// Note: you must disable the base rule as it can report incorrect errors
"require-await": "off",
"@typescript-eslint/require-await": "error",
// rules turned off in upstream project (also required when recommended-requiring-type-checking is extended)
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/no-unsafe-argument': 'off',
Expand Down Expand Up @@ -62,11 +65,11 @@ module.exports = {
accessibility: 'no-public',
},
],
'@typescript-eslint/no-namespace': ['error', {allowDeclarations: true}],
'@typescript-eslint/no-namespace': ['error', { allowDeclarations: true }],
// "@typescript-eslint/member-ordering": "error",
// "@typescript-eslint/naming-convention": "error",
// "@typescript-eslint/no-param-reassign": "error",
'@typescript-eslint/promise-function-async': ['error', {checkArrowFunctions: false}],
'@typescript-eslint/promise-function-async': ['error', { checkArrowFunctions: false }],
// "arrow-body-style": "error",
complexity: ['error', 20],
curly: ['error', 'multi-line'],
Expand All @@ -87,7 +90,7 @@ module.exports = {
'line',
[
//Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
{pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors'},
{ pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors' },
' SPDX-License-Identifier: GPL-3.0',
],
2,
Expand All @@ -112,7 +115,7 @@ module.exports = {
'@typescript-eslint/parser': ['.ts', '.tsx'],
},
},
overrides:[
overrides: [
{
files: ['*.test.ts', '*.spec.ts'],
rules: {
Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ module.exports = {
],

// The regexp pattern or array of patterns that Jest uses to detect test files
testRegex: '.*\\.spec\\.ts$',
testRegex: '.*\\.(spec)\\.ts$',

// This option allows the use of a custom results processor
// testResultsProcessor: undefined,
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- When using a GET query to retrieve an entity, it will include a “store” field.
- Support for historical indexing by timestamp as well as block height (#2584)
- Subscriptions not emitting deleted mutation type with historical (#2600)

### Removed
- Support for cockroach DB (#2584)
Expand Down
144 changes: 143 additions & 1 deletion packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

import {INestApplication} from '@nestjs/common';
import {Test} from '@nestjs/testing';
import {getDbSizeAndUpdateMetadata} from '@subql/node-core';
import {delay} from '@subql/common';
import {Sequelize} from '@subql/x-sequelize';
import {NodeConfig} from '../configure/NodeConfig';
import {DbModule} from './db.module';
import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper';

const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'});

Expand Down Expand Up @@ -42,4 +43,145 @@ describe('sync helper test', () => {
const dbSize = await getDbSizeAndUpdateMetadata(sequelize, schema);
expect(dbSize).not.toBeUndefined();
}, 50000);

describe('has the correct notification trigger payload', () => {
let client: unknown;

afterEach(async () => {
if (client) {
await (client as any).query('UNLISTEN "0xc4e66f9e1358fa3c"');
(client as any).removeAllListeners('notification');
sequelize.connectionManager.releaseConnection(client);
}
});

it('without historical', async () => {
const module = await Test.createTestingModule({
imports: [DbModule.forRootWithConfig(nodeConfig)],
}).compile();

app = module.createNestApplication();
await app.init();
sequelize = app.get(Sequelize);
schema = 'trigger-test';
const tableName = 'test';
await sequelize.createSchema(schema, {});
// mock create metadata table
await sequelize.query(`
CREATE TABLE IF NOT EXISTS "${schema}".${tableName} (
id text NOT NULL,
block_number int4 NOT NULL
)`);

await sequelize.query(createSendNotificationTriggerFunction(schema));
await sequelize.query(createNotifyTrigger(schema, tableName));

client = await sequelize.connectionManager.getConnection({
type: 'read',
});

await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"');

const listener = jest.fn();
(client as any).on('notification', (msg: any) => {
console.log('Payload:', msg.payload);
listener(msg.payload);
});

await sequelize.query(`INSERT INTO "${schema}".${tableName} (id, block_number) VALUES ('1', 1);`);
await sequelize.query(`UPDATE "${schema}".${tableName} SET block_number = 2 WHERE id = '1';`);
await sequelize.query(`DELETE FROM "${schema}".${tableName} WHERE id = '1';`);
await delay(1);
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "INSERT"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
);
}, 10_000);

it('with historical', async () => {
const module = await Test.createTestingModule({
imports: [DbModule.forRootWithConfig(nodeConfig)],
}).compile();

app = module.createNestApplication();
await app.init();
sequelize = app.get(Sequelize);
schema = 'trigger-test';
const tableName = 'test';
await sequelize.createSchema(schema, {});
// mock create metadata table
await sequelize.query(`
CREATE TABLE IF NOT EXISTS "${schema}".${tableName} (
id text NOT NULL,
block_number int4 NOT NULL,
"_id" uuid NOT NULL,
"_block_range" int8range NOT NULL,
CONSTRAINT test_pk PRIMARY KEY (_id)
)`);

await sequelize.query(createSendNotificationTriggerFunction(schema));
await sequelize.query(createNotifyTrigger(schema, tableName));

client = await sequelize.connectionManager.getConnection({
type: 'read',
});

await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"');

const listener = jest.fn();

(client as any).on('notification', (msg: any) => {
console.log('Payload:', msg.payload);
listener(msg.payload);
});

// Insert
await sequelize.query(
`INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 1, 'adde2f8c-cb87-4e84-9600-77f434556e6d', int8range(1, NULL));`
);

// Simulate Update
const tx1 = await sequelize.transaction();
await sequelize.query(
`INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 2, '9396aca4-cef2-4b52-98a7-c5f1ed3edb81', int8range(2, NULL));`
);
await sequelize.query(
`UPDATE "${schema}".${tableName} SET block_number = 2, _block_range = int8range(1, 2) WHERE _id = 'adde2f8c-cb87-4e84-9600-77f434556e6d';`,
{transaction: tx1}
);
await tx1.commit();

// Simulate delete
const tx2 = await sequelize.transaction();
await sequelize.query(
`UPDATE "${schema}".${tableName} SET _block_range = int8range(2, 3) WHERE _id = '9396aca4-cef2-4b52-98a7-c5f1ed3edb81';`,
{transaction: tx2}
);
await tx2.commit();

await delay(1);
// There is a limitation with our historical implementation where with the order or queries means we cant easily determine the difference between insert and update.
// For that reason the behaviour is kept the same as before delete was fixed.
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
);
}, 10_000);
});
});
39 changes: 34 additions & 5 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ CREATE OR REPLACE FUNCTION "${schema}".send_notification()
RETURNS trigger AS $$
DECLARE
channel TEXT;
table_name TEXT;
row RECORD;
payload JSONB;
prev_entity BOOLEAN;
next_entity BOOLEAN;
BEGIN
channel:= TG_ARGV[0];
table_name:= TG_TABLE_NAME;
IF (TG_OP = 'DELETE') THEN
row = OLD;
ELSE
Expand All @@ -189,13 +193,38 @@ BEGIN
'id', row.id,
'mutation_type', TG_OP,
'_entity', row);
IF payload -> '_entity' ? '_block_range' THEN
IF NOT upper_inf(row._block_range) THEN
IF payload -> '_entity' ? '_block_range' then
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
IF NOT upper_inf(row._block_range) then
-- Check if a newer version of the entity exists to determine operation
EXECUTE FORMAT(
'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))',
TG_TABLE_NAME
)
INTO next_entity
USING row.id, row._block_range;
IF NOT next_entity THEN
payload = payload || '{"mutation_type": "DELETE"}';
ELSE
RETURN NULL;
END IF;
ELSE
-- Because of the oerder of operations with historical, we cannot determine if this is an update or insert
payload = payload || '{"mutation_type": "UPDATE"}';
-- EXECUTE FORMAT(
-- 'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND upper(_block_range) = lower($2))',
-- TG_TABLE_NAME
-- )
-- INTO prev_entity
-- USING row.id, row._block_range;
-- IF NOT prev_entity THEN
-- payload = payload || '{"mutation_type": "INSERT"}';
-- ELSE
-- payload = payload || '{"mutation_type": "UPDATE"}';
-- END IF;
END IF;
payload = payload || '{"mutation_type": "UPDATE"}';
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
END IF;
IF (octet_length(payload::text) >= 8000) THEN
payload = payload || '{"_entity": null}';
Expand Down
3 changes: 3 additions & 0 deletions packages/query/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- WS libarary with support for graphiql (#2600)
- Subscription type from JSON to the correct entity type (#2600)

## [2.16.0] - 2024-11-22
### Changed
Expand Down
4 changes: 3 additions & 1 deletion packages/query/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@
"graphile-utils": "^4.12.2",
"graphql": "^15.8.0",
"graphql-query-complexity": "^0.11.0",
"graphql-ws": "^5.16.0",
"lodash": "^4.17.21",
"pg": "^8.12.0",
"pg-tsquery": "^8.4.2",
"postgraphile": "^4.13.0",
"postgraphile-plugin-connection-filter": "^2.2.2",
"rxjs": "^7.1.0",
"subscriptions-transport-ws": "^0.11.0",
"ws": "^8.18.0",
"yargs": "^16.2.0"
},
"devDependencies": {
Expand All @@ -65,6 +66,7 @@
"@types/express-pino-logger": "^4.0.5",
"@types/jest": "^27.5.2",
"@types/lodash": "^4.17.7",
"@types/ws": "^8",
"@types/yargs": "^16.0.9",
"nodemon": "^3.1.4"
}
Expand Down
22 changes: 16 additions & 6 deletions packages/query/src/graphql/graphql.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import {ApolloServer, UserInputError} from 'apollo-server-express';
import compression from 'compression';
import {NextFunction, Request, Response} from 'express';
import PinoLogger from 'express-pino-logger';
import {execute, GraphQLSchema, subscribe} from 'graphql';
import {GraphQLSchema} from 'graphql';
import {useServer} from 'graphql-ws/lib/use/ws';
import {set} from 'lodash';
import {Pool, PoolClient} from 'pg';
import {makePluginHook} from 'postgraphile';
import {SubscriptionServer} from 'subscriptions-transport-ws';
import {WebSocketServer} from 'ws';
import {Config} from '../configure';
import {queryExplainPlugin} from '../configure/x-postgraphile/debugClient';
import {getLogger, PinoConfig} from '../utils/logger';
Expand All @@ -36,6 +37,7 @@ const logger = getLogger('graphql-module');

const SCHEMA_RETRY_INTERVAL = 10; //seconds
const SCHEMA_RETRY_NUMBER = 5;
const WS_ROUTE = '/';

class NoInitError extends Error {
constructor() {
Expand All @@ -47,6 +49,7 @@ class NoInitError extends Error {
})
export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
private _apolloServer?: ApolloServer;
private wsCleanup?: ReturnType<typeof useServer>;
constructor(
private readonly httpAdapterHost: HttpAdapterHost,
private readonly config: Config,
Expand Down Expand Up @@ -88,8 +91,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {

// eslint-disable-next-line @typescript-eslint/require-await
async onModuleDestroy(): Promise<void> {
return this.apolloServer?.stop();
await Promise.all([this.apolloServer?.stop(), this.wsCleanup?.dispose()]);
}

private async buildSchema(
dbSchema: string,
options: PostGraphileCoreOptions,
Expand Down Expand Up @@ -187,7 +191,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
defaultMaxAge: 5,
calculateHttpHeaders: true,
}),
this.config.get('playground') ? playgroundPlugin({url: '/'}) : ApolloServerPluginLandingPageDisabled(),
this.config.get('playground')
? playgroundPlugin({url: '/', subscriptionUrl: argv.subscription ? WS_ROUTE : undefined})
: ApolloServerPluginLandingPageDisabled(),
queryComplexityPlugin({schema, maxComplexity: argv['query-complexity']}),
queryDepthLimitPlugin({schema, maxDepth: argv['query-depth-limit']}),
queryAliasLimit({schema, limit: argv['query-alias-limit']}),
Expand All @@ -207,8 +213,12 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
});

if (argv.subscription) {
// TODO: Replace subscriptions-transport-ws with graphql-ws when support is added to graphql-playground
SubscriptionServer.create({schema, execute, subscribe}, {server: httpServer, path: '/'});
const wsServer = new WebSocketServer({
server: httpServer,
path: WS_ROUTE,
});

this.wsCleanup = useServer({schema}, wsServer);
}

app.use(PinoLogger(PinoConfig));
Expand Down
Loading

0 comments on commit 3b6e08a

Please sign in to comment.