Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support localStorage mode on sdk-base #109

Merged
merged 2 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 14 additions & 38 deletions .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
@@ -1,47 +1,23 @@
name: Node.js CI
name: CI

on:
push:
branches:
- main
- master
pull_request:
branches:
- main
- master
schedule:
- cron: '0 2 * * *'

jobs:
build:
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
node-version: [14, 16, 18]
os: [ubuntu-latest]
branches: [ master ]

steps:
- name: Checkout Git Source
uses: actions/checkout@v2
pull_request:
branches: [ master ]

- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
workflow_dispatch: {}

- name: Install Dependencies
run: |
jobs:
Job:
name: Node.js
uses: artusjs/github-actions/.github/workflows/node-test.yml@v1
with:
os: 'ubuntu-latest'
version: '14, 16, 18'
install: |
tar xf zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg
./zookeeper-3.4.6/bin/zkServer.sh start
npm i

- name: Continuous Integration
run: npm run ci

- name: Code Coverage
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
npm i --no-package-lock --no-fund
17 changes: 17 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Release

on:
push:
branches: [ master ]

workflow_dispatch: {}

jobs:
release:
name: Node.js
uses: artusjs/github-actions/.github/workflows/node-release.yml@v1
secrets:
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
GIT_TOKEN: ${{ secrets.GIT_TOKEN }}
with:
checkTest: false
4 changes: 2 additions & 2 deletions lib/client/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ class RpcConsumer extends Base {
async invoke(method, args, options = {}) {
if (!this._isReady) {
try {
await this.readyOrTimeout(options.responseTimeout || this.options.responseTimeout)
await this.readyOrTimeout(options.responseTimeout || this.options.responseTimeout);
} catch (err) {
throw new Error('[RpcConsumer] Consumer ready error: ' + err.message)
throw new Error('[RpcConsumer] Consumer ready error: ' + err.message);
}
}
const req = this.createRequest(method, args, options);
Expand Down
16 changes: 13 additions & 3 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,25 @@ class RpcServer extends Base {
}

async _handleRequest(req, conn) {
const id = req.data.serverSignature;
const serviceId = req.data.serverSignature;
req.data.interfaceName = req.data.interfaceName || req.data.serverSignature.split(':')[0];
const service = this._services.get(id);
const res = new this.responseClass(req, conn);
const ctx = this.createContext(req, res);
if (this.localStorage && ctx) {
await this.localStorage.run(ctx, async () => {
await this._invokeService(serviceId, req, res, ctx);
});
} else {
await this._invokeService(serviceId, req, res, ctx);
}
}

async _invokeService(serviceId, req, res, ctx) {
const service = this._services.get(serviceId);
this.emit('request', { req, ctx });
try {
if (!service) {
throw new Error('not found service: ' + id);
throw new Error('not found service: ' + serviceId);
}
await service.invoke(ctx, req, res);
} catch (err) {
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
"cov": "egg-bin cov",
"test": "npm run lint && npm run test-local",
"test-local": "egg-bin test",
"pkgfiles": "egg-bin pkgfiles --check",
"ci": "npm run start:zk && npm run pkgfiles && npm run lint && npm run cov",
"ci": "npm run start:zk && npm run lint && npm run cov",
"contributors": "contributors -f plain -o AUTHORS",
"start:zk": "node test/scripts/start.js",
"stop:zk": "node test/scripts/stop.js"
Expand Down Expand Up @@ -45,7 +44,7 @@
"koa-compose": "^4.1.0",
"mz-modules": "^2.1.0",
"pump": "^3.0.0",
"sdk-base": "^4.0.0",
"sdk-base": "^4.2.1",
"sofa-bolt-node": "^2.0.1",
"urlencode": "^1.1.0",
"utility": "^1.16.3",
Expand Down
12 changes: 6 additions & 6 deletions test/client/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,20 +516,20 @@ describe('test/client/consumer.test.js', () => {
},
registry,
logger,
responseTimeout: 10
responseTimeout: 10,
});

await consumer.ready()
consumer._isReady = false
consumer.ready(false)
await consumer.ready();
consumer._isReady = false;
consumer.ready(false);

try {
await consumer.invoke('test', [{}])
await consumer.invoke('test', [{}]);
assert(false);
} catch (err) {
assert(err && err.message.includes('[RpcConsumer] Consumer ready error: Promise timed out after 10 milliseconds'));
}
})
});

describe('should filter invalid address', () => {
class CustomRegistry extends Base {
Expand Down
97 changes: 92 additions & 5 deletions test/server/server.test.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
'use strict';

const mm = require('mm');
const net = require('net');
const { AsyncLocalStorage } = require('async_hooks');
const assert = require('assert');
const mm = require('mm');
const sleep = require('mz-modules/sleep');
const request = require('../../').test;
const dubboProtocol = require('dubbo-remoting');
const protocol = require('sofa-bolt-node/lib/protocol');
const request = require('../../').test;
const RpcClient = require('../../').client.RpcClient;
const RpcServer = require('../../').server.RpcServer;
const protocol = require('sofa-bolt-node/lib/protocol');
const ZookeeperRegistry = require('../../').registry.ZookeeperRegistry;

const logger = console;
Expand Down Expand Up @@ -394,4 +393,92 @@ describe('test/server/server.test.js', () => {
});
});

describe('bolt with localStorage', () => {
const asyncLocalStorage = new AsyncLocalStorage();
before(async () => {
class NewRpcServer extends RpcServer {
createContext(req) {
return { req };
}
}
server = new NewRpcServer({
appName: 'test',
registry,
version,
logger,
port: 0,
localStorage: asyncLocalStorage,
});
server.addService({
interfaceName: 'com.alipay.x.facade.HelloRpcFacade',
version,
apiMeta: {
methods: [{
name: 'plus',
parameterTypes: [
'java.lang.Integer',
'java.lang.Integer',
],
returnType: 'java.lang.Integer',
}],
},
}, {
// a + b
async plus(a, b) {
return a + b;
},
});
server.addService({
interfaceName: 'com.alipay.test.TestService',
}, {
async error() {
const ctx = asyncLocalStorage.getStore();
console.log('ctx', !!ctx, typeof ctx);
throw new Error('mock error with ctx ' + !!ctx);
},
});
server.addService({
interfaceName: 'com.alipay.test.HelloService',
version,
uniqueId: 'hello',
}, {
async hello() {
await sleep(2000);
return 'hello';
},
});
await server.start();
await server.publish();
});

after(async () => {
await server.close();
server = null;
});

it('should invoke ok', () => {
return request(server)
.service('com.alipay.x.facade.HelloRpcFacade')
.invoke('plus')
.send([ 1, 2 ])
.expect(3);
});

it('should resultCode=01 if biz error', async () => {
let meta;
server.once('response', data => {
meta = data.res.meta;
});

await request(server)
.service('com.alipay.test.TestService')
.invoke('error')
.timeout(1000)
.send([])
.error(/mock error with ctx true/);

assert(meta && meta.resultCode === '01');
});
});

});