Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configuration for filter nested objects #310

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ node_modules
dist
dist-ts
.vscode
.idea
32 changes: 17 additions & 15 deletions lib/ddb/DDB.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { DynamoDB } from 'aws-sdk'
import { DynamoDBDocument, PutCommandInput, QueryCommandInput } from '@aws-sdk/lib-dynamodb'
import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { LoggerFunction, DDBType } from '../types'

export interface DDBClient<T extends DDBType, TKey> {
get: (Key: TKey) => Promise<T | null>
put: (obj: T, putOptions?: Partial<DynamoDB.DocumentClient.PutItemInput>) => Promise<T>
put: (obj: T, putOptions?: Partial<PutCommandInput>) => Promise<T>
update: (Key: TKey, obj: Partial<T>) => Promise<T>
delete: (Key: TKey) => Promise<T>
query: (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => AsyncGenerator<T, void, undefined>
query: (options: Omit<QueryCommandInput, 'TableName' | 'Select'>) => AsyncGenerator<T, void, undefined>
}

export const DDB = <T extends DDBType, TKey>({
Expand All @@ -18,15 +19,15 @@ export const DDB = <T extends DDBType, TKey>({
tableName: string
log: LoggerFunction
}): DDBClient<T, TKey> => {
const documentClient = new DynamoDB.DocumentClient({ service: dynamodb })
const documentClient = DynamoDBDocument.from(dynamodb)

const get = async (Key: TKey): Promise<null | T> => {
log('get', { tableName: tableName, Key })
try {
const { Item } = await documentClient.get({
TableName: tableName,
Key,
}).promise()
})
log('get:result', { Item })
return (Item as T) ?? null
} catch (e) {
Expand All @@ -35,15 +36,15 @@ export const DDB = <T extends DDBType, TKey>({
}
}

const put = async (Item: T, putOptions?: Partial<DynamoDB.DocumentClient.PutItemInput>): Promise<T> => {
const put = async (Item: T, putOptions?: Partial<PutCommandInput>): Promise<T> => {
log('put', { tableName: tableName, Item })
try {
const { Attributes } = await documentClient.put({
TableName: tableName,
Item,
ReturnValues: 'ALL_OLD',
...putOptions,
}).promise()
})
return Attributes as T
} catch (e) {
log('put:error', e)
Expand All @@ -55,15 +56,16 @@ export const DDB = <T extends DDBType, TKey>({
log('update', { tableName: tableName, Key, obj })
try {
const AttributeUpdates = Object.entries(obj)
.map(([key, Value]) => ({ [key]: { Value, Action: 'PUT' } }))
.map(([key, Value]) => ({ [key]: { Value } }))
.reduce((memo, val) => ({ ...memo, ...val }))

const { Attributes } = await documentClient.update({
TableName: tableName,
Key,
AttributeUpdates,
ReturnValues: 'ALL_NEW',
}).promise()
AttributeUpdates,
})

return Attributes as T
} catch (e) {
log('update:error', e)
Expand All @@ -78,22 +80,22 @@ export const DDB = <T extends DDBType, TKey>({
TableName: tableName,
Key,
ReturnValues: 'ALL_OLD',
}).promise()
})
return Attributes as T
} catch (e) {
log('delete:error', e)
throw e
}
}

const queryOnce = async (options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) => {
const queryOnce = async (options: Omit<QueryCommandInput, 'TableName' | 'Select'>) => {
log('queryOnce', { tableName: tableName, options })
try {
const response = await documentClient.query({
TableName: tableName,
Select: 'ALL_ATTRIBUTES',
...options,
}).promise()
})

const { Items, LastEvaluatedKey, Count } = response
return {
Expand All @@ -107,7 +109,7 @@ export const DDB = <T extends DDBType, TKey>({
}
}

async function* query(options: Omit<DynamoDB.DocumentClient.QueryInput, 'TableName' | 'Select'>) {
async function* query(options: Omit<QueryCommandInput, 'TableName' | 'Select'>) {
log('query', { tableName: tableName, options })
try {
const results = await queryOnce(options)
Expand Down
26 changes: 0 additions & 26 deletions lib/index-test.ts

This file was deleted.

42 changes: 21 additions & 21 deletions lib/messages/connection_init.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
import { StepFunctions } from 'aws-sdk'
import { ConnectionInitMessage, MessageType } from 'graphql-ws'
import { StateFunctionInput, MessageHandler } from '../types'
import { postToConnection } from '../utils/postToConnection'
import { deleteConnection } from '../utils/deleteConnection'
import { defaultTTL } from '../utils/defaultTTL'
import { ConnectionInitMessage, MessageType } from 'graphql-ws';
import { MessageHandler } from '../types';
import { postToConnection } from '../utils/postToConnection';
import { deleteConnection } from '../utils/deleteConnection';
import { defaultTTL } from '../utils/defaultTTL';

/** Handler function for 'connection_init' message. */
export const connection_init: MessageHandler<ConnectionInitMessage> =
async ({ server, event, message }) => {
try {
const payload = await server.onConnectionInit?.({ event, message }) ?? message.payload ?? {}
const payload = (await server.onConnectionInit?.({ event, message })) ?? message.payload ?? {}

if (server.pingpong) {
await new StepFunctions()
.startExecution({
stateMachineArn: server.pingpong.machine,
name: event.requestContext.connectionId,
input: JSON.stringify({
connectionId: event.requestContext.connectionId,
domainName: event.requestContext.domainName,
stage: event.requestContext.stage,
state: 'PING',
choice: 'WAIT',
seconds: server.pingpong.delay,
} as StateFunctionInput),
})
.promise()
console.error('Missing implementation for pingpong');
// await new StepFunctions()
// .startExecution({
// stateMachineArn: server.pingpong.machine,
// name: event.requestContext.connectionId,
// input: JSON.stringify({
// connectionId: event.requestContext.connectionId,
// domainName: event.requestContext.domainName,
// stage: event.requestContext.stage,
// state: 'PING',
// choice: 'WAIT',
// seconds: server.pingpong.delay,
// } as StateFunctionInput),
// })
// .promise()
}

// Write to persistence
Expand Down
7 changes: 4 additions & 3 deletions lib/pubsub/getFilteredSubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { collect } from 'streaming-iterables'
import { ServerClosure, Subscription } from '../types'

export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerClosure, 'gateway'>, event: { topic: string, payload?: Record<string, any> } }): Promise<Subscription[]> => {
export const getFilteredSubs = async ({ server, event, filterNested }: { server: Omit<ServerClosure, 'gateway'>, event: { topic: string, payload?: Record<string, any> }, filterNested?: boolean }): Promise<Subscription[]> => {
if (!event.payload || Object.keys(event.payload).length === 0) {
server.log('getFilteredSubs', { event })

Expand All @@ -15,7 +15,7 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl

return await collect(iterator)
}
const flattenPayload = collapseKeys(event.payload)
const flattenPayload = collapseKeys(event.payload, filterNested)

const filterExpressions: string[] = []
const expressionAttributeValues: { [key: string]: string | number | boolean } = {}
Expand Down Expand Up @@ -51,6 +51,7 @@ export const getFilteredSubs = async ({ server, event }: { server: Omit<ServerCl

export const collapseKeys = (
obj: Record<string, any>,
filterNested = true,
): Record<string, number | string | boolean> => {
const record = {}
for (const [k1, v1] of Object.entries(obj)) {
Expand All @@ -59,7 +60,7 @@ export const collapseKeys = (
continue
}

if (v1 && typeof v1 === 'object') {
if (filterNested && v1 && typeof v1 === 'object') {
const next = {}

for (const [k2, v2] of Object.entries(v1)) {
Expand Down
4 changes: 2 additions & 2 deletions lib/pubsub/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { postToConnection } from '../utils/postToConnection'
import { buildContext } from '../utils/buildContext'
import { getFilteredSubs } from './getFilteredSubs'

export const publish = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['publish'] => async event => {
export const publish = (serverPromise: Promise<ServerClosure> | ServerClosure): SubscriptionServer['publish'] => async (event: { topic: string, payload?: Record<string, any> }, filterNested = true) => {
const server = await serverPromise
server.log('pubsub:publish', { event })
const subscriptions = await getFilteredSubs({ server, event })
const subscriptions = await getFilteredSubs({ server, event, filterNested })
server.log('pubsub:publish', { subscriptions: subscriptions.map(({ connectionId, filter, subscription }) => ({ connectionId, filter, subscription }) ) })

const iters = subscriptions.map(async (sub) => {
Expand Down
67 changes: 0 additions & 67 deletions lib/test/mockServer.ts

This file was deleted.

16 changes: 7 additions & 9 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import { ConnectionInitMessage, PingMessage, PongMessage } from 'graphql-ws'
import { APIGatewayEventRequestContext, APIGatewayProxyEvent } from 'aws-lambda'
import { GraphQLError, GraphQLResolveInfo, GraphQLSchema } from 'graphql'
import { DynamoDB } from 'aws-sdk'
import { DynamoDB } from '@aws-sdk/client-dynamodb'
import { DDBClient } from './ddb/DDB'
import { ApiGatewayManagementApi } from '@aws-sdk/client-apigatewaymanagementapi'

export interface ServerArgs {
/**
Expand Down Expand Up @@ -84,7 +85,7 @@ export type ServerClosure = {
connection: DDBClient<Connection, {id: string }>
}
log: LoggerFunction
apiGatewayManagementApi?: ApiGatewayManagementApiSubset
apiGatewayManagementApi?: ApiGatewayManagementApi
pingpong?: {
machine: string
delay: number
Expand All @@ -105,8 +106,10 @@ export interface SubscriptionServer {
* Publish an event to all relevant subscriptions. This might take some time depending on how many subscriptions there are.
*
* The payload if present will be used to match against any filters the subscriptions might have.
*
* filterNested - If to applay filter nested objects, default: true.
*/
publish: (event: { topic: string, payload: Record<string, any>}) => Promise<void>
publish: (event: { topic: string, payload: Record<string, any>}, filterNested?: boolean) => Promise<void>
/**
* Send a complete message and end all relevant subscriptions. This might take some time depending on how many subscriptions there are.
*
Expand Down Expand Up @@ -191,12 +194,7 @@ export interface PubSubEvent {

export type MessageHandler<T> = (arg: { server: ServerClosure, event: APIGatewayWebSocketEvent, message: T }) => Promise<void>

/*
Matches the ApiGatewayManagementApi class from aws-sdk but only provides the methods we use
*/
export interface ApiGatewayManagementApiSubset {
postToConnection(input: { ConnectionId: string, Data: string }): { promise: () => Promise<any> }
deleteConnection(input: { ConnectionId: string }): { promise: () => Promise<any> }
export interface ApiGatewayManagementApiSubset extends ApiGatewayManagementApi {
}


Expand Down
27 changes: 14 additions & 13 deletions lib/utils/deleteConnection.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { ApiGatewayManagementApi } from 'aws-sdk'
import { ApiGatewayManagementApi } from '@aws-sdk/client-apigatewaymanagementapi'
import { ServerClosure } from '../types'

export const deleteConnection = (server: ServerClosure) =>
async ({
connectionId: ConnectionId,
domainName,
stage,
}:{
export const deleteConnection =
(server: ServerClosure) =>
async ({
connectionId: ConnectionId,
domainName,
stage,
}: {
connectionId: string
domainName: string
stage: string
}): Promise<void> => {
server.log('deleteConnection', { connectionId: ConnectionId })
const api = server.apiGatewayManagementApi ??
server.log('deleteConnection', { connectionId: ConnectionId })
const api =
server.apiGatewayManagementApi ??
new ApiGatewayManagementApi({
apiVersion: 'latest',
endpoint: `${domainName}/${stage}`,
endpoint: `https://${domainName}/${stage}`,
})

await api.deleteConnection({ ConnectionId }).promise()
}
await api.deleteConnection({ ConnectionId })
}
Loading