Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription improvements #2600

Merged
merged 6 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ module.exports = {
'plugin:jest/recommended',
],
rules: {
// Note: you must disable the base rule as it can report incorrect errors
"require-await": "off",
"@typescript-eslint/require-await": "error",
// rules turned off in upstream project (also required when recommended-requiring-type-checking is extended)
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/no-unsafe-argument': 'off',
Expand Down Expand Up @@ -62,11 +65,11 @@ module.exports = {
accessibility: 'no-public',
},
],
'@typescript-eslint/no-namespace': ['error', {allowDeclarations: true}],
'@typescript-eslint/no-namespace': ['error', { allowDeclarations: true }],
// "@typescript-eslint/member-ordering": "error",
// "@typescript-eslint/naming-convention": "error",
// "@typescript-eslint/no-param-reassign": "error",
'@typescript-eslint/promise-function-async': ['error', {checkArrowFunctions: false}],
'@typescript-eslint/promise-function-async': ['error', { checkArrowFunctions: false }],
// "arrow-body-style": "error",
complexity: ['error', 20],
curly: ['error', 'multi-line'],
Expand All @@ -87,7 +90,7 @@ module.exports = {
'line',
[
//Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
{pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors'},
{ pattern: ' Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors' },
' SPDX-License-Identifier: GPL-3.0',
],
2,
Expand All @@ -112,7 +115,7 @@ module.exports = {
'@typescript-eslint/parser': ['.ts', '.tsx'],
},
},
overrides:[
overrides: [
{
files: ['*.test.ts', '*.spec.ts'],
rules: {
Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ module.exports = {
],

// The regexp pattern or array of patterns that Jest uses to detect test files
testRegex: '.*\\.spec\\.ts$',
testRegex: '.*\\.(spec)\\.ts$',

// This option allows the use of a custom results processor
// testResultsProcessor: undefined,
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- When using a GET query to retrieve an entity, it will include a “store” field.
- Support for historical indexing by timestamp as well as block height (#2584)
- Subscriptions not emitting deleted mutation type with historical (#2600)

### Removed
- Support for cockroach DB (#2584)
Expand Down
144 changes: 143 additions & 1 deletion packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

import {INestApplication} from '@nestjs/common';
import {Test} from '@nestjs/testing';
import {getDbSizeAndUpdateMetadata} from '@subql/node-core';
import {delay} from '@subql/common';
import {Sequelize} from '@subql/x-sequelize';
import {NodeConfig} from '../configure/NodeConfig';
import {DbModule} from './db.module';
import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper';

const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'});

Expand Down Expand Up @@ -42,4 +43,145 @@ describe('sync helper test', () => {
const dbSize = await getDbSizeAndUpdateMetadata(sequelize, schema);
expect(dbSize).not.toBeUndefined();
}, 50000);

describe('has the correct notification trigger payload', () => {
let client: unknown;

afterEach(async () => {
if (client) {
await (client as any).query('UNLISTEN "0xc4e66f9e1358fa3c"');
(client as any).removeAllListeners('notification');
sequelize.connectionManager.releaseConnection(client);
}
});

it('without historical', async () => {
const module = await Test.createTestingModule({
imports: [DbModule.forRootWithConfig(nodeConfig)],
}).compile();

app = module.createNestApplication();
await app.init();
sequelize = app.get(Sequelize);
schema = 'trigger-test';
const tableName = 'test';
await sequelize.createSchema(schema, {});
// mock create metadata table
await sequelize.query(`
CREATE TABLE IF NOT EXISTS "${schema}".${tableName} (
id text NOT NULL,
block_number int4 NOT NULL
)`);

await sequelize.query(createSendNotificationTriggerFunction(schema));
await sequelize.query(createNotifyTrigger(schema, tableName));

client = await sequelize.connectionManager.getConnection({
type: 'read',
});

await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"');

const listener = jest.fn();
(client as any).on('notification', (msg: any) => {
console.log('Payload:', msg.payload);
listener(msg.payload);
});

await sequelize.query(`INSERT INTO "${schema}".${tableName} (id, block_number) VALUES ('1', 1);`);
await sequelize.query(`UPDATE "${schema}".${tableName} SET block_number = 2 WHERE id = '1';`);
await sequelize.query(`DELETE FROM "${schema}".${tableName} WHERE id = '1';`);
await delay(1);
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "INSERT"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
);
}, 10_000);

it('with historical', async () => {
const module = await Test.createTestingModule({
imports: [DbModule.forRootWithConfig(nodeConfig)],
}).compile();

app = module.createNestApplication();
await app.init();
sequelize = app.get(Sequelize);
schema = 'trigger-test';
const tableName = 'test';
await sequelize.createSchema(schema, {});
// mock create metadata table
await sequelize.query(`
CREATE TABLE IF NOT EXISTS "${schema}".${tableName} (
id text NOT NULL,
block_number int4 NOT NULL,
"_id" uuid NOT NULL,
"_block_range" int8range NOT NULL,
CONSTRAINT test_pk PRIMARY KEY (_id)
)`);

await sequelize.query(createSendNotificationTriggerFunction(schema));
await sequelize.query(createNotifyTrigger(schema, tableName));

client = await sequelize.connectionManager.getConnection({
type: 'read',
});

await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"');

const listener = jest.fn();

(client as any).on('notification', (msg: any) => {
console.log('Payload:', msg.payload);
listener(msg.payload);
});

// Insert
await sequelize.query(
`INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 1, 'adde2f8c-cb87-4e84-9600-77f434556e6d', int8range(1, NULL));`
);

// Simulate Update
const tx1 = await sequelize.transaction();
await sequelize.query(
`INSERT INTO "${schema}".${tableName} (id, block_number, _id, _block_range) VALUES ('1', 2, '9396aca4-cef2-4b52-98a7-c5f1ed3edb81', int8range(2, NULL));`
);
await sequelize.query(
`UPDATE "${schema}".${tableName} SET block_number = 2, _block_range = int8range(1, 2) WHERE _id = 'adde2f8c-cb87-4e84-9600-77f434556e6d';`,
{transaction: tx1}
);
await tx1.commit();

// Simulate delete
const tx2 = await sequelize.transaction();
await sequelize.query(
`UPDATE "${schema}".${tableName} SET _block_range = int8range(2, 3) WHERE _id = '9396aca4-cef2-4b52-98a7-c5f1ed3edb81';`,
{transaction: tx2}
);
await tx2.commit();

await delay(1);
// There is a limitation with our historical implementation where with the order or queries means we cant easily determine the difference between insert and update.
// For that reason the behaviour is kept the same as before delete was fixed.
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
);
}, 10_000);
});
});
39 changes: 34 additions & 5 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ CREATE OR REPLACE FUNCTION "${schema}".send_notification()
RETURNS trigger AS $$
DECLARE
channel TEXT;
table_name TEXT;
row RECORD;
payload JSONB;
prev_entity BOOLEAN;
next_entity BOOLEAN;
BEGIN
channel:= TG_ARGV[0];
table_name:= TG_TABLE_NAME;
IF (TG_OP = 'DELETE') THEN
row = OLD;
ELSE
Expand All @@ -189,13 +193,38 @@ BEGIN
'id', row.id,
'mutation_type', TG_OP,
'_entity', row);
IF payload -> '_entity' ? '_block_range' THEN
IF NOT upper_inf(row._block_range) THEN
IF payload -> '_entity' ? '_block_range' then
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
IF NOT upper_inf(row._block_range) then
-- Check if a newer version of the entity exists to determine operation
EXECUTE FORMAT(
'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))',
TG_TABLE_NAME
)
INTO next_entity
USING row.id, row._block_range;

IF NOT next_entity THEN
payload = payload || '{"mutation_type": "DELETE"}';
ELSE
RETURN NULL;
END IF;
ELSE
-- Because of the oerder of operations with historical, we cannot determine if this is an update or insert
payload = payload || '{"mutation_type": "UPDATE"}';
-- EXECUTE FORMAT(
-- 'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND upper(_block_range) = lower($2))',
-- TG_TABLE_NAME
-- )
-- INTO prev_entity
-- USING row.id, row._block_range;
-- IF NOT prev_entity THEN
-- payload = payload || '{"mutation_type": "INSERT"}';
-- ELSE
-- payload = payload || '{"mutation_type": "UPDATE"}';
-- END IF;
END IF;
payload = payload || '{"mutation_type": "UPDATE"}';
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
END IF;
IF (octet_length(payload::text) >= 8000) THEN
payload = payload || '{"_entity": null}';
Expand Down
3 changes: 3 additions & 0 deletions packages/query/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- WS libarary with support for graphiql (#2600)
- Subscription type from JSON to the correct entity type (#2600)

## [2.16.0] - 2024-11-22
### Changed
Expand Down
4 changes: 3 additions & 1 deletion packages/query/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@
"graphile-utils": "^4.12.2",
"graphql": "^15.8.0",
"graphql-query-complexity": "^0.11.0",
"graphql-ws": "^5.16.0",
"lodash": "^4.17.21",
"pg": "^8.12.0",
"pg-tsquery": "^8.4.2",
"postgraphile": "^4.13.0",
"postgraphile-plugin-connection-filter": "^2.2.2",
"rxjs": "^7.1.0",
"subscriptions-transport-ws": "^0.11.0",
"ws": "^8.18.0",
"yargs": "^16.2.0"
},
"devDependencies": {
Expand All @@ -65,6 +66,7 @@
"@types/express-pino-logger": "^4.0.5",
"@types/jest": "^27.5.2",
"@types/lodash": "^4.17.7",
"@types/ws": "^8",
"@types/yargs": "^16.0.9",
"nodemon": "^3.1.4"
}
Expand Down
22 changes: 16 additions & 6 deletions packages/query/src/graphql/graphql.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import {ApolloServer, UserInputError} from 'apollo-server-express';
import compression from 'compression';
import {NextFunction, Request, Response} from 'express';
import PinoLogger from 'express-pino-logger';
import {execute, GraphQLSchema, subscribe} from 'graphql';
import {GraphQLSchema} from 'graphql';
import {useServer} from 'graphql-ws/lib/use/ws';
import {set} from 'lodash';
import {Pool, PoolClient} from 'pg';
import {makePluginHook} from 'postgraphile';
import {SubscriptionServer} from 'subscriptions-transport-ws';
import {WebSocketServer} from 'ws';
import {Config} from '../configure';
import {queryExplainPlugin} from '../configure/x-postgraphile/debugClient';
import {getLogger, PinoConfig} from '../utils/logger';
Expand All @@ -36,6 +37,7 @@ const logger = getLogger('graphql-module');

const SCHEMA_RETRY_INTERVAL = 10; //seconds
const SCHEMA_RETRY_NUMBER = 5;
const WS_ROUTE = '/';

class NoInitError extends Error {
constructor() {
Expand All @@ -47,6 +49,7 @@ class NoInitError extends Error {
})
export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
private _apolloServer?: ApolloServer;
private wsCleanup?: ReturnType<typeof useServer>;
constructor(
private readonly httpAdapterHost: HttpAdapterHost,
private readonly config: Config,
Expand Down Expand Up @@ -88,8 +91,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {

// eslint-disable-next-line @typescript-eslint/require-await
async onModuleDestroy(): Promise<void> {
return this.apolloServer?.stop();
await Promise.all([this.apolloServer?.stop(), this.wsCleanup?.dispose()]);
}

private async buildSchema(
dbSchema: string,
options: PostGraphileCoreOptions,
Expand Down Expand Up @@ -187,7 +191,9 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
defaultMaxAge: 5,
calculateHttpHeaders: true,
}),
this.config.get('playground') ? playgroundPlugin({url: '/'}) : ApolloServerPluginLandingPageDisabled(),
this.config.get('playground')
? playgroundPlugin({url: '/', subscriptionUrl: argv.subscription ? WS_ROUTE : undefined})
: ApolloServerPluginLandingPageDisabled(),
queryComplexityPlugin({schema, maxComplexity: argv['query-complexity']}),
queryDepthLimitPlugin({schema, maxDepth: argv['query-depth-limit']}),
queryAliasLimit({schema, limit: argv['query-alias-limit']}),
Expand All @@ -207,8 +213,12 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
});

if (argv.subscription) {
// TODO: Replace subscriptions-transport-ws with graphql-ws when support is added to graphql-playground
SubscriptionServer.create({schema, execute, subscribe}, {server: httpServer, path: '/'});
const wsServer = new WebSocketServer({
server: httpServer,
path: WS_ROUTE,
});

this.wsCleanup = useServer({schema}, wsServer);
}

app.use(PinoLogger(PinoConfig));
Expand Down
Loading
Loading