Skip to content

Commit

Permalink
chore: Something is broken (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonas-notcat committed Dec 11, 2019
1 parent aea2418 commit f23263b
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 138 deletions.
24 changes: 6 additions & 18 deletions examples/expressjs-ethr/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const defaultPath = __dirname + '/.daf'
const identityStoreFilename = process.env.DAF_IDENTITY_STORE ?? defaultPath + '/identity-store.json'
const dataStoreFilename = process.env.DAF_DATA_STORE ?? defaultPath + '/data-store.sqlite3'
const infuraProjectId = process.env.DAF_INFURA_ID ?? '5ffc47f65c4042ce847ef66a3fa70d4c'
if (process.env.DAF_TG_URI) TG.ServiceController.defaultUri = process.env.DAF_TG_URI
if (process.env.DAF_TG_WSURI) TG.ServiceController.defaultWsUri = process.env.DAF_TG_WSURI
TG.ServiceController.webSocketImpl = ws

if (!process.env.DAF_IDENTITY_STORE || process.env.DAF_DATA_STORE || process.env.DAF_ENCRYPTION_STORE) {
const fs = require('fs')
Expand All @@ -41,6 +44,7 @@ if (process.env.DAF_UNIVERSAL_RESOLVER_URL) {
}

const identityControllers = [new EthrDidFsController(identityStoreFilename)]
const serviceControllers = [TG.ServiceController]

const messageValidator = new DBG.MessageValidator()
messageValidator
Expand All @@ -53,29 +57,13 @@ messageValidator
const actionHandler = new DBG.ActionHandler()
actionHandler
.setNext(new DIDComm.ActionHandler())
.setNext(
new TG.ActionHandler({
uri: process.env.DAF_TG_URI,
}),
)
.setNext(new TG.ActionHandler())
.setNext(new W3c.ActionHandler())
.setNext(new SD.ActionHandler())

const serviceControllersWithConfig = [
// { controller: Rnd.RandomMessageService, config: {}},
{
controller: TG.TrustGraphServiceController,
config: {
uri: process.env.DAF_TG_URI,
wsUri: process.env.DAF_TG_WSURI,
webSocketImpl: ws,
},
},
]

export const core = new Daf.Core({
identityControllers,
serviceControllersWithConfig,
serviceControllers,
didResolver,
messageValidator,
actionHandler,
Expand Down
5 changes: 3 additions & 2 deletions examples/expressjs-ethr/src/web-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ async function main() {
server.listen(port, async () => {
console.log(`Server running at http://localhost:${port}/`)

await core.startServices()
await core.syncServices(await dataStore.latestMessageTimestamps())
await core.setupServices()
await core.listen()
await core.getMessagesSince(await dataStore.latestMessageTimestamps())
})
}

Expand Down
10 changes: 5 additions & 5 deletions examples/expressjs-ethr/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,8 @@ daf-core@../../packages/daf-core, daf-core@^0.10.0:
debug "^4.1.1"
events "^3.0.0"

daf-data-store@../../packages/daf-data-store, daf-data-store@^0.10.0:
version "0.10.0"
daf-data-store@../../packages/daf-data-store, daf-data-store@^0.10.1:
version "0.10.1"
dependencies:
blakejs "^1.1.0"
daf-core "^0.10.0"
Expand Down Expand Up @@ -631,9 +631,9 @@ daf-ethr-did-fs@../../packages/daf-ethr-did-fs:
ethr-did "^1.1.0"

daf-node-sqlite3@../../packages/daf-node-sqlite3:
version "0.10.0"
version "0.10.1"
dependencies:
daf-data-store "^0.10.0"
daf-data-store "^0.10.1"
debug "^4.1.1"
sqlite3 "^4.1.0"

Expand Down Expand Up @@ -668,7 +668,7 @@ daf-sodium-fs@../../packages/daf-sodium-fs:
libsodium-wrappers "^0.7.6"

daf-trust-graph@../../packages/daf-trust-graph:
version "0.10.0"
version "0.10.2"
dependencies:
apollo-cache-inmemory "^1.6.3"
apollo-client "^2.6.4"
Expand Down
6 changes: 3 additions & 3 deletions packages/daf-cli/src/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ export const listen = async (pollSeconds?: number) => {
console.log('New message type:', msg.type)
})

await core.startServices()
await core.syncServices(await dataStore.latestMessageTimestamps())
await core.setupServices()
await core.getMessagesSince(await dataStore.latestMessageTimestamps())

if (pollSeconds) {
setInterval(async () => {
await core.syncServices(await dataStore.latestMessageTimestamps())
await core.getMessagesSince(await dataStore.latestMessageTimestamps())
}, pollSeconds * 1000)
}
}
24 changes: 7 additions & 17 deletions packages/daf-cli/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ if (process.env.DAF_UNIVERSAL_RESOLVER_URL) {
})
}

