Skip to content

Commit

Permalink
feat(sdk): split runtime out from sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
zfy0701 committed Dec 31, 2022
1 parent 45def79 commit 123d466
Show file tree
Hide file tree
Showing 45 changed files with 332 additions and 232 deletions.
22 changes: 0 additions & 22 deletions packages/base/package.json

This file was deleted.

3 changes: 0 additions & 3 deletions packages/base/src/index.ts

This file was deleted.

10 changes: 0 additions & 10 deletions packages/base/tsconfig.json

This file was deleted.

11 changes: 11 additions & 0 deletions packages/runtime/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/** @type {import('ts-jest/dist/types').InitialOptionsTsJest} */
const { pathsToModuleNameMapper } = require('ts-jest');
const { compilerOptions } = require('./tsconfig');

// TODO seems not fully ignored
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
modulePathIgnorePatterns: ["<rootDir>/dist/", "<rootDir>/lib/", "<rootDir>/templates/"],
moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, { prefix: '<rootDir>/src/' } )
};
5 changes: 2 additions & 3 deletions packages/runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
"test": "jest"
},
"dependencies": {
"@sentio/protos": "^1.0.0",
"@ethersproject/providers": "~5.7.0",
"bignumber.js": "^9.1.0",
"command-line-args": "^5.2.1",
"command-line-usage": "^6.1.3",
"ethers": "~5.7.1",
"fs-extra": "^11.0.0",
"google-protobuf": "^3.15.8",
"js-yaml": "^4.1.0",
"nice-grpc": "^2.0.0",
"nice-grpc-client-middleware-retry": "^2.0.1",
"nice-grpc-error-details": "^0.1.4",
Expand All @@ -33,7 +33,6 @@
"@types/expect": "^24.3.0",
"@types/fs-extra": "^9.0.13",
"@types/google-protobuf": "^3.15.6",
"@types/js-yaml": "^4.0.5",
"@types/node": "^18.0.4"
},
"bin": {
Expand All @@ -43,7 +42,7 @@
"types": "./lib/index.d.ts",
"module": "./lib/index.js",
"files": [
"{lib,src,templates}",
"{lib,src}",
"!{lib,src}/tests",
"!**/*.test.{js,ts}"
]
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { Provider } from '@ethersproject/providers'

export class Endpoints {
static INSTANCE: Endpoints = new Endpoints()

static reset() {
Endpoints.INSTANCE = new Endpoints()
}
// evm providers
providers = new Map<number, Provider>()

Expand Down
61 changes: 61 additions & 0 deletions packages/runtime/src/full-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { CallContext } from 'nice-grpc'

// Different than the simple one which
import {
DataBinding,
HandlerType,
ProcessBindingsRequest,
ProcessConfigRequest,
ProcessorServiceImplementation,
StartRequest,
} from './gen/processor/protos/processor'

import { Empty } from '@sentio/protos/lib/google/protobuf/empty'

export class FullProcessorServiceImpl implements ProcessorServiceImplementation {
constructor(instance: ProcessorServiceImplementation) {
this.instance = instance
}

instance: ProcessorServiceImplementation

async getConfig(request: ProcessConfigRequest, context: CallContext) {
return this.instance.getConfig(request, context)
}

async start(request: StartRequest, context: CallContext) {
return this.instance.start(request, context)
}

async stop(request: Empty, context: CallContext) {
return this.instance.stop(request, context)
}

async processBindings(request: ProcessBindingsRequest, options: CallContext) {
for (const binding of request.bindings) {
this.adjustDataBinding(binding)
}
return this.instance.processBindings(request, options)
}

async *processBindingsStream(requests: AsyncIterable<DataBinding>, context: CallContext) {
throw new Error('Not Implemented for streaming')
// y this.instance.processBindingsStream(requests, context)
}

protected adjustDataBinding(dataBinding: DataBinding): void {
switch (dataBinding.handlerType) {
case HandlerType.UNKNOWN:
if (dataBinding.data?.ethBlock) {
if (dataBinding.data.raw.length === 0) {
// This is actually not needed in current system, just as initla test propose, move to test only
// when this is stable
dataBinding.data.raw = new TextEncoder().encode(JSON.stringify(dataBinding.data.ethBlock.block))
}
}
break
default:
break
}
}
}
Original file line number Diff line number Diff line change
@@ -1,86 +1,91 @@
/* eslint-disable */
import Long from "long";
import _m0 from "protobufjs/minimal";
import Long from 'long'
import _m0 from 'protobufjs/minimal'

export interface Timestamp {
seconds: bigint;
nanos: number;
seconds: bigint
nanos: number
}

function createBaseTimestamp(): Timestamp {
return { seconds: BigInt("0"), nanos: 0 };
return { seconds: BigInt('0'), nanos: 0 }
}

export const Timestamp = {
encode(message: Timestamp, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.seconds !== BigInt("0")) {
writer.uint32(8).int64(message.seconds.toString());
if (message.seconds !== BigInt('0')) {
writer.uint32(8).int64(message.seconds.toString())
}
if (message.nanos !== 0) {
writer.uint32(16).int32(message.nanos);
writer.uint32(16).int32(message.nanos)
}
return writer;
return writer
},

decode(input: _m0.Reader | Uint8Array, length?: number): Timestamp {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseTimestamp();
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input)
let end = length === undefined ? reader.len : reader.pos + length
const message = createBaseTimestamp()
while (reader.pos < end) {
const tag = reader.uint32();
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
message.seconds = longToBigint(reader.int64() as Long);
break;
message.seconds = longToBigint(reader.int64() as Long)
break
case 2:
message.nanos = reader.int32();
break;
message.nanos = reader.int32()
break
default:
reader.skipType(tag & 7);
break;
reader.skipType(tag & 7)
break
}
}
return message;
return message
},

fromJSON(object: any): Timestamp {
return {
seconds: isSet(object.seconds) ? BigInt(object.seconds) : BigInt("0"),
seconds: isSet(object.seconds) ? BigInt(object.seconds) : BigInt('0'),
nanos: isSet(object.nanos) ? Number(object.nanos) : 0,
};
}
},

