Skip to content

Commit

Permalink
Merge pull request #1096 from permaweb/tillathehun0/ru-mode
Browse files Browse the repository at this point in the history
Add Read Unit mode to CU
  • Loading branch information
TillaTheHun0 authored Jan 6, 2025
2 parents 67bbbf7 + ac3837f commit 859af1e
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 27 deletions.
1 change: 1 addition & 0 deletions servers/cu/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const server = pipeP(
*/
const server = app.listen({ port: config.port, host: '0.0.0.0' }, () => {
logger(`Server is running on http://localhost:${config.port}`)
logger(`Server in unit mode: "${config.UNIT_MODE}"`)
})

const memMonitor = setInterval(async () => {
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ export const createApis = async (ctx) => {
id: workerId,
MODE: ctx.MODE,
LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH,
DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL
DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL,
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE
}
}
}
Expand Down
34 changes: 31 additions & 3 deletions servers/cu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const DEFAULT_PROCESS_WASM_MODULE_FORMATS = [
*/
const serverConfigSchema = domainConfigSchema.extend({
MODE: z.enum(['development', 'production']),
/**
* Whether the unit is operating as a Compute Unit
* or Read Unit. Defaults to 'cu'.
*/
UNIT_MODE: z.enum(['cu', 'ru']),
port: positiveIntSchema,
ENABLE_METRICS_ENDPOINT: z.preprocess((val) => !!val, z.boolean())
})
Expand All @@ -51,13 +56,32 @@ const serverConfigSchema = domainConfigSchema.extend({
*/
/* eslint-disable no-throw-literal */

const preprocessUnitMode = (envConfig) => {
const { UNIT_MODE } = envConfig

if (UNIT_MODE === 'cu') return envConfig

/**
* A Read Unit's primary concern is serving dry-runs,
* and so does not create checkpoints and does not cache evaluation
* results to be served later.
*/
return {
...envConfig,
DISABLE_PROCESS_EVALUATION_CACHE: true,
DISABLE_PROCESS_CHECKPOINT_CREATION: true,
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: true,
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: 0
}
}

/**
* If the WALLET is defined, then do nothing.
*
* Otherwise, check whether the WALLET_FILE env var is defined and load it contents
* as WALLET
*/
export const preprocessWallet = (envConfig) => {
const preprocessWallet = (envConfig) => {
const { WALLET, WALLET_FILE, ...theRestOfTheConfig } = envConfig

// WALLET takes precendent. nothing to do here
Expand All @@ -83,7 +107,7 @@ export const preprocessWallet = (envConfig) => {
const preprocessedServerConfigSchema = z.preprocess(
(envConfig, zodRefinementContext) => {
try {
return pipe(preprocessWallet, preprocessUrls)(envConfig)
return pipe(preprocessUnitMode, preprocessWallet, preprocessUrls)(envConfig)
} catch (message) {
zodRefinementContext.addIssue({ code: ZodIssueCode.custom, message })
}
Expand All @@ -100,6 +124,7 @@ const preprocessedServerConfigSchema = z.preprocess(
const CONFIG_ENVS = {
development: {
MODE,
UNIT_MODE: process.env.UNIT_MODE || 'cu',
DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug',
LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel',
MODULE_MODE: process.env.MODULE_MODE,
Expand All @@ -117,6 +142,7 @@ const CONFIG_ENVS = {
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute. After 2 hours, this process used
Expand Down Expand Up @@ -149,6 +175,7 @@ const CONFIG_ENVS = {
},
production: {
MODE,
UNIT_MODE: process.env.UNIT_MODE || 'cu',
DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug',
LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel',
MODULE_MODE: process.env.MODULE_MODE,
Expand All @@ -164,8 +191,9 @@ const CONFIG_ENVS = {
WALLET_FILE: process.env.WALLET_FILE,
MEM_MONITOR_INTERVAL: process.env.MEM_MONITOR_INTERVAL || ms('30s'),
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', // TODO: disabled by default for now. Enable by default later
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute by adding and clearing a table.
Expand Down
17 changes: 13 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
}
}

export function maybePrependProcessMessage (ctx, logger) {
export function maybePrependProcessMessage (ctx, logger, loadTransactionData) {
return async function * ($messages) {
const isColdStart = isNil(ctx.from)

Expand Down Expand Up @@ -405,6 +405,15 @@ export function maybePrependProcessMessage (ctx, logger) {
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)

/**
* data for a process can potentially be very large, and it's only needed
* as part of the very first process message sent to the process (aka. Bootloader).
*
* So in lieu of caching the process data, we fetch it once here, on cold start,
*/
const processData = await loadTransactionData(ctx.id).then(res => res.text())

yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
Expand All @@ -416,7 +425,7 @@ export function maybePrependProcessMessage (ctx, logger) {
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Data: processData,
Owner: ctx.owner,
/**
* the target of the process message is itself
Expand Down Expand Up @@ -484,7 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
)
}

function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, saveBlocks, logger }) {
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) {
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))

const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks })
Expand Down Expand Up @@ -697,7 +706,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
.map($messages => {
return composeStreams(
$messages,
Transform.from(maybePrependProcessMessage(ctx, logger))
Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData))
)
})
.map(messages => ({ messages }))
Expand Down
16 changes: 12 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ describe('loadMessages', () => {
}
}

const loadTransactionData = async (id) => {
assert.equal(id, 'process-123')
return new Response('process data')
}

describe('should prepend the process message on cold start', () => {
test('if first stream message is not the process', async () => {
const $messages = Readable.from([
Expand All @@ -486,13 +491,14 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if the first stream message is a cron message', async () => {
Expand All @@ -506,24 +512,26 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if there are no messages', async () => {
const $messages = Readable.from([])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 1)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})
})

Expand All @@ -546,7 +554,7 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)
Expand Down
5 changes: 5 additions & 0 deletions servers/cu/src/domain/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export const domainConfigSchema = z.object({
* Whether to disable File Process Checkpoint creation entirely.
*/
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: z.preprocess((val) => !!val, z.boolean()),
/**
* Whether to disable caching process evaluations, useful when operating as
* a RU
*/
DISABLE_PROCESS_EVALUATION_CACHE: z.preprocess((val) => !!val, z.boolean()),
/**
* If a process uses this amount of
* gas, then it will immediately create a Checkpoint at the end of the
Expand Down
12 changes: 7 additions & 5 deletions servers/cu/src/effects/ao-evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export function findEvaluationWith ({ db }) {
}
}

export function saveEvaluationWith ({ db, logger: _logger }) {
export function saveEvaluationWith ({ DISABLE_PROCESS_EVALUATION_CACHE, db, logger: _logger }) {
const toEvaluationDoc = pipe(
converge(
unapply(mergeAll),
Expand Down Expand Up @@ -138,8 +138,10 @@ export function saveEvaluationWith ({ db, logger: _logger }) {

function createQuery (evaluation) {
const evalDoc = toEvaluationDoc(evaluation)
const statements = [
{
const statements = []

if (!DISABLE_PROCESS_EVALUATION_CACHE) {
statements.push({
sql: `
INSERT OR IGNORE INTO ${EVALUATIONS_TABLE}
(id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, ordinate, "blockHeight", cron, "evaluatedAt", output)
Expand All @@ -160,8 +162,8 @@ export function saveEvaluationWith ({ db, logger: _logger }) {
evalDoc.evaluatedAt.getTime(),
JSON.stringify(evalDoc.output)
]
}
]
})
}

/**
* Cron messages are not needed to be saved in the messages table
Expand Down
32 changes: 32 additions & 0 deletions servers/cu/src/effects/ao-evaluation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,38 @@ describe('ao-evaluation', () => {
evaluatedAt
})
})

test('noop insert evaluation if DISABLE_PROCESS_EVALUATION_CACHE', async () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
DISABLE_PROCESS_EVALUATION_CACHE: true,
db: {
transaction: async (statements) => {
assert.equal(statements.length, 1)
const [{ sql: messageDocSql }] = statements
assert.ok(messageDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${MESSAGES_TABLE}`))

return Promise.resolve('process-123,1702677252111,1')
}
},
logger
})
)

await saveEvaluation({
isAssignment: false,
deepHash: 'deepHash-123',
timestamp: 1702677252111,
nonce: '1',
epoch: 0,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
output: { Messages: [{ foo: 'bar' }], Memory: 'foo' },
evaluatedAt
})
})
})

describe('findEvaluations', () => {
Expand Down
7 changes: 7 additions & 0 deletions servers/cu/src/effects/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ export function saveProcessWith ({ db }) {
}
return (process) => {
return of(process)
/**
* The data for the process could be very large, so we do not persist
* it, and instead hydrate it on the process message later, if needed.
*/
.map(evolve({
data: () => null
}))
/**
* Ensure the expected shape before writing to the db
*/
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('ao-process', () => {
assert.deepStrictEqual(parameters, [
'process-123',
'sig-123',
'data-123',
null, // data is nullified
null,
JSON.stringify({ address: 'owner-123', key: 'key-123' }),
JSON.stringify([{ name: 'foo', value: 'bar' }]),
Expand Down
6 changes: 5 additions & 1 deletion servers/cu/src/effects/worker/evaluator/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export const createApis = async (ctx) => {
CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL
}),
bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger: ctx.logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE,
db,
logger: ctx.logger
}),
ARWEAVE_URL: ctx.ARWEAVE_URL,
logger: ctx.logger
})
Expand Down
4 changes: 3 additions & 1 deletion servers/cu/src/routes/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { always, compose, identity } from 'ramda'
import { z } from 'zod'

import { withMetrics, withMiddleware, withProcessRestrictionFromPath } from './middleware/index.js'
import { withCuMode } from './middleware/withCuMode.js'

/**
* TODO: could be moved into a route utils or middleware
Expand Down Expand Up @@ -37,8 +38,9 @@ export const withCronRoutes = app => {
'/cron/:processId',
compose(
withMiddleware,
withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }),
withCuMode,
withProcessRestrictionFromPath,
withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }),
always(async (req, res) => {
const {
params: { processId },
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/routes/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { withDomain } from './withDomain.js'

export * from './withProcessRestriction.js'
export * from './withMetrics.js'
export * from './withCuMode.js'

/**
* A convenience method that composes common middleware needed on most routes,
Expand Down
12 changes: 12 additions & 0 deletions servers/cu/src/routes/middleware/withCuMode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { config } from '../../config.js'

const withUnitMode = (mode) => (handler) => (req, res, next) => {
const { UNIT_MODE } = config

if (UNIT_MODE !== mode) return res.status(404).send('Not Found')

return handler(req, res, next)
}

export const withCuMode = withUnitMode('cu')
export const withRuMode = withUnitMode('ru')
Loading

0 comments on commit 859af1e

Please sign in to comment.