Skip to content

Commit

Permalink
feat(cu): persist state buffer as compressed document attachment in P…
Browse files Browse the repository at this point in the history
…ouchDB #132
  • Loading branch information
TillaTheHun0 committed Nov 9, 2023
1 parent 5d51d02 commit c7aba5c
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 42 deletions.
14 changes: 7 additions & 7 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"test": "node --test ./src"
},
"dependencies": {
"@permaweb/ao-loader": "~0.0.4",
"@permaweb/ao-loader": "0.0.7",
"cors": "^2.8.5",
"debug": "^4.3.4",
"express": "^4.18.2",
Expand Down
121 changes: 97 additions & 24 deletions servers/cu/src/domain/client/pouchdb.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { deflate, inflate } from 'node:zlib'
import { promisify } from 'node:util'

import { fromPromise, of, Rejected, Resolved } from 'hyper-async'
import { always, applySpec, head, map, prop } from 'ramda'
import { always, applySpec, head, lensPath, map, omit, pipe, prop, set } from 'ramda'
import { z } from 'zod'

import PouchDb from 'pouchdb'
Expand All @@ -8,6 +11,9 @@ import LevelDb from 'pouchdb-adapter-leveldb'

import { evaluationSchema, processSchema } from '../model.js'

const deflateP = promisify(deflate)
const inflateP = promisify(inflate)

/**
* An implementation of the db client using pouchDB
*/
Expand All @@ -29,15 +35,6 @@ const processDocSchema = z.object({
type: z.literal('process')
})

const evaluationDocSchema = z.object({
_id: z.string().min(1),
sortKey: evaluationSchema.shape.sortKey,
parent: z.string().min(1),
evaluatedAt: evaluationSchema.shape.evaluatedAt,
output: evaluationSchema.shape.output,
type: z.literal('evaluation')
})

function createEvaluationId ({ processId, sortKey }) {
return [processId, sortKey].join(',')
}
Expand Down Expand Up @@ -121,6 +118,17 @@ export function findLatestEvaluationWith ({ pouchDb }) {
return selector
}

const foundEvaluationDocSchema = z.object({
_id: z.string().min(1),
sortKey: evaluationSchema.shape.sortKey,
parent: z.string().min(1),
evaluatedAt: evaluationSchema.shape.evaluatedAt,
output: evaluationSchema.shape.output,
type: z.literal('evaluation')
})

const bufferLens = lensPath(['output', 'buffer'])

return ({ processId, to }) => {
return of({ processId, to })
.map(createSelector)
Expand All @@ -142,11 +150,22 @@ export function findLatestEvaluationWith ({ pouchDb }) {
}))
.map(head)
.chain((doc) => doc ? Resolved(doc) : Rejected(undefined))
/**
* Also retrieve the state buffer, persisted as an attachment
* and set it on the output.buffer field to match the expected output shape
*/
.chain(fromPromise(async (doc) => {
const buffer = await pouchDb.getAttachment(doc._id, 'buffer.txt')
/**
* Make sure to decompress the state buffer
*/
return set(bufferLens, await inflateP(buffer), doc)
}))
/**
* Ensure the input matches the expected
* shape
*/
.map(evaluationDocSchema.parse)
.map(foundEvaluationDocSchema.parse)
.map(applySpec({
sortKey: prop('sortKey'),
processId: prop('parent'),
Expand All @@ -160,31 +179,85 @@ export function findLatestEvaluationWith ({ pouchDb }) {
export function saveEvaluationWith ({ pouchDb, logger: _logger }) {
const logger = _logger.child('pouchDb:saveEvaluation')

const savedEvaluationDocSchema = z.object({
_id: z.string().min(1),
sortKey: evaluationSchema.shape.sortKey,
parent: z.string().min(1),
evaluatedAt: evaluationSchema.shape.evaluatedAt,
/**
* Omit buffer from the document schema (see _attachments below)
*/
output: evaluationSchema.shape.output.omit({ buffer: true }),
type: z.literal('evaluation'),
/**
* Since Bibo, the state of a process is a buffer, so we will store it as
* a document attachment in PouchDb, then reassemable the evaluation shape
* when it is used as a start point for eval (see findEvaluation)
*
* See https://pouchdb.com/api.html#save_attachment
*/
_attachments: z.object({
'buffer.txt': z.object({
content_type: z.literal('text/plain'),
data: z.any()
})
})
})

return (evaluation) => {
return of(evaluation)
.map(applySpec({
.chain(fromPromise(async (evaluation) =>
applySpec({
/**
* The processId concatenated with the sortKey
* is used as the _id for an evaluation
*
* This makes it easier to query using a range query against the
* primary index
*/
_id: (evaluation) =>
createEvaluationId({
processId: evaluation.processId,
sortKey: evaluation.sortKey
}),
sortKey: prop('sortKey'),
parent: prop('processId'),
output: prop('output'),
evaluatedAt: prop('evaluatedAt'),
type: always('evaluation')
}))
_id: (evaluation) =>
createEvaluationId({
processId: evaluation.processId,
sortKey: evaluation.sortKey
}),
sortKey: prop('sortKey'),
parent: prop('processId'),
output: pipe(
prop('output'),
/**
* Make sure to omit the buffer from the output field
* on the document. We will instead persist the state buffer
* as an attachment (see below)
*/
omit(['buffer'])
),
evaluatedAt: prop('evaluatedAt'),
type: always('evaluation'),
/**
* Store the state produced from the evaluation
* as an attachment. This allows for efficient storage
* and retrieval of the Buffer
*
* See https://pouchdb.com/api.html#save_attachment
*/
_attachments: always({
'buffer.txt': {
content_type: 'text/plain',
/**
* zlib compress the buffer before persisting
*
* In testing, this results in orders of magnitude
* smaller buffer and smaller persistence times
*/
data: await deflateP(evaluation.output.buffer)
}
})
})(evaluation)
))
/**
* Ensure the expected shape before writing to the db
*/
.map(evaluationDocSchema.parse)
.map(savedEvaluationDocSchema.parse)
.chain((doc) =>
of(doc)
.chain(fromPromise((doc) => pouchDb.get(doc._id)))
Expand Down
54 changes: 44 additions & 10 deletions servers/cu/src/domain/client/pouchdb.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* eslint-disable no-throw-literal */
import { describe, test } from 'node:test'
import assert from 'node:assert'
import { deflate } from 'node:zlib'
import { promisify } from 'node:util'

import { findEvaluationsSchema, findLatestEvaluationSchema, findProcessSchema, saveEvaluationSchema, saveProcessSchema } from '../dal.js'
import {
Expand All @@ -14,6 +16,7 @@ import {
import { createLogger } from '../logger.js'

const logger = createLogger('ao-cu:readState')
const deflateP = promisify(deflate)

describe('pouchdb', () => {
describe('findProcess', () => {
Expand Down Expand Up @@ -135,6 +138,8 @@ describe('pouchdb', () => {
describe('findLatestEvaluation', () => {
test('return the lastest evaluation', async () => {
const evaluatedAt = new Date().toISOString()
const buffer = Buffer.from('Hello World', 'utf-8')

const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
Expand All @@ -155,12 +160,18 @@ describe('pouchdb', () => {
_id: 'process-123,sortkey-890',
sortKey: 'sortkey-890',
parent: 'process-123',
output: { state: { foo: 'bar' } },
output: { messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
}
]
}
},
getAttachment: async (_id, name) => {
assert.equal(_id, 'process-123,sortkey-890')
assert.equal(name, 'buffer.txt')
// impl will inflate this buffer
return deflateP(buffer)
}
},
logger
Expand All @@ -173,12 +184,14 @@ describe('pouchdb', () => {

assert.equal(res.sortKey, 'sortkey-890')
assert.equal(res.processId, 'process-123')
assert.deepStrictEqual(res.output, { state: { foo: 'bar' } })
assert.deepStrictEqual(res.output, { buffer, messages: [{ foo: 'bar' }] })
assert.equal(res.evaluatedAt.toISOString(), evaluatedAt)
})

test("without 'to', return the lastest interaction using collation sequence max char", async () => {
const evaluatedAt = new Date().toISOString()
const buffer = Buffer.from('Hello World', 'utf-8')

const findLatestEvaluation = findLatestEvaluationSchema.implement(
findLatestEvaluationWith({
pouchDb: {
Expand All @@ -199,12 +212,18 @@ describe('pouchdb', () => {
_id: 'process-123,sortkey-890',
sortKey: 'sortkey-890',
parent: 'process-123',
output: { state: { foo: 'bar' } },
output: { messages: [{ foo: 'bar' }] },
evaluatedAt,
type: 'evaluation'
}
]
}
},
getAttachment: async (_id, name) => {
assert.equal(_id, 'process-123,sortkey-890')
assert.equal(name, 'buffer.txt')
// impl will inflate this buffer
return deflateP(buffer)
}
},
logger
Expand All @@ -216,7 +235,7 @@ describe('pouchdb', () => {

assert.equal(res.sortKey, 'sortkey-890')
assert.equal(res.processId, 'process-123')
assert.deepStrictEqual(res.output, { state: { foo: 'bar' } })
assert.deepStrictEqual(res.output, { buffer, messages: [{ foo: 'bar' }] })
assert.equal(res.evaluatedAt.toISOString(), evaluatedAt)
})

Expand All @@ -239,17 +258,32 @@ describe('pouchdb', () => {
})

describe('saveEvaluation', () => {
test('save the evaluation to pouchdb', async () => {
test('save the evaluation to pouchdb with the buffer as an attachment', async () => {
const evaluatedAt = new Date().toISOString()
const buffer = Buffer.from('Hello World', 'utf-8')

const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
pouchDb: {
get: async () => undefined,
put: (doc) => {
put: async (doc) => {
assert.equal(doc._id, 'process-123,sortkey-890')
assert.equal(doc.sortKey, 'sortkey-890')
assert.equal(doc.parent, 'process-123')
assert.deepStrictEqual(doc.output, { state: { foo: 'bar' } })
// buffer is omitted from output and moved to _attachments
assert.deepStrictEqual(doc.output, { messages: [{ foo: 'bar' }] })
assert.deepStrictEqual(doc._attachments, {
'buffer.txt': {
content_type: 'text/plain',
/**
* zlib compress the buffer before persisting
*
* In testing, this results in orders of magnitude
* smaller buffer and smaller persistence times
*/
data: await deflateP(buffer)
}
})
assert.equal(doc.evaluatedAt.toISOString(), evaluatedAt)
return Promise.resolve(true)
}
Expand All @@ -261,7 +295,7 @@ describe('pouchdb', () => {
await saveEvaluation({
sortKey: 'sortkey-890',
processId: 'process-123',
output: { state: { foo: 'bar' } },
output: { buffer, messages: [{ foo: 'bar' }] },
evaluatedAt
})
})
Expand All @@ -274,7 +308,7 @@ describe('pouchdb', () => {
_id: 'process-123,sortkey-890',
sortKey: 'sortkey-890',
parent: 'process-123',
output: { state: { foo: 'bar' } },
output: { buffer: Buffer.from('Hello World', 'utf-8'), messages: [{ foo: 'bar' }] },
evaluatedAt: new Date()
}),
put: assert.fail
Expand All @@ -286,7 +320,7 @@ describe('pouchdb', () => {
await saveEvaluation({
sortKey: 'sortkey-890',
processId: 'process-123',
output: { state: { foo: 'bar' } },
output: { buffer: Buffer.from('Hello World', 'utf-8'), messages: [{ foo: 'bar' }] },
evaluatedAt: new Date()
})
})
Expand Down
Loading

0 comments on commit c7aba5c

Please sign in to comment.