toJSON(message: Timestamp): unknown {
const obj: any = {};
message.seconds !== undefined && (obj.seconds = message.seconds.toString());
message.nanos !== undefined && (obj.nanos = Math.round(message.nanos));
return obj;
const obj: any = {}
message.seconds !== undefined && (obj.seconds = message.seconds.toString())
message.nanos !== undefined && (obj.nanos = Math.round(message.nanos))
return obj
},

fromPartial(object: DeepPartial<Timestamp>): Timestamp {
const message = createBaseTimestamp();
message.seconds = object.seconds ?? BigInt("0");
message.nanos = object.nanos ?? 0;
return message;
const message = createBaseTimestamp()
message.seconds = object.seconds ?? BigInt('0')
message.nanos = object.nanos ?? 0
return message
},
};
}

type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined

type DeepPartial<T> = T extends Builtin ? T
: T extends Array<infer U> ? Array<DeepPartial<U>> : T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>>
: T extends {} ? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
type DeepPartial<T> = T extends Builtin
? T
: T extends Array<infer U>
? Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U>
? ReadonlyArray<DeepPartial<U>>
: T extends {}
? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>

function longToBigint(long: Long) {
return BigInt(long.toString());
return BigInt(long.toString())
}

if (_m0.util.Long !== Long) {
_m0.util.Long = Long as any;
_m0.configure();
_m0.util.Long = Long as any
_m0.configure()
}

function isSet(value: any): boolean {
return value !== null && value !== undefined;
return value !== null && value !== undefined
}
7 changes: 7 additions & 0 deletions packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export * from './provider'
export * from './plugin'
export * from './state'
export * from './utils'
export * from './endpoints'
export * from './chain-config'
export * from './service'
File renamed without changes.
26 changes: 19 additions & 7 deletions packages/base/src/plugin.ts → packages/runtime/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { DataBinding, HandlerType, ProcessConfigResponse, ProcessResult } from '@sentio/protos'
import { DataBinding, HandlerType, ProcessConfigResponse, ProcessResult, StartRequest } from '@sentio/protos'

export interface Plugin {
export abstract class Plugin {
name: string
supportedHandlers: HandlerType[]
supportedHandlers: HandlerType[] = []

configure(config: ProcessConfigResponse): void
processBinding(request: DataBinding): Promise<ProcessResult>
configure(config: ProcessConfigResponse) {}
start(start: StartRequest) {}
stateDiff(config: ProcessConfigResponse): boolean {
return false
}
async processBinding(request: DataBinding): Promise<ProcessResult> {
return ProcessResult.fromPartial({})
}
}

export class PluginManager {
Expand All @@ -16,21 +22,27 @@ export class PluginManager {

register(plugin: Plugin) {
this.plugins.push(plugin)
// for (const plugin of this.plugins) {
for (const handlerType of plugin.supportedHandlers) {
const exsited = this.typesToPlugin.get(handlerType)
if (exsited) {
throw new Error(`Duplicate plugin for ${handlerType}: ${exsited.name} and ${plugin.name}`)
}
this.typesToPlugin.set(handlerType, plugin)
}
// }
}

configure(config: ProcessConfigResponse) {
this.plugins.forEach((plugin) => plugin.configure(config))
}

start(start: StartRequest) {
this.plugins.forEach((plugin) => plugin.start(start))
}

stateDiff(config: ProcessConfigResponse): boolean {
return this.plugins.some((plugin) => plugin.stateDiff(config))
}

processBinding(request: DataBinding): Promise<ProcessResult> {
const plugin = this.typesToPlugin.get(request.handlerType)
if (!plugin) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import { CompressionAlgorithms } from '@grpc/grpc-js/build/src/compression-algor
import { ProcessorDefinition } from '@sentio/protos'
import { ProcessorServiceImpl } from './service'
import { setProvider } from './provider'
import { State } from '@sentio/base'
import { load } from './loader'
import { State } from './state'
import { Endpoints } from './endpoints'

import { load } from './loader'
import { FullProcessorServiceImpl } from './full-service'

State.reset()
global.ENDPOINTS = new Endpoints()
Endpoints.reset()

const optionDefinitions = [
{ name: 'target', type: String, defaultOption: true },
Expand Down Expand Up @@ -72,8 +74,8 @@ const fullPath = path.resolve(options['chains-config'])
const chainsConfig = fs.readJsonSync(fullPath)

setProvider(chainsConfig, options.concurrency, options['use-chainserver'])
globalThis.ENDPOINTS.chainQueryAPI = options['chainquery-server']
globalThis.ENDPOINTS.priceFeedAPI = options['pricefeed-server']
Endpoints.INSTANCE.chainQueryAPI = options['chainquery-server']
Endpoints.INSTANCE.priceFeedAPI = options['pricefeed-server']

if (options.debug) {
console.log('Starting Server', options)
Expand All @@ -85,7 +87,9 @@ const server = createServer({
'grpc.default_compression_algorithm': CompressionAlgorithms.gzip,
})

const service = new ProcessorServiceImpl(() => load(options.target), server.shutdown)
const baseService = new ProcessorServiceImpl(() => load(options.target), server.shutdown)
const service = new FullProcessorServiceImpl(baseService)

server.add(ProcessorDefinition, service)

server.listen('0.0.0.0:' + options.port)
Expand Down
Loading

0 comments on commit 123d466

Please sign in to comment.