if (process.env.DAF_TG_URI) TG.ServiceController.defaultUri = process.env.DAF_TG_URI
if (process.env.DAF_TG_WSURI) TG.ServiceController.defaultWsUri = process.env.DAF_TG_WSURI
TG.ServiceController.webSocketImpl = ws

const identityControllers = [new EthrDidFsController(identityStoreFilename)]
const serviceControllers = [TG.ServiceController]

const messageValidator = new DBG.MessageValidator()
messageValidator
Expand All @@ -56,28 +61,13 @@ messageValidator
const actionHandler = new DBG.ActionHandler()
actionHandler
.setNext(new DIDComm.ActionHandler())
.setNext(
new TG.ActionHandler({
uri: process.env.DAF_TG_URI,
}),
)
.setNext(new TG.ActionHandler())
.setNext(new W3c.ActionHandler())
.setNext(new SD.ActionHandler())

const serviceControllersWithConfig = [
{
controller: TG.TrustGraphServiceController,
config: {
uri: process.env.DAF_TG_URI,
wsUri: process.env.DAF_TG_WSURI,
webSocketImpl: ws,
},
},
]

export const core = new Daf.Core({
identityControllers,
serviceControllersWithConfig,
serviceControllers,
didResolver,
messageValidator,
actionHandler,
Expand Down
39 changes: 29 additions & 10 deletions packages/daf-core/src/core.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { EventEmitter } from 'events'
import { DIDDocument } from 'did-resolver'
import { IdentityManager, IdentityController } from './identity/identity-manager'
import { ServiceManager, ServiceControllerWithConfig, LastMessageTimestamp } from './service-manager'
import { ServiceManager, LastMessageTimestampForInstance, ServiceEventTypes } from './service/service-manager'
import { ServiceControllerDerived } from './service/abstract-service-controller'
import { MessageValidator } from './message/message-validator'
import { ActionHandler } from './action/action-handler'
import { Action } from './types'
import { EncryptionKeyManager } from './encryption-manager'
import { Message } from './message/message'

import Debug from 'debug'
const debug = Debug('core')
const debug = Debug('daf:core')

export const EventTypes = {
validatedMessage: 'validatedMessage',
Expand All @@ -23,7 +24,7 @@ export interface Resolver {
interface Config {
didResolver: Resolver
identityControllers: IdentityController[]
serviceControllersWithConfig: ServiceControllerWithConfig[]
serviceControllers: ServiceControllerDerived[]
messageValidator: MessageValidator
actionHandler?: ActionHandler
encryptionKeyManager?: EncryptionKeyManager
Expand All @@ -49,8 +50,7 @@ export class Core extends EventEmitter {
this.didResolver = config.didResolver

this.serviceManager = new ServiceManager({
serviceControllersWithConfig: config.serviceControllersWithConfig,
validateMessage: this.validateMessage.bind(this),
controllers: config.serviceControllers,
didResolver: this.didResolver,
})

Expand All @@ -59,14 +59,33 @@ export class Core extends EventEmitter {
this.actionHandler = config.actionHandler
}

async startServices() {
async setupServices() {
const issuers = await this.identityManager.listIssuers()
await this.serviceManager.configureServices(issuers)
await this.serviceManager.initServices()
await this.serviceManager.setupServices(issuers)
}

async syncServices(lastMessageTimestamps: LastMessageTimestamp[]) {
await this.serviceManager.syncServices(lastMessageTimestamps)
async listen() {
debug('Listening...')
this.serviceManager.on(ServiceEventTypes.NewMessages, this.validateMessages)
this.serviceManager.listen()
}

async getMessagesSince(ts: LastMessageTimestampForInstance[]): Promise<Message[]> {
const rawMessages = await this.serviceManager.getMessagesSince(ts)
return this.validateMessages(rawMessages)
}

public async validateMessages(messages: Message[]): Promise<Message[]> {
debug('validateMessages %O', messages)
const result: Message[] = []
for (const message of messages) {
try {
const validMessage = await this.validateMessage(message)
result.push(validMessage)
} catch (e) {}
}

return result
}

public async validateMessage(message: Message): Promise<Message> {
Expand Down
8 changes: 2 additions & 6 deletions packages/daf-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ export { EncryptionKeyManager, KeyPair } from './encryption-manager'
export { IdentityController, IdentityManager, Issuer } from './identity/identity-manager'
export { AbstractMessageValidator } from './message/message-validator'
export { Message } from './message/message'
export {
ServiceController,
ServiceControllerOptions,
ServiceControllerWithConfig,
ServiceInstanceId,
} from './service-manager'
export { ServiceManager, LastMessageTimestampForInstance, ServiceEventTypes } from './service/service-manager'
export { AbstractServiceController } from './service/abstract-service-controller'
import * as Types from './types'
import { baseTypeDefs } from './graphql/graphql-base-type-defs'
import * as GqlCore from './graphql/graphql-core'
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { AbstractServiceController, ServiceEventTypes } from '../abstract-service-controller'
import { AbstractServiceController } from '../abstract-service-controller'
import { ServiceEventTypes } from '../service-manager'
import { Issuer } from '../../identity/identity-manager'
import { Resolver } from '../../core'
import { Message } from '../../message/message'

const msg1 = new Message({ raw: 'test1', meta: { type: 'test' } })
const msg2 = new Message({ raw: 'test2', meta: { type: 'test' } })
const msg1 = new Message({ raw: 'test1', meta: { type: 'mockService', id: 'https://from-did-doc' } })
const msg2 = new Message({ raw: 'test2', meta: { type: 'mockService', id: 'https://from-did-doc' } })

export class MockServiceController extends AbstractServiceController {
static defaultServiceEndpoint: string = 'https://default.host/path'
readonly type = 'mockService'
private endPointUrl: string

public ready: Promise<boolean>

constructor(issuer: Issuer, didResolver: Resolver) {
super(issuer, didResolver)
this.endPointUrl = 'https://from-did-doc'
this.ready = new Promise((resolve, reject) => {
// do some async stuff
resolve(true)
})
}

instanceId() {
Expand Down Expand Up @@ -50,6 +57,12 @@ it('should be possible to set configuration as a static property', async () => {
expect(MockServiceController.defaultServiceEndpoint).toEqual('https://custom.host/path')
})

it('resolves ready promise after finishing async logic in constructor', async () => {
const controller = new MockServiceController(mockIssuer, mockResolver)
const ready = await controller.ready
expect(ready).toEqual(true)
})

it('returns and emits an event with the same message array ', async () => {
const controller = new MockServiceController(mockIssuer, mockResolver)
spyOn(controller, 'emit')
Expand Down
5 changes: 1 addition & 4 deletions packages/daf-core/src/service/abstract-service-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ import { Issuer } from '../identity/identity-manager'
import { Resolver } from '../core'
import { Message } from '../message/message'

export enum ServiceEventTypes {
NewMessages = 'NewMessages',
}

export abstract class AbstractServiceController extends EventEmitter {
constructor(readonly issuer: Issuer, readonly didResolver: Resolver) {
super()
}
abstract ready: Promise<boolean> // you cannot have an async constructor
abstract instanceId(): { did: string; type: string; id: string }
abstract getMessagesSince(timestamp: number): Promise<Message[]>
abstract listen(): void
Expand Down
20 changes: 12 additions & 8 deletions packages/daf-core/src/service/service-manager.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import { EventEmitter } from 'events'
import { Resolver } from '../core'
import {
AbstractServiceController,
ServiceControllerDerived,
ServiceEventTypes,
} from './abstract-service-controller'
import { AbstractServiceController, ServiceControllerDerived } from './abstract-service-controller'
import { Issuer } from '../identity/identity-manager'
import { Message } from '../message/message'
import Debug from 'debug'
const debug = Debug('daf:service-manager')

interface Options {
controllers: ServiceControllerDerived[]
didResolver: Resolver
export enum ServiceEventTypes {
NewMessages = 'NewMessages',
}

export interface LastMessageTimestampForInstance {
Expand All @@ -20,6 +17,11 @@ export interface LastMessageTimestampForInstance {
id: string
}

interface Options {
controllers: ServiceControllerDerived[]
didResolver: Resolver
}

export class ServiceManager extends EventEmitter {
private controllerInstances: AbstractServiceController[]
private controllers: ServiceControllerDerived[]
Expand All @@ -36,13 +38,15 @@ export class ServiceManager extends EventEmitter {
for (const issuer of issuers) {
for (const controller of this.controllers) {
const instance = new controller(issuer, this.didResolver)
await instance.ready
instance.on(ServiceEventTypes.NewMessages, this.onNewMessages)
this.controllerInstances.push(instance)
}
}
}

private onNewMessages(messages: Message[]) {
debug('onNewMessage kakakaka %O', messages)
this.emit(ServiceEventTypes.NewMessages, messages)
}

Expand Down
4 changes: 2 additions & 2 deletions packages/daf-data-store/src/data-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ export class DataStore {
}

async latestMessageTimestamps() {
let query = `SELECT * FROM ( SELECT m.id, m."timestamp", m.receiver AS did, md. "type" AS sourceType, md.id AS sourceId FROM messages AS m
LEFT JOIN messages_meta_data AS md ON m.id = md.message_id order by m.timestamp desc) GROUP BY did, sourceType`
let query = `SELECT * FROM ( SELECT m."timestamp", m.receiver AS did, md. "type" , md.id FROM messages AS m
LEFT JOIN messages_meta_data AS md ON m.id = md.message_id order by m.timestamp desc) GROUP BY did, "type", id`

return await this.db.rows(query, [])
}
Expand Down
Loading

0 comments on commit f23263b

Please sign in to comment.