diff --git a/boxes/yarn.lock b/boxes/yarn.lock index 3e2af4835f8..79699d7e7f0 100644 --- a/boxes/yarn.lock +++ b/boxes/yarn.lock @@ -78,7 +78,7 @@ __metadata: "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" idb: "npm:^8.0.0" - lmdb: "npm:^3.0.6" + lmdb: "npm:^3.2.0" languageName: node linkType: soft @@ -1553,44 +1553,44 @@ __metadata: languageName: node linkType: hard -"@lmdb/lmdb-darwin-arm64@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-darwin-arm64@npm:3.1.6" +"@lmdb/lmdb-darwin-arm64@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-darwin-arm64@npm:3.2.1" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@lmdb/lmdb-darwin-x64@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-darwin-x64@npm:3.1.6" +"@lmdb/lmdb-darwin-x64@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-darwin-x64@npm:3.2.1" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@lmdb/lmdb-linux-arm64@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-linux-arm64@npm:3.1.6" +"@lmdb/lmdb-linux-arm64@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-linux-arm64@npm:3.2.1" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@lmdb/lmdb-linux-arm@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-linux-arm@npm:3.1.6" +"@lmdb/lmdb-linux-arm@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-linux-arm@npm:3.2.1" conditions: os=linux & cpu=arm languageName: node linkType: hard -"@lmdb/lmdb-linux-x64@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-linux-x64@npm:3.1.6" +"@lmdb/lmdb-linux-x64@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-linux-x64@npm:3.2.1" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@lmdb/lmdb-win32-x64@npm:3.1.6": - version: 3.1.6 - resolution: "@lmdb/lmdb-win32-x64@npm:3.1.6" +"@lmdb/lmdb-win32-x64@npm:3.2.1": + version: 3.2.1 + resolution: "@lmdb/lmdb-win32-x64@npm:3.2.1" conditions: os=win32 & cpu=x64 languageName: node linkType: hard 
@@ -8398,16 +8398,16 @@ __metadata: languageName: node linkType: hard -"lmdb@npm:^3.0.6": - version: 3.1.6 - resolution: "lmdb@npm:3.1.6" - dependencies: - "@lmdb/lmdb-darwin-arm64": "npm:3.1.6" - "@lmdb/lmdb-darwin-x64": "npm:3.1.6" - "@lmdb/lmdb-linux-arm": "npm:3.1.6" - "@lmdb/lmdb-linux-arm64": "npm:3.1.6" - "@lmdb/lmdb-linux-x64": "npm:3.1.6" - "@lmdb/lmdb-win32-x64": "npm:3.1.6" +"lmdb@npm:^3.2.0": + version: 3.2.1 + resolution: "lmdb@npm:3.2.1" + dependencies: + "@lmdb/lmdb-darwin-arm64": "npm:3.2.1" + "@lmdb/lmdb-darwin-x64": "npm:3.2.1" + "@lmdb/lmdb-linux-arm": "npm:3.2.1" + "@lmdb/lmdb-linux-arm64": "npm:3.2.1" + "@lmdb/lmdb-linux-x64": "npm:3.2.1" + "@lmdb/lmdb-win32-x64": "npm:3.2.1" msgpackr: "npm:^1.11.2" node-addon-api: "npm:^6.1.0" node-gyp: "npm:latest" @@ -8429,7 +8429,7 @@ __metadata: optional: true bin: download-lmdb-prebuilds: bin/download-prebuilds.js - checksum: 10c0/081804f72aab6eb0f712654e3bbb2d454dd455bbfe09f223e10728971f201cfc166d4d6dd6a3099aabf79e4fd62e9c2a5eb9117bd5f2153ec5a419333f69a338 + checksum: 10c0/cccf17b95a821c56d3eef87abbdd7c30e4bc3147fbdd1cae44a5ba8f1977dbb3177021cfda9b7e8a293fc53fe78fc81979ce032885b777d3bec556c255c25724 languageName: node linkType: hard diff --git a/cspell.json b/cspell.json index 69fe7cf7dce..f8f39c661ab 100644 --- a/cspell.json +++ b/cspell.json @@ -11,8 +11,8 @@ "asyncify", "auditability", "authwit", - "authwits", "authwitness", + "authwits", "Automine", "autonat", "autorun", @@ -125,6 +125,7 @@ "ierc", "indexeddb", "initialise", + "initialised", "initialising", "interruptible", "isequal", diff --git a/spartan/aztec-network/templates/_helpers.tpl b/spartan/aztec-network/templates/_helpers.tpl index 9191dab8895..3a7ef490e2c 100644 --- a/spartan/aztec-network/templates/_helpers.tpl +++ b/spartan/aztec-network/templates/_helpers.tpl @@ -183,3 +183,8 @@ affinity: topologyKey: "kubernetes.io/hostname" namespaceSelector: {} {{- end -}} + +{{- define "aztec-network.gcpLocalSsd" -}} +nodeSelector: + 
cloud.google.com/gke-ephemeral-storage-local-ssd: "true" +{{- end -}} diff --git a/spartan/aztec-network/templates/boot-node.yaml b/spartan/aztec-network/templates/boot-node.yaml index e4ce5f4a6aa..6506d22bf30 100644 --- a/spartan/aztec-network/templates/boot-node.yaml +++ b/spartan/aztec-network/templates/boot-node.yaml @@ -11,6 +11,7 @@ spec: matchLabels: {{- include "aztec-network.selectorLabels" . | nindent 6 }} app: boot-node + {{- if not .Values.storage.localSsd }} volumeClaimTemplates: - metadata: name: boot-node-data @@ -19,12 +20,16 @@ spec: resources: requests: storage: {{ .Values.bootNode.storageSize }} + {{- end }} template: metadata: labels: {{- include "aztec-network.selectorLabels" . | nindent 8 }} app: boot-node spec: + {{- if .Values.storage.localSsd }} + {{- include "aztec-network.gcpLocalSsd" . | nindent 6 }} + {{- end }} dnsPolicy: ClusterFirstWithHostNet {{- if .Values.network.public }} hostNetwork: true @@ -174,6 +179,8 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: NODE_OPTIONS + value: "--max-old-space-size={{ .Values.bootNode.maxOldSpaceSize}}" - name: AZTEC_PORT value: "{{ .Values.bootNode.service.nodePort }}" - name: LOG_LEVEL @@ -220,6 +227,12 @@ spec: value: {{ .Values.bootNode.viemPollingInterval | quote }} - name: PEER_ID_PRIVATE_KEY value: "{{ .Values.bootNode.peerIdPrivateKey }}" + - name: DATA_DIRECTORY + value: "{{ .Values.bootNode.dataDir }}" + - name: DATA_STORE_MAP_SIZE_KB + value: "{{ .Values.storage.dataStoreMapSize }}" + - name: WS_DB_MAP_SIZE_KB + value: "{{ .Values.storage.worldStateMapSize }}" ports: - containerPort: {{ .Values.bootNode.service.nodePort }} - containerPort: {{ .Values.bootNode.service.p2pTcpPort }} @@ -232,9 +245,14 @@ spec: emptyDir: {} - name: config emptyDir: {} + {{- if .Values.storage.localSsd }} + - name: boot-node-data + emptyDir: {} + {{ else }} - name: boot-node-data persistentVolumeClaim: claimName: boot-node-data + {{- end }} {{- if .Values.bootNode.deployContracts }} - name: 
scripts configMap: diff --git a/spartan/aztec-network/templates/prover-broker.yaml b/spartan/aztec-network/templates/prover-broker.yaml index 8c0ee46d94e..75ad4b5635e 100644 --- a/spartan/aztec-network/templates/prover-broker.yaml +++ b/spartan/aztec-network/templates/prover-broker.yaml @@ -1,6 +1,6 @@ {{- if .Values.proverBroker.enabled }} apiVersion: apps/v1 -kind: ReplicaSet +kind: StatefulSet metadata: name: {{ include "aztec-network.fullname" . }}-prover-broker labels: @@ -11,6 +11,16 @@ spec: matchLabels: {{- include "aztec-network.selectorLabels" . | nindent 6 }} app: prover-broker + {{- if not .Values.storage.localSsd }} + volumeClaimTemplates: + - metadata: + name: prover-broker-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: {{ .Values.proverBroker.storageSize }} + {{- end }} template: metadata: labels: @@ -18,6 +28,9 @@ spec: app: prover-broker spec: serviceAccountName: {{ include "aztec-network.fullname" . }}-node + {{- if .Values.storage.localSsd }} + {{- include "aztec-network.gcpLocalSsd" . | nindent 6 }} + {{- end }} {{- if .Values.network.public }} hostNetwork: true dnsPolicy: ClusterFirstWithHostNet @@ -26,6 +39,8 @@ spec: volumes: - name: config emptyDir: {} + - name: prover-broker-data + emptyDir: {} - name: scripts configMap: name: {{ include "aztec-network.fullname" . 
}}-scripts @@ -55,6 +70,8 @@ spec: volumeMounts: - name: config mountPath: /shared/config + - name: prover-broker-data + mountPath: {{ .Values.proverBroker.dataDir }} command: - "/bin/bash" - "-c" @@ -70,10 +87,14 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: NODE_OPTIONS + value: "--max-old-space-size={{ .Values.proverBroker.maxOldSpaceSize}}" - name: AZTEC_PORT value: "{{ .Values.proverBroker.service.nodePort }}" - name: LOG_LEVEL value: "{{ .Values.proverBroker.logLevel }}" + - name: DATA_DIRECTORY + value: "{{ .Values.proverBroker.dataDir }}" - name: LOG_JSON value: "1" - name: PROVER_BROKER_POLL_INTERVAL_MS @@ -83,11 +104,27 @@ spec: - name: PROVER_BROKER_JOB_MAX_RETRIES value: "{{ .Values.proverBroker.jobMaxRetries }}" - name: PROVER_BROKER_DATA_DIRECTORY - value: "{{ .Values.proverBroker.dataDirectory }}" + value: "{{ .Values.proverBroker.dataDir }}" + - name: PROVER_BROKER_DATA_MAP_SIZE_KB + value: "{{ .Values.storage.dataStoreMapSize }}" - name: OTEL_RESOURCE_ATTRIBUTES value: service.name={{ .Release.Name }},service.namespace={{ .Release.Namespace }},service.version={{ .Chart.AppVersion }},environment={{ .Values.environment | default "production" }} resources: {{- toYaml .Values.proverBroker.resources | nindent 12 }} + volumes: + - name: scripts + configMap: + name: {{ include "aztec-network.fullname" . }}-scripts + - name: config + emptyDir: {} + {{- if .Values.storage.localSsd }} + - name: prover-broker-data + emptyDir: {} + {{ else }} + - name: prover-broker-data + persistentVolumeClaim: + claimName: prover-broker-data + {{- end }} {{- end }} --- # Headless service for StatefulSet DNS entries diff --git a/spartan/aztec-network/templates/prover-node.yaml b/spartan/aztec-network/templates/prover-node.yaml index 0ee0241a1d3..34c52d10331 100644 --- a/spartan/aztec-network/templates/prover-node.yaml +++ b/spartan/aztec-network/templates/prover-node.yaml @@ -11,12 +11,25 @@ spec: matchLabels: {{- include "aztec-network.selectorLabels" . 
| nindent 6 }} app: prover-node + {{- if not .Values.storage.localSsd }} + volumeClaimTemplates: + - metadata: + name: prover-node-data + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: {{ .Values.proverNode.storageSize }} + {{- end }} template: metadata: labels: {{- include "aztec-network.selectorLabels" . | nindent 8 }} app: prover-node spec: + {{- if .Values.storage.localSsd }} + {{- include "aztec-network.gcpLocalSsd" . | nindent 6 }} + {{- end }} {{- if .Values.network.public }} hostNetwork: true dnsPolicy: ClusterFirstWithHostNet @@ -106,6 +119,8 @@ spec: mountPath: /shared/p2p - name: config mountPath: /shared/config + - name: prover-node-data + mountPath: {{ .Values.proverNode.dataDir }} env: - name: K8S_POD_UID valueFrom: @@ -119,6 +134,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: NODE_OPTIONS + value: "--max-old-space-size={{ .Values.proverNode.maxOldSpaceSize}}" - name: AZTEC_PORT value: "{{ .Values.proverNode.service.nodePort }}" - name: LOG_LEVEL @@ -171,6 +188,12 @@ spec: value: "{{ .Values.aztec.epochProofClaimWindow }}" - name: PROVER_VIEM_POLLING_INTERVAL_MS value: {{ .Values.proverNode.viemPollingInterval | quote }} + - name: DATA_DIRECTORY + value: "{{ .Values.proverNode.dataDir }}" + - name: DATA_STORE_MAP_SIZE_KB + value: "{{ .Values.storage.dataStoreMapSize }}" + - name: WS_DB_MAP_SIZE_KB + value: "{{ .Values.storage.worldStateMapSize }}" ports: - containerPort: {{ .Values.proverNode.service.nodePort }} - containerPort: {{ .Values.proverNode.service.p2pTcpPort }} @@ -188,16 +211,14 @@ spec: emptyDir: {} - name: config emptyDir: {} - volumeClaimTemplates: - - metadata: - name: shared-volume - labels: - {{- include "aztec-network.labels" . 
| nindent 8 }} - spec: - accessModes: ["ReadWriteOnce"] - resources: - requests: - storage: {{ .Values.proverNode.storage }} + {{- if .Values.storage.localSsd }} + - name: prover-node-data + emptyDir: {} + {{ else }} + - name: prover-node-data + persistentVolumeClaim: + claimName: prover-node-data + {{- end }} {{if not .Values.network.public }} --- apiVersion: v1 diff --git a/spartan/aztec-network/templates/reth.yaml b/spartan/aztec-network/templates/reth.yaml index 69bd6037e29..e621630327c 100644 --- a/spartan/aztec-network/templates/reth.yaml +++ b/spartan/aztec-network/templates/reth.yaml @@ -143,7 +143,7 @@ spec: accessModes: ["ReadWriteOnce"] resources: requests: - storage: {{ .Values.ethereum.storage }} + storage: {{ .Values.ethereum.storageSize }} {{- end }} --- {{ end }} \ No newline at end of file diff --git a/spartan/aztec-network/templates/validator.yaml b/spartan/aztec-network/templates/validator.yaml index 8ff7cf7735f..02331567b02 100644 --- a/spartan/aztec-network/templates/validator.yaml +++ b/spartan/aztec-network/templates/validator.yaml @@ -12,6 +12,7 @@ spec: matchLabels: {{- include "aztec-network.selectorLabels" . | nindent 6 }} app: validator + {{- if not .Values.storage.localSsd }} volumeClaimTemplates: - metadata: name: validator-data @@ -20,12 +21,17 @@ spec: resources: requests: storage: {{ .Values.validator.storageSize }} + {{- end }} template: metadata: labels: {{- include "aztec-network.selectorLabels" . | nindent 8 }} app: validator spec: + {{- if .Values.storage.localSsd }} + {{- include "aztec-network.gcpLocalSsd" . 
| nindent 6 }} + {{- end }} + {{- if .Values.network.public }} hostNetwork: true dnsPolicy: ClusterFirstWithHostNet @@ -156,6 +162,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: NODE_OPTIONS + value: "--max-old-space-size={{ .Values.validator.maxOldSpaceSize}}" - name: AZTEC_PORT value: "{{ .Values.validator.service.nodePort }}" - name: LOG_LEVEL @@ -200,6 +208,12 @@ spec: value: {{ .Values.validator.viemPollingInterval | quote }} - name: SEQ_VIEM_POLLING_INTERVAL_MS value: {{ .Values.validator.viemPollingInterval | quote }} + - name: DATA_DIRECTORY + value: "{{ .Values.validator.dataDir }}" + - name: DATA_STORE_MAP_SIZE_KB + value: "{{ .Values.storage.dataStoreMapSize }}" + - name: WS_DB_MAP_SIZE_KB + value: "{{ .Values.storage.worldStateMapSize }}" ports: - containerPort: {{ .Values.validator.service.nodePort }} - containerPort: {{ .Values.validator.service.p2pTcpPort }} @@ -222,9 +236,14 @@ spec: emptyDir: {} - name: config emptyDir: {} + {{- if .Values.storage.localSsd }} + - name: validator-data + emptyDir: {} + {{ else }} - name: validator-data persistentVolumeClaim: claimName: validator-data + {{- end }} --- # If this is not a public network, create a headless service for StatefulSet DNS entries {{ if not .Values.network.public }} diff --git a/spartan/aztec-network/values.yaml b/spartan/aztec-network/values.yaml index ea363b9c88d..9f7ee290d30 100644 --- a/spartan/aztec-network/values.yaml +++ b/spartan/aztec-network/values.yaml @@ -11,6 +11,11 @@ network: public: false setupL2Contracts: true +storage: + localSsd: false + dataStoreMapSize: "134217728" # 128 GB + worldStateMapSize: "134217728" # 128 GB + telemetry: enabled: false otelCollectorEndpoint: @@ -59,6 +64,7 @@ bootNode: requests: memory: "4Gi" cpu: "1" + maxOldSpaceSize: "3584" deployContracts: true # Set to false to use manual contract addresses startupProbe: periodSeconds: 10 @@ -72,7 +78,6 @@ bootNode: outboxAddress: "" feeJuiceAddress: "" feeJuicePortalAddress: "" - storage: 
"8Gi" archiverPollingInterval: 1000 archiverViemPollingInterval: 1000 viemPollingInterval: 1000 @@ -115,6 +120,7 @@ validator: requests: memory: "4Gi" cpu: "1" + maxOldSpaceSize: "3584" archiverPollingInterval: 1000 archiverViemPollingInterval: 1000 attestationPollingInterval: 1000 @@ -146,11 +152,13 @@ proverNode: requests: memory: "4Gi" cpu: "1" - storage: "8Gi" + maxOldSpaceSize: "3584" archiverPollingInterval: 1000 archiverViemPollingInterval: 1000 pollInterval: 1000 viemPollingInterval: 1000 + dataDir: "/data" + storageSize: "1Gi" txGathering: timeoutMs: 60000 intervalMs: 1000 @@ -225,7 +233,7 @@ ethereum: requests: memory: "4Gi" cpu: "1" - storage: "80Gi" + storageSize: "80Gi" deployL1ContractsPrivateKey: proverAgent: @@ -240,7 +248,6 @@ proverAgent: logLevel: "debug; info: aztec:simulator, json-rpc" bb: hardwareConcurrency: "" - nodeSelector: {} resources: requests: memory: "4Gi" @@ -255,13 +262,14 @@ proverBroker: jobTimeoutMs: 30000 pollIntervalMs: 1000 jobMaxRetries: 3 - dataDirectory: "" logLevel: "debug; info: aztec:simulator, json-rpc" - nodeSelector: {} + storageSize: "1Gi" + dataDir: "/data" resources: requests: memory: "4Gi" cpu: "1" + maxOldSpaceSize: "3584" jobs: deployL1Verifier: diff --git a/spartan/aztec-network/values/exp-1.yaml b/spartan/aztec-network/values/exp-1.yaml index 0f086a93c65..190fb2fefe0 100644 --- a/spartan/aztec-network/values/exp-1.yaml +++ b/spartan/aztec-network/values/exp-1.yaml @@ -1,6 +1,9 @@ network: public: false +storage: + localSsd: true + aztec: slotDuration: 36 epochDuration: 32 @@ -15,7 +18,7 @@ images: pullPolicy: Always validator: - storageSize: "100Gi" + storageSize: "300Gi" replicas: 48 validatorKeys: - 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 @@ -117,13 +120,15 @@ validator: - 0x3c3E2E178C69D4baD964568415a0f0c84fd6320A resources: requests: - memory: "8Gi" - cpu: "3.5" + memory: "5Gi" + cpu: "1.5" + ephemeral-storage: "275Gi" + maxOldSpaceSize: "4608" validator: disabled: false 
sequencer: - maxTxsPerBlock: 36 - enforceTimeTable: false + maxTxsPerBlock: 40 + enforceTimeTable: true bootNode: peerIdPrivateKey: 080212200ba8451c6d62b03c4441f0a466c0bce7a3a595f2cf50a055ded3305c77aa3af0 @@ -132,8 +137,10 @@ bootNode: disabled: true resources: requests: - memory: "8Gi" - cpu: "3.5" + memory: "5Gi" + cpu: "1.5" + ephemeral-storage: "275Gi" + maxOldSpaceSize: "4608" proverAgent: replicas: 8 @@ -141,14 +148,18 @@ proverAgent: proverBroker: resources: requests: - memory: "8Gi" + memory: "12Gi" cpu: "3.5" + ephemeral-storage: "275Gi" + maxOldSpaceSize: "11776" proverNode: resources: requests: - memory: "8Gi" + memory: "12Gi" cpu: "3.5" + ephemeral-storage: "275Gi" + maxOldSpaceSize: "11776" bot: replicas: 10 @@ -163,5 +174,5 @@ jobs: ethereum: resources: requests: - memory: "8Gi" - cpu: "3.5" + memory: "5Gi" + cpu: "1.5" diff --git a/spartan/terraform/gke-cluster/main.tf b/spartan/terraform/gke-cluster/main.tf index 6539ac5b7c7..31c841158be 100644 --- a/spartan/terraform/gke-cluster/main.tf +++ b/spartan/terraform/gke-cluster/main.tf @@ -115,6 +115,70 @@ resource "google_container_node_pool" "primary_nodes" { } } +# Create 2 core node pool with local ssd +resource "google_container_node_pool" "aztec_nodes_2core_ssd" { + name = "aztec-nodes-2core-ssd" + location = var.zone + cluster = google_container_cluster.primary.name + + # Enable autoscaling + autoscaling { + min_node_count = 1 + max_node_count = 256 + } + + # Node configuration + node_config { + machine_type = "n2d-standard-2" + ephemeral_storage_local_ssd_config { + local_ssd_count = 1 + } + + service_account = google_service_account.gke_sa.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + + labels = { + env = "production" + + } + tags = ["aztec-gke-node", "aztec"] + } +} + +# Create 4 core node pool with local ssd +resource "google_container_node_pool" "aztec_nodes_4core_ssd" { + name = "aztec-nodes-4core-ssd" + location = var.zone + cluster = 
google_container_cluster.primary.name + + # Enable autoscaling + autoscaling { + min_node_count = 1 + max_node_count = 256 + } + + # Node configuration + node_config { + machine_type = "n2d-standard-4" + ephemeral_storage_local_ssd_config { + local_ssd_count = 1 + } + + service_account = google_service_account.gke_sa.email + oauth_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] + + labels = { + env = "production" + + } + tags = ["aztec-gke-node", "aztec"] + } +} + # Create node pool for simulated aztec nodes (validators, prover nodes, boot nodes) resource "google_container_node_pool" "aztec_nodes_simulated" { name = "aztec-node-pool-simulated" diff --git a/yarn-project/archiver/src/archiver/instrumentation.ts b/yarn-project/archiver/src/archiver/instrumentation.ts index f9b921df580..b6792ca9d17 100644 --- a/yarn-project/archiver/src/archiver/instrumentation.ts +++ b/yarn-project/archiver/src/archiver/instrumentation.ts @@ -17,7 +17,7 @@ export class ArchiverInstrumentation { public readonly tracer: Tracer; private blockHeight: Gauge; - private blockSize: Gauge; + private txCount: UpDownCounter; private syncDuration: Histogram; private l1BlocksSynced: UpDownCounter; private proofsSubmittedDelay: Histogram; @@ -35,8 +35,8 @@ export class ArchiverInstrumentation { valueType: ValueType.INT, }); - this.blockSize = meter.createGauge(Metrics.ARCHIVER_BLOCK_SIZE, { - description: 'The number of transactions in a block', + this.txCount = meter.createUpDownCounter(Metrics.ARCHIVER_TX_COUNT, { + description: 'The total number of transactions', valueType: ValueType.INT, }); @@ -95,7 +95,7 @@ export class ArchiverInstrumentation { this.blockHeight.record(Math.max(...blocks.map(b => b.number))); this.l1BlocksSynced.add(blocks.length); for (const block of blocks) { - this.blockSize.record(block.body.txEffects.length); + this.txCount.add(block.body.txEffects.length); } } diff --git a/yarn-project/aztec-node/src/aztec-node/server.test.ts 
b/yarn-project/aztec-node/src/aztec-node/server.test.ts index 4a417906496..7ba1860158b 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.test.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.test.ts @@ -40,8 +40,12 @@ describe('aztec node', () => { globalVariablesBuilder.getCurrentBaseFees.mockResolvedValue(new GasFees(0, 0)); merkleTreeOps = mock(); - merkleTreeOps.findLeafIndices.mockImplementation((_treeId: MerkleTreeId, _value: any[]) => { - return Promise.resolve([undefined]); + merkleTreeOps.findLeafIndices.mockImplementation((treeId: MerkleTreeId, _value: any[]) => { + if (treeId === MerkleTreeId.ARCHIVE) { + return Promise.resolve([1n]); + } else { + return Promise.resolve([undefined]); + } }); const worldState = mock({ @@ -112,9 +116,13 @@ describe('aztec node', () => { // We make a nullifier from `doubleSpendWithExistingTx` a part of the nullifier tree, so it gets rejected as double spend const doubleSpendNullifier = doubleSpendWithExistingTx.data.forRollup!.end.nullifiers[0].toBuffer(); merkleTreeOps.findLeafIndices.mockImplementation((treeId: MerkleTreeId, value: any[]) => { - return Promise.resolve( - treeId === MerkleTreeId.NULLIFIER_TREE && value[0].equals(doubleSpendNullifier) ? [1n] : [undefined], - ); + let retVal: [bigint | undefined] = [undefined]; + if (treeId === MerkleTreeId.ARCHIVE) { + retVal = [1n]; + } else if (treeId === MerkleTreeId.NULLIFIER_TREE) { + retVal = value[0].equals(doubleSpendNullifier) ? 
[1n] : [undefined]; + } + return Promise.resolve(retVal); }); expect(await node.isValidTx(doubleSpendWithExistingTx)).toEqual({ diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts b/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts index ce5ef637ff6..503646c2bb3 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts @@ -31,7 +31,5 @@ export async function startProverBroker( services.proverBroker = [broker, ProvingJobBrokerSchema]; signalHandlers.push(() => broker.stop()); - await broker.start(); - return broker; } diff --git a/yarn-project/circuit-types/src/interfaces/epoch-prover.ts b/yarn-project/circuit-types/src/interfaces/epoch-prover.ts index e8faa020647..5c1539d8a40 100644 --- a/yarn-project/circuit-types/src/interfaces/epoch-prover.ts +++ b/yarn-project/circuit-types/src/interfaces/epoch-prover.ts @@ -28,4 +28,7 @@ export interface EpochProver extends Omit { /** Returns the block assembled at a given index (zero-based) within the epoch. 
*/ getBlock(index: number): L2Block; + + /** Called when no longer required, cleans up internal resources */ + stop(): Promise; } diff --git a/yarn-project/circuit-types/src/interfaces/prover-broker.ts b/yarn-project/circuit-types/src/interfaces/prover-broker.ts index 546b26aa672..be1b2dbae0c 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-broker.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-broker.ts @@ -2,7 +2,6 @@ import { type ProofUri, type ProvingJob, type ProvingJobId, - type ProvingJobSettledResult, type ProvingJobStatus, type ProvingRequestType, } from '@aztec/circuit-types'; @@ -19,6 +18,8 @@ export const ProverBrokerConfig = z.object({ proverBrokerPollIntervalMs: z.number(), /** If starting a prover broker locally, the directory to store broker data */ proverBrokerDataDirectory: z.string().optional(), + /** The size of the data store map */ + proverBrokerDataMapSizeKB: z.number(), }); export type ProverBrokerConfig = z.infer; @@ -43,6 +44,11 @@ export const proverBrokerConfigMappings: ConfigMappingsType env: 'PROVER_BROKER_DATA_DIRECTORY', description: 'If starting a prover broker locally, the directory to store broker data', }, + proverBrokerDataMapSizeKB: { + env: 'PROVER_BROKER_DATA_MAP_SIZE_KB', + description: 'The size of the data store map', + ...numberConfigHelper(128 * 1_024 * 1_024), // Defaulted to 128 GB + }, }; /** @@ -53,7 +59,7 @@ export interface ProvingJobProducer { * Enqueues a proving job * @param job - The job to enqueue */ - enqueueProvingJob(job: ProvingJob): Promise; + enqueueProvingJob(job: ProvingJob): Promise; /** * Cancels a proving job. 
@@ -68,10 +74,11 @@ export interface ProvingJobProducer { getProvingJobStatus(id: ProvingJobId): Promise; /** - * Waits for the job to settle and returns to the result - * @param id - The ID of the job to get the status of + * Returns the ids of jobs that have been completed since the last call + * Also returns the set of provided job ids that are completed + * @param ids - The set of job ids to check for completion */ - waitForJobToSettle(id: ProvingJobId): Promise; + getCompletedJobs(ids: ProvingJobId[]): Promise; } export type ProvingJobFilter = { diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index 542ce92c39e..635fbfd6e57 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -290,6 +290,15 @@ export const ProvingJob = z.object({ inputsUri: ProofUri, }); +export const makeProvingJobId = (epochNumber: number, type: ProvingRequestType, inputsHash: string) => { + return `${epochNumber}:${ProvingRequestType[type]}:${inputsHash}`; +}; + +export const getEpochFromProvingJobId = (id: ProvingJobId) => { + const components = id.split(':'); + return +components[0]; +}; + export type ProvingJob = z.infer; export function makeProvingRequestResult( diff --git a/yarn-project/end-to-end/src/e2e_epochs.test.ts b/yarn-project/end-to-end/src/e2e_epochs.test.ts index 14850cb1509..b6217c6206b 100644 --- a/yarn-project/end-to-end/src/e2e_epochs.test.ts +++ b/yarn-project/end-to-end/src/e2e_epochs.test.ts @@ -79,6 +79,7 @@ describe('e2e_epochs', () => { } const newL2ProvenBlockNumber = Number(await rollup.getProvenBlockNumber()); + if (l2ProvenBlockNumber !== newL2ProvenBlockNumber) { const epochNumber = await rollup.getEpochNumber(BigInt(newL2ProvenBlockNumber)); msg += ` with proof up to L2 block ${newL2ProvenBlockNumber} for epoch ${epochNumber}`; diff --git a/yarn-project/foundation/src/config/env_var.ts 
b/yarn-project/foundation/src/config/env_var.ts index 8689a5512d5..868dc8191e5 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -114,6 +114,7 @@ export type EnvVar = | 'PROVER_BROKER_POLL_INTERVAL_MS' | 'PROVER_BROKER_JOB_MAX_RETRIES' | 'PROVER_BROKER_DATA_DIRECTORY' + | 'PROVER_BROKER_DATA_MAP_SIZE_KB' | 'PROVER_COORDINATION_NODE_URL' | 'PROVER_DISABLED' | 'PROVER_ID' diff --git a/yarn-project/kv-store/package.json b/yarn-project/kv-store/package.json index ec96af1ae24..aba97045383 100644 --- a/yarn-project/kv-store/package.json +++ b/yarn-project/kv-store/package.json @@ -28,7 +28,7 @@ "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", "idb": "^8.0.0", - "lmdb": "^3.0.6" + "lmdb": "^3.2.0" }, "devDependencies": { "@aztec/circuits.js": "workspace:^", diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.test.ts b/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.test.ts new file mode 100644 index 00000000000..ec745265a12 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.test.ts @@ -0,0 +1,40 @@ +import { type AnyTx, type TxValidationResult, mockTxForRollup } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; + +import { type MockProxy, mock, mockFn } from 'jest-mock-extended'; + +import { type ArchiveSource, BlockHeaderTxValidator } from './block_header_validator.js'; + +describe('BlockHeaderTxValidator', () => { + let txValidator: BlockHeaderTxValidator; + let archiveSource: MockProxy; + + beforeEach(() => { + archiveSource = mock({ + getArchiveIndices: mockFn().mockImplementation(() => { + return Promise.resolve([undefined]); + }), + }); + txValidator = new BlockHeaderTxValidator(archiveSource); + }); + + it('rejects tx with invalid block header', async () => { + const badTx = mockTxForRollup(); + badTx.data.constants.historicalHeader.globalVariables.blockNumber = + 
badTx.data.constants.historicalHeader.globalVariables.blockNumber.add(new Fr(1)); + + const goodTx = mockTxForRollup(); + archiveSource.getArchiveIndices.mockImplementation((archives: Fr[]) => { + if (archives[0].equals(goodTx.data.constants.historicalHeader.hash())) { + return Promise.resolve([1n]); + } else { + return Promise.resolve([undefined]); + } + }); + await expect(txValidator.validateTx(goodTx)).resolves.toEqual({ result: 'valid' } satisfies TxValidationResult); + await expect(txValidator.validateTx(badTx)).resolves.toEqual({ + result: 'invalid', + reason: ['Block header not found'], + } satisfies TxValidationResult); + }); +}); diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.ts new file mode 100644 index 00000000000..79784be7fa8 --- /dev/null +++ b/yarn-project/p2p/src/msg_validators/tx_validator/block_header_validator.ts @@ -0,0 +1,25 @@ +import { type AnyTx, Tx, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; +import { type Fr } from '@aztec/circuits.js'; +import { createLogger } from '@aztec/foundation/log'; + +export interface ArchiveSource { + getArchiveIndices: (archives: Fr[]) => Promise<(bigint | undefined)[]>; +} + +export class BlockHeaderTxValidator implements TxValidator { + #log = createLogger('p2p:tx_validator:tx_block_header'); + #archiveSource: ArchiveSource; + + constructor(archiveSource: ArchiveSource) { + this.#archiveSource = archiveSource; + } + + async validateTx(tx: T): Promise { + const [index] = await this.#archiveSource.getArchiveIndices([tx.data.constants.historicalHeader.hash()]); + if (index === undefined) { + this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for referencing an unknown block header`); + return { result: 'invalid', reason: ['Block header not found'] }; + } + return { result: 'valid' }; + } +} diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/index.ts 
b/yarn-project/p2p/src/msg_validators/tx_validator/index.ts index 6937dfa460d..de02c4d7d96 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/index.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/index.ts @@ -3,3 +3,4 @@ export * from './data_validator.js'; export * from './double_spend_validator.js'; export * from './metadata_validator.js'; export * from './tx_proof_validator.js'; +export * from './block_header_validator.js'; diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index 8a67ad25bfe..74b97b284eb 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -134,6 +134,12 @@ export class ProvingOrchestrator implements EpochProver { this.paddingTxProof = undefined; } + public stop(): Promise { + this.cancel(); + this.reset(); + return Promise.resolve(); + } + public startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number) { const { promise: _promise, resolve, reject } = promiseWithResolvers(); const promise = _promise.catch((reason): ProvingResult => ({ status: 'failure', reason })); diff --git a/yarn-project/prover-client/src/prover-client/prover-client.ts b/yarn-project/prover-client/src/prover-client/prover-client.ts index 115b2a9f784..a7d7f645949 100644 --- a/yarn-project/prover-client/src/prover-client/prover-client.ts +++ b/yarn-project/prover-client/src/prover-client/prover-client.ts @@ -20,6 +20,7 @@ import { ProvingOrchestrator } from '../orchestrator/orchestrator.js'; import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js'; import { InlineProofStore } from '../proving_broker/proof_store.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; +import { ServerEpochProver } from './server-epoch-prover.js'; /** Manages proving of epochs by orchestrating the proving of individual blocks relying 
on a pool of prover agents. */ export class ProverClient implements EpochProverManager { @@ -39,12 +40,9 @@ export class ProverClient implements EpochProverManager { } public createEpochProver(): EpochProver { - return new ProvingOrchestrator( - this.worldState, - new BrokerCircuitProverFacade(this.orchestratorClient), - this.telemetry, - this.config.proverId, - ); + const facade = new BrokerCircuitProverFacade(this.orchestratorClient); + const orchestrator = new ProvingOrchestrator(this.worldState, facade, this.telemetry, this.config.proverId); + return new ServerEpochProver(facade, orchestrator); } public getProverId(): Fr { diff --git a/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts b/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts new file mode 100644 index 00000000000..e42a5b9945b --- /dev/null +++ b/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts @@ -0,0 +1,41 @@ +import { type EpochProver, type L2Block, type ProcessedTx } from '@aztec/circuit-types'; +import { type BlockHeader, type Fr, type GlobalVariables, type Proof } from '@aztec/circuits.js'; +import { type RootRollupPublicInputs } from '@aztec/circuits.js/rollup'; + +import { type ProvingOrchestrator } from '../orchestrator/orchestrator.js'; +import { type BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js'; + +/** Encapsulates the proving orchestrator and the broker facade */ +export class ServerEpochProver implements EpochProver { + constructor(private facade: BrokerCircuitProverFacade, private orchestrator: ProvingOrchestrator) {} + + startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number): void { + this.orchestrator.startNewEpoch(epochNumber, firstBlockNumber, totalNumBlocks); + this.facade.start(); + } + setBlockCompleted(blockNumber: number, expectedBlockHeader?: BlockHeader): Promise { + return this.orchestrator.setBlockCompleted(blockNumber, expectedBlockHeader); + } + 
finaliseEpoch(): Promise<{ publicInputs: RootRollupPublicInputs; proof: Proof }> { + return this.orchestrator.finaliseEpoch(); + } + cancel(): void { + this.orchestrator.cancel(); + } + getProverId(): Fr { + return this.orchestrator.getProverId(); + } + getBlock(index: number): L2Block { + return this.orchestrator.getBlock(index); + } + async stop(): Promise { + await this.facade.stop(); + await this.orchestrator.stop(); + } + startNewBlock(globalVariables: GlobalVariables, l1ToL2Messages: Fr[]): Promise { + return this.orchestrator.startNewBlock(globalVariables, l1ToL2Messages); + } + addTxs(txs: ProcessedTx[]): Promise { + return this.orchestrator.addTxs(txs); + } +} diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts index 13c09f09011..b512274ab6b 100644 --- a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts @@ -25,6 +25,12 @@ describe('BrokerCircuitProverFacade', () => { facade = new BrokerCircuitProverFacade(broker, proofStore); await broker.start(); + facade.start(); + }); + + afterEach(async () => { + await broker.stop(); + await facade.stop(); }); it('sends jobs to the broker', async () => { @@ -55,14 +61,6 @@ describe('BrokerCircuitProverFacade', () => { promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); } - await sleep(agentPollInterval); - // the broker should have received all of them - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); - - // but really, it should have only enqueued just one - expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); - // now we have 50 promises all waiting on the same result // resolve the proof const result = makePublicInputsAndRecursiveProof( @@ -72,18 +70,26 @@ 
describe('BrokerCircuitProverFacade', () => { ); resultPromise.resolve(result); + await Promise.all(promises); + + // the broker will only have been told about one of the calls + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); + + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + // enqueue another N requests for the same jobs for (let i = 0; i < CALLS; i++) { promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); } - await sleep(agentPollInterval); - // the broker will have received the new requests - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); + await Promise.all(promises); + + // the broker will have received one new request + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2); // but no new jobs where created expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); - // and all 2 * N requests will have been resolved with the same result + // and all requests will have been resolved with the same result for (const promise of promises) { await expect(promise).resolves.toEqual(result); } @@ -106,30 +112,30 @@ describe('BrokerCircuitProverFacade', () => { } await sleep(agentPollInterval); - // the broker should have received all of them - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); - - // but really, it should have only enqueued just one - expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); - expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); resultPromise.reject(new Error('TEST ERROR')); + await Promise.all(promises); + + // the broker should only have been called once + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); + + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + // enqueue another N requests for the same jobs for (let i = 0; i < CALLS; i++) { promises.push(facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err }))); } - 
await sleep(agentPollInterval); - // the broker will have received the new requests - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); - // but no new jobs where created - expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); - // and all 2 * N requests will have been resolved with the same result for (const promise of promises) { await expect(promise).resolves.toEqual({ err: new Error('TEST ERROR') }); } + + // the broker will have received one new request + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2); + // but no new jobs where created + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); }); it('handles aborts', async () => { @@ -149,4 +155,19 @@ describe('BrokerCircuitProverFacade', () => { await expect(promise).resolves.toEqual({ err: new Error('Aborted') }); }); + + it('rejects jobs when the facade is stopped', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + const promise = facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err })); + + await facade.stop(); + + await expect(promise).resolves.toEqual({ err: new Error('Broker facade stopped') }); + }); }); diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index ae28955a202..30f265b806c 100644 --- a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -4,6 +4,7 @@ import { type ProvingJobInputsMap, type ProvingJobProducer, type ProvingJobResultsMap, + type ProvingJobStatus, ProvingRequestType, type PublicInputsAndRecursiveProof, type ServerCircuitProver, @@ -36,7 +37,8 @@ import { } from '@aztec/circuits.js/rollup'; 
import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; -import { retryUntil } from '@aztec/foundation/retry'; +import { RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; +import { SerialQueue } from '@aztec/foundation/queue'; import { truncate } from '@aztec/foundation/string'; import { InlineProofStore, type ProofStore } from './proof_store.js'; @@ -44,7 +46,27 @@ import { InlineProofStore, type ProofStore } from './proof_store.js'; // 20 minutes, roughly the length of an Aztec epoch. If a proof isn't ready in this amount of time then we've failed to prove the whole epoch const MAX_WAIT_MS = 1_200_000; +// Perform a snapshot sync every 30 seconds +const SNAPSHOT_SYNC_INTERVAL_MS = 30_000; + +const MAX_CONCURRENT_JOB_SETTLED_REQUESTS = 10; +const SNAPSHOT_SYNC_CHECK_MAX_REQUEST_SIZE = 1000; + +type ProvingJob = { + id: ProvingJobId; + type: ProvingRequestType; + promise: PromiseWithResolvers; + abortFn?: () => Promise; + signal?: AbortSignal; +}; + export class BrokerCircuitProverFacade implements ServerCircuitProver { + private jobs: Map = new Map(); + private runningPromise?: RunningPromise; + private timeOfLastSnapshotSync = Date.now(); + private queue: SerialQueue = new SerialQueue(); + private jobsToRetrieve: Set = new Set(); + constructor( private broker: ProvingJobProducer, private proofStore: ProofStore = new InlineProofStore(), @@ -53,77 +75,315 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { private log = createLogger('prover-client:broker-circuit-prover-facade'), ) {} - private async enqueueAndWaitForJob( + private enqueueJob( id: ProvingJobId, type: T, inputs: ProvingJobInputsMap[T], epochNumber = 0, signal?: AbortSignal, ): Promise { + if (!this.queue) { + throw new Error('BrokerCircuitProverFacade not started'); + } + return this.queue!.put(() => this._enqueueJob(id, type, inputs, epochNumber, signal)).then( + ({ enqueuedPromise }) => enqueuedPromise, + 
); + } + + private async _enqueueJob( + id: ProvingJobId, + type: T, + inputs: ProvingJobInputsMap[T], + epochNumber = 0, + signal?: AbortSignal, + ): Promise<{ enqueuedPromise: Promise }> { + // Check if there is already a promise for this job + const existingPromise = this.jobs.get(id); + if (existingPromise) { + this.log.verbose(`Job already found in facade id=${id} type=${ProvingRequestType[type]}`, { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + }); + return { enqueuedPromise: existingPromise.promise.promise as Promise }; + } const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); - await this.broker.enqueueProvingJob({ + const jobStatus = await this.broker.enqueueProvingJob({ id, type, inputsUri, epochNumber, }); - this.log.verbose( - `Sent proving job to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, - { - provingJobId: id, - provingJobType: ProvingRequestType[type], - epochNumber, - inputsUri: truncate(inputsUri), - }, - ); - - // notify broker of cancelled job + // Create a promise for this job id, regardless of whether it was enqueued at the broker + // The running promise will monitor for the job to be completed and resolve it either way + const promise = promiseWithResolvers(); const abortFn = async () => { signal?.removeEventListener('abort', abortFn); await this.broker.cancelProvingJob(id); }; + const job: ProvingJob = { + id, + type, + promise, + abortFn, + signal, + }; + this.jobs.set(id, job); + + // If we are here then the job was successfully accepted by the broker + // the returned status is for before any action was performed + if (jobStatus.status === 'not-found') { + // Job added for the first time + // notify the broker if job is aborted + signal?.addEventListener('abort', abortFn); + + this.log.verbose( + `Job enqueued with broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, + { + provingJobId: id, + provingJobType: 
ProvingRequestType[type], + epochNumber, + inputsUri: truncate(inputsUri), + numOutstandingJobs: this.jobs.size, + }, + ); + } else if (jobStatus.status === 'fulfilled' || jobStatus.status === 'rejected') { + // Job was already completed by the broker + // No need to notify the broker on aborted job + job.abortFn = undefined; + this.log.verbose( + `Job already completed when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, + { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + inputsUri: truncate(inputsUri), + }, + ); - signal?.addEventListener('abort', abortFn); - - try { - // loop here until the job settles - // NOTE: this could also terminate because the job was cancelled through event listener above - const result = await retryUntil( - async () => { - try { - return await this.broker.waitForJobToSettle(id); - } catch (err) { - // waitForJobToSettle can only fail for network errors - // keep retrying until we time out - } + // Job was not enqueued. 
It must be completed already, add to our set of already completed jobs + this.jobsToRetrieve.add(id); + } else { + // Job was previously sent to the broker but is not completed + // notify the broker if job is aborted + signal?.addEventListener('abort', abortFn); + this.log.verbose( + `Job already in queue or in progress when sent to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, + { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + inputsUri: truncate(inputsUri), }, - `Proving job=${id} type=${ProvingRequestType[type]}`, - this.waitTimeoutMs / 1000, - this.pollIntervalMs / 1000, ); + } + const typedPromise = promise.promise as Promise; + return { enqueuedPromise: typedPromise }; + } + + public start() { + if (this.runningPromise) { + throw new Error('BrokerCircuitProverFacade already started'); + } + + this.log.verbose('Starting BrokerCircuitProverFacade'); + + this.runningPromise = new RunningPromise(() => this.monitorForCompletedJobs(), this.log, this.pollIntervalMs); + this.runningPromise.start(); + + this.queue = new SerialQueue(); + this.queue.start(); + } + + public async stop(): Promise { + if (!this.runningPromise) { + throw new Error('BrokerCircuitProverFacade not started'); + } + this.log.verbose('Stopping BrokerCircuitProverFacade'); + await this.runningPromise.stop(); + + if (this.queue) { + await this.queue.cancel(); + await this.queue.end(); + } + + // Reject any outstanding promises as stopped + for (const [_, v] of this.jobs) { + v.promise.reject(new Error('Broker facade stopped')); + } + this.jobs.clear(); + } + + private async updateCompletedJobs() { + // Here we check for completed jobs. If everything works well (there are no service restarts etc) then all we need to do + // to maintain correct job state is to check for incrementally completed jobs. i.e. 
call getCompletedJobs with an empty array + // However, if there are any problems then we may lose sync with the broker's actual set of completed jobs. + // In this case we need to perform a full snapshot sync. This involves sending all of our outstanding job Ids to the broker + // and have the broker report on whether they are completed or not. + // We perform an incremental sync on every call of this function with a full snapshot sync periodically. + // This should keep us in sync without over-burdening the broker with snapshot sync requests + + const getAllCompletedJobs = async (ids: ProvingJobId[]) => { + // In this function we take whatever set of snapshot ids and we ask the broker for completed job notifications + // We collect all returned notifications and return them + const allCompleted = new Set(); + try { + let numRequests = 0; + while (ids.length > 0) { + const slice = ids.splice(0, SNAPSHOT_SYNC_CHECK_MAX_REQUEST_SIZE); + const completed = await this.broker.getCompletedJobs(slice); + completed.forEach(id => allCompleted.add(id)); + ++numRequests; + } + if (numRequests === 0) { + const final = await this.broker.getCompletedJobs([]); + final.forEach(id => allCompleted.add(id)); + } + } catch (err) { + this.log.error(`Error thrown when requesting completed job notifications from the broker`, err); + } + return allCompleted; + }; + + const snapshotSyncIds = []; + const currentTime = Date.now(); + const secondsSinceLastSnapshotSync = currentTime - this.timeOfLastSnapshotSync; + if (secondsSinceLastSnapshotSync > SNAPSHOT_SYNC_INTERVAL_MS) { + this.timeOfLastSnapshotSync = currentTime; + snapshotSyncIds.push(...this.jobs.keys()); + this.log.trace(`Performing full snapshot sync of completed jobs with ${snapshotSyncIds.length} job(s)`); + } else { + this.log.trace(`Performing incremental sync of completed jobs`); + } + + // Now request the notifications from the broker + const snapshotIdsLength = snapshotSyncIds.length; + const completedJobs = await 
getAllCompletedJobs(snapshotSyncIds); + + // We now have an additional set of completed job notifications to add to our cached set giving us the full set of jobs that we have been told are ready + // We filter this list to what we actually need, in case for any reason it is different and store in our cache + const allJobsReady = [...completedJobs, ...this.jobsToRetrieve]; + this.jobsToRetrieve = new Set(allJobsReady.filter(id => this.jobs.has(id))); + + if (completedJobs.size > 0) { + this.log.verbose( + `Check for job completion notifications returned ${completedJobs.size} job(s), snapshot ids length: ${snapshotIdsLength}, num outstanding jobs: ${this.jobs.size}, total jobs ready: ${this.jobsToRetrieve.size}`, + ); + } else { + this.log.trace( + `Check for job completion notifications returned 0 jobs, snapshot ids length: ${snapshotIdsLength}, num outstanding jobs: ${this.jobs.size}, total jobs ready: ${this.jobsToRetrieve.size}`, + ); + } + } + private async retrieveJobsThatShouldBeReady() { + const convertJobResult = async ( + result: ProvingJobStatus, + jobType: ProvingRequestType, + ): Promise<{ + success: boolean; + reason?: string; + result?: ProvingJobResultsMap[T]; + }> => { if (result.status === 'fulfilled') { const output = await this.proofStore.getProofOutput(result.value); - if (output.type === type) { - return output.result as ProvingJobResultsMap[T]; + if (output.type === jobType) { + return { result: output.result as ProvingJobResultsMap[T], success: true }; } else { - throw new Error(`Unexpected proof type: ${output.type}. Expected: ${type}`); + return { success: false, reason: `Unexpected proof type: ${output.type}. 
Expected: ${jobType}` }; } + } else if (result.status === 'rejected') { + return { success: false, reason: result.reason }; } else { - throw new Error(result.reason); + throw new Error(`Unexpected proving job status ${result.status}`); } - } finally { - signal?.removeEventListener('abort', abortFn); + }; + + const processJob = async (job: ProvingJob) => { + // First retrieve the settled job from the broker + this.log.debug(`Received notification of completed job id=${job.id} type=${ProvingRequestType[job.type]}`); + let settledResult; + try { + settledResult = await this.broker.getProvingJobStatus(job.id); + } catch (err) { + // If an error occurs retrieving the job result then just log it and move on. + // We will try again on the next iteration + this.log.error( + `Error retrieving job result from broker job id=${job.id} type=${ProvingRequestType[job.type]}`, + err, + ); + return false; + } + + // Then convert the result and resolve/reject the promise + let result; + try { + result = await convertJobResult(settledResult, job.type); + } catch (err) { + // If an error occurs retrieving the job result then just log it and move on. 
+ // We will try again on the next iteration + this.log.error(`Error processing job result job id=${job.id} type=${ProvingRequestType[job.type]}`, err); + return false; + } + + if (result.success) { + this.log.verbose(`Resolved proving job id=${job.id} type=${ProvingRequestType[job.type]}`); + job.promise.resolve(result.result); + } else { + this.log.error( + `Resolving proving job with error id=${job.id} type=${ProvingRequestType[job.type]}`, + result.reason, + ); + job.promise.reject(new Error(result.reason)); + } + + if (job.abortFn && job.signal) { + job.signal?.removeEventListener('abort', job.abortFn); + } + + // Job is now processed and removed from our cache + this.jobs.delete(job.id); + this.jobsToRetrieve.delete(job.id); + return true; + }; + + const toBeRetrieved = Array.from(this.jobsToRetrieve.values()) + .map(id => this.jobs.get(id)!) + .filter(x => x !== undefined); + const totalJobsToRetrieve = toBeRetrieved.length; + let totalJobsRetrieved = 0; + while (toBeRetrieved.length > 0) { + const slice = toBeRetrieved.splice(0, MAX_CONCURRENT_JOB_SETTLED_REQUESTS); + const results = await Promise.all(slice.map(job => processJob(job!))); + totalJobsRetrieved += results.filter(x => x).length; + } + if (totalJobsToRetrieve > 0) { + this.log.verbose( + `Successfully retrieved ${totalJobsRetrieved} of ${totalJobsToRetrieve} jobs that should be ready, total ready jobs is now: ${this.jobsToRetrieve.size}`, + ); + } + } + private async monitorForCompletedJobs() { + // Monitoring for completed jobs involves 2 stages. + + // 1. Update our list of completed jobs. + // We poll the broker for any new job completion notifications and after filtering/deduplication add them to our cached + // list of jobs that we have been told are ready. + await this.updateCompletedJobs(); + + // 2. Retrieve the jobs that should be ready. 
+ // We have a list of jobs that we have been told are ready, so we go ahead and ask for their results + await this.retrieveJobsThatShouldBeReady(); + } + getAvmProof( inputs: AvmCircuitInputs, signal?: AbortSignal, epochNumber?: number, ): Promise> { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.PUBLIC_VM, inputs, epochNumber), ProvingRequestType.PUBLIC_VM, inputs, @@ -137,7 +397,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { signal?: AbortSignal, epochNumber?: number, ): Promise> { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.BASE_PARITY, inputs, epochNumber), ProvingRequestType.BASE_PARITY, inputs, @@ -153,7 +413,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_MERGE_ROLLUP, input, @@ -169,7 +429,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_ROOT_ROLLUP, input, @@ -185,7 +445,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, @@ -204,7 +464,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH > > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, 
epochNumber), ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, @@ -220,7 +480,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.MERGE_ROLLUP, input, epochNumber), ProvingRequestType.MERGE_ROLLUP, input, @@ -235,7 +495,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, epochNumber), ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, @@ -251,7 +511,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ): Promise< PublicInputsAndRecursiveProof > { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, epochNumber), ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, @@ -265,7 +525,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { signal?: AbortSignal, epochNumber?: number, ): Promise> { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.ROOT_PARITY, inputs, epochNumber), ProvingRequestType.ROOT_PARITY, inputs, @@ -279,7 +539,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { signal?: AbortSignal, epochNumber?: number, ): Promise> { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.ROOT_ROLLUP, input, epochNumber), ProvingRequestType.ROOT_ROLLUP, input, @@ -293,7 +553,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { signal?: AbortSignal, epochNumber?: number, ): Promise> { - return this.enqueueAndWaitForJob( + return this.enqueueJob( this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput, epochNumber), ProvingRequestType.TUBE_PROOF, 
tubeInput, diff --git a/yarn-project/prover-client/src/proving_broker/factory.ts b/yarn-project/prover-client/src/proving_broker/factory.ts index 67295fb6011..9d644e561cd 100644 --- a/yarn-project/prover-client/src/proving_broker/factory.ts +++ b/yarn-project/prover-client/src/proving_broker/factory.ts @@ -1,5 +1,4 @@ import { type ProverBrokerConfig } from '@aztec/circuit-types'; -import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; import { type TelemetryClient } from '@aztec/telemetry-client'; import { ProvingBroker } from './proving_broker.js'; @@ -11,7 +10,7 @@ export async function createAndStartProvingBroker( client: TelemetryClient, ): Promise { const database = config.proverBrokerDataDirectory - ? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory), client) + ? await KVBrokerDatabase.new(config, client) : new InMemoryBrokerDatabase(); const broker = new ProvingBroker(database, client, { diff --git a/yarn-project/prover-client/src/proving_broker/fixtures.ts b/yarn-project/prover-client/src/proving_broker/fixtures.ts new file mode 100644 index 00000000000..1f7c1ac0a5d --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/fixtures.ts @@ -0,0 +1,14 @@ +import { type ProofUri, type ProvingJobId, ProvingRequestType, makeProvingJobId } from '@aztec/circuit-types'; +import { randomBytes } from '@aztec/foundation/crypto'; + +export function makeRandomProvingJobId(epochNumber?: number): ProvingJobId { + return makeProvingJobId(epochNumber ?? 
1, ProvingRequestType.BASE_PARITY, randomBytes(8).toString('hex')); +} + +export function makeInputsUri(): ProofUri { + return randomBytes(8).toString('hex') as ProofUri; +} + +export function makeOutputsUri(): ProofUri { + return randomBytes(8).toString('hex') as ProofUri; +} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 4782cd61489..46ae3e8d4fe 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,28 +1,38 @@ import { type ProofUri, + type ProverBrokerConfig, type ProvingJob, type ProvingJobId, type ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; -import { randomBytes } from '@aztec/foundation/crypto'; import { sleep } from '@aztec/foundation/sleep'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { jest } from '@jest/globals'; +import { mkdtemp } from 'fs/promises'; +import { tmpdir } from 'os'; +import { join } from 'path'; +import { makeInputsUri, makeOutputsUri, makeRandomProvingJobId } from './fixtures.js'; import { ProvingBroker } from './proving_broker.js'; import { type ProvingBrokerDatabase } from './proving_broker_database.js'; import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js'; import { KVBrokerDatabase } from './proving_broker_database/persisted.js'; describe.each([ - () => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }), - () => { - const store = openTmpStore(true); - const database = new KVBrokerDatabase(store, new NoopTelemetryClient()); - const cleanup = () => store.close(); + () => Promise.resolve({ database: new InMemoryBrokerDatabase(), cleanup: undefined }), + async () => { + const directory = await mkdtemp(join(tmpdir(), 'proving-broker-test')); + const config: ProverBrokerConfig = 
{ + proverBrokerDataMapSizeKB: 1024 * 1024 * 1024, // 1GB + proverBrokerDataDirectory: directory, + proverBrokerJobMaxRetries: 1, + proverBrokerJobTimeoutMs: 1000, + proverBrokerPollIntervalMs: 1000, + }; + const database = await KVBrokerDatabase.new(config, new NoopTelemetryClient()); + const cleanup = () => {}; return { database, cleanup }; }, ])('ProvingBroker', createDb => { @@ -35,11 +45,11 @@ describe.each([ const now = () => Date.now(); - beforeEach(() => { + beforeEach(async () => { jobTimeoutMs = 100; maxRetries = 2; brokerIntervalMs = jobTimeoutMs / 4; - ({ database, cleanup } = createDb()); + ({ database, cleanup } = await createDb()); broker = new ProvingBroker(database, new NoopTelemetryClient(), { jobTimeoutMs, @@ -64,7 +74,7 @@ describe.each([ }); it('refuses stale jobs', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, epochNumber: 42, @@ -73,7 +83,7 @@ describe.each([ }); expect(await broker.getProvingJobStatus(id)).toEqual({ status: 'in-queue' }); - const id2 = makeProvingJobId(); + const id2 = makeRandomProvingJobId(); await expect( broker.enqueueProvingJob({ id: id2, @@ -86,40 +96,74 @@ describe.each([ }); it('enqueues jobs', async () => { - const id = makeProvingJobId(); - await broker.enqueueProvingJob({ + const id = makeRandomProvingJobId(); + const enqueueStatus = await broker.enqueueProvingJob({ id, epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); + expect(enqueueStatus).toEqual({ status: 'not-found' }); expect(await broker.getProvingJobStatus(id)).toEqual({ status: 'in-queue' }); - const id2 = makeProvingJobId(); - await broker.enqueueProvingJob({ + const id2 = makeRandomProvingJobId(); + const enqueueStatus2 = await broker.enqueueProvingJob({ id: id2, epochNumber: 1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, inputsUri: makeInputsUri(), }); + expect(enqueueStatus2).toEqual({ status: 'not-found' }); expect(await 
broker.getProvingJobStatus(id2)).toEqual({ status: 'in-queue' }); }); it('ignores duplicate jobs', async () => { const provingJob: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), }; - await broker.enqueueProvingJob(provingJob); - await expect(broker.enqueueProvingJob(provingJob)).resolves.toBeUndefined(); + const enqueueStatus = await broker.enqueueProvingJob(provingJob); + expect(enqueueStatus).toEqual({ status: 'not-found' }); + await expect(broker.enqueueProvingJob(provingJob)).resolves.toEqual({ status: 'in-queue' }); await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ status: 'in-queue' }); }); + it('reports correct status when enqueuing repeat jobs', async () => { + const provingJob: ProvingJob = { + id: makeRandomProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + epochNumber: 1, + inputsUri: makeInputsUri(), + }; + + const enqueueStatus = await broker.enqueueProvingJob(provingJob); + expect(enqueueStatus).toEqual({ status: 'not-found' }); + + // start the job + const returnedJob = await broker.getProvingJob(); + expect(returnedJob?.job.id).toEqual(provingJob.id); + + // job status should be in progress + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ status: 'in-progress' }); + + // enqueuing the same job again should return in progress + await expect(broker.enqueueProvingJob(provingJob)).resolves.toEqual({ status: 'in-progress' }); + + // now complete the job + await broker.reportProvingJobSuccess(provingJob.id, 'Proof' as ProofUri); + + // now the status should say fulfilled + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ status: 'fulfilled', value: 'Proof' }); + + // enqueuing the same job again should return fulfilled + await expect(broker.enqueueProvingJob(provingJob)).resolves.toEqual({ status: 'fulfilled', value: 'Proof' }); + }); + it('throws an error in case 
of duplicate job IDs', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, epochNumber: 1, @@ -137,12 +181,12 @@ describe.each([ }); it('returns not-found status for non-existing jobs', async () => { - const status = await broker.getProvingJobStatus(makeProvingJobId()); + const status = await broker.getProvingJobStatus(makeRandomProvingJobId()); expect(status).toEqual({ status: 'not-found' }); }); it('cancels jobs in queue', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, epochNumber: 1, @@ -156,7 +200,7 @@ describe.each([ }); it('cancels jobs in-progress', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, epochNumber: 1, @@ -172,7 +216,7 @@ describe.each([ it('returns job result if successful', async () => { const provingJob: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -188,7 +232,7 @@ describe.each([ it('returns job error if failed', async () => { const provingJob: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -219,14 +263,14 @@ describe.each([ it('returns jobs in priority order', async () => { const provingJob1: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), }; const provingJob2: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 2, inputsUri: makeInputsUri(), @@ -240,7 +284,7 @@ describe.each([ it('returns undefined if no jobs are available for the given allowList', async () => { await broker.enqueueProvingJob({ - id: makeProvingJobId(), + id: 
makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -252,7 +296,7 @@ describe.each([ }); it('returns a job if it is in the allowList', async () => { - const baseParity1 = makeProvingJobId(); + const baseParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, @@ -260,7 +304,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup1 = makeProvingJobId(); + const baseRollup1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -268,7 +312,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup2 = makeProvingJobId(); + const baseRollup2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -276,7 +320,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const rootParity1 = makeProvingJobId(); + const rootParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, @@ -288,7 +332,7 @@ describe.each([ }); it('returns the most important job if it is in the allowList', async () => { - const baseParity1 = makeProvingJobId(); + const baseParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, @@ -296,7 +340,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup1 = makeProvingJobId(); + const baseRollup1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -304,7 +348,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup2 = makeProvingJobId(); + const baseRollup2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -312,7 +356,7 @@ describe.each([ inputsUri: 
makeInputsUri(), }); - const rootParity1 = makeProvingJobId(); + const rootParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, @@ -329,7 +373,7 @@ describe.each([ }); it('returns any job if filter is empty', async () => { - const baseParity1 = makeProvingJobId(); + const baseParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, @@ -337,7 +381,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup1 = makeProvingJobId(); + const baseRollup1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -345,7 +389,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const baseRollup2 = makeProvingJobId(); + const baseRollup2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -353,7 +397,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const rootParity1 = makeProvingJobId(); + const rootParity1 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, @@ -365,7 +409,7 @@ describe.each([ }); it('returns a new job when reporting progress if current one is cancelled', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -377,7 +421,7 @@ describe.each([ await broker.cancelProvingJob(id); await assertJobStatus(id, 'rejected'); - const id2 = makeProvingJobId(); + const id2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, @@ -392,14 +436,14 @@ describe.each([ it('returns a new job if job is already in progress elsewhere', async () => { // this test simulates the broker crashing and when it comes back online it has two agents working the same job 
const job1: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 2, inputsUri: makeInputsUri(), @@ -460,14 +504,14 @@ describe.each([ it('avoids sending the same job to a new agent after a restart', async () => { // this test simulates the broker crashing and when it comes back online it has two agents working the same job const job1: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 2, inputsUri: makeInputsUri(), @@ -515,14 +559,14 @@ describe.each([ it('avoids sending a completed job to a new agent after a restart', async () => { // this test simulates the broker crashing and when it comes back online it has two agents working the same job const job1: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 2, inputsUri: makeInputsUri(), @@ -558,8 +602,8 @@ describe.each([ }); it('tracks job result if in progress', async () => { - const id1 = makeProvingJobId(); - const id2 = makeProvingJobId(); + const id1 = makeRandomProvingJobId(); + const id2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, @@ -585,8 +629,8 @@ describe.each([ }); it('tracks job result even if job is in queue', async () => { - const id1 = makeProvingJobId(); - const id2 = makeProvingJobId(); + const id1 = 
makeRandomProvingJobId(); + const id2 = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, @@ -608,14 +652,14 @@ describe.each([ }); it('ignores reported job error if unknown job', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await assertJobStatus(id, 'not-found'); await broker.reportProvingJobError(id, 'test error'); await assertJobStatus(id, 'not-found'); }); it('ignores job result if unknown job', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await assertJobStatus(id, 'not-found'); await broker.reportProvingJobSuccess(id, makeOutputsUri()); await assertJobStatus(id, 'not-found'); @@ -632,7 +676,7 @@ describe.each([ }); it('tracks in progress jobs', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -646,7 +690,7 @@ describe.each([ }); it('re-enqueues jobs that time out', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -664,7 +708,7 @@ describe.each([ }); it('cancel stale jobs that time out', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -686,21 +730,21 @@ describe.each([ await getAndAssertNextJobId(id); await assertJobStatus(id, 'in-progress'); - // epoch has advances + // epoch has advanced await broker.enqueueProvingJob({ - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 10, inputsUri: makeInputsUri(), }); - // advance time again so job times out. This time it should be rejected + // advance time again so job times out. 
This time it should be not-found as it will have been removed await sleep(jobTimeoutMs + brokerIntervalMs); - await assertJobStatus(id, 'rejected'); + await assertJobStatus(id, 'not-found'); }); it('keeps the jobs in progress while it is alive', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -734,9 +778,13 @@ describe.each([ }); describe('Retries', () => { + beforeEach(async () => { + await broker.start(); + }); + it('retries jobs', async () => { const provingJob: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -753,7 +801,7 @@ describe.each([ }); it('retries up to a maximum number of times', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -775,7 +823,7 @@ describe.each([ }); it('passing retry=false does not retry', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -793,7 +841,7 @@ describe.each([ }); it('does not retry if job is stale', async () => { - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -814,16 +862,18 @@ describe.each([ // advance the epoch height await broker.enqueueProvingJob({ - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 3, inputsUri: makeInputsUri(), }); + await sleep(brokerIntervalMs); + + // job will have been removed await broker.reportProvingJobError(id, 'test error', true); await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ - status: 'rejected', - reason: 'test error', + status: 'not-found', }); }); }); @@ -834,7 +884,7 
@@ describe.each([ }); it('re-enqueues proof requests on start', async () => { - const id1 = makeProvingJobId(); + const id1 = makeRandomProvingJobId(); await database.addProvingJob({ id: id1, @@ -843,7 +893,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const id2 = makeProvingJobId(); + const id2 = makeRandomProvingJobId(); await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -885,7 +935,7 @@ describe.each([ }); it('restores proof results on start', async () => { - const id1 = makeProvingJobId(); + const id1 = makeRandomProvingJobId(1); await database.addProvingJob({ id: id1, @@ -894,7 +944,7 @@ describe.each([ inputsUri: makeInputsUri(), }); - const id2 = makeProvingJobId(); + const id2 = makeRandomProvingJobId(2); await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -919,7 +969,7 @@ describe.each([ }); it('only re-enqueues unfinished jobs', async () => { - const id1 = makeProvingJobId(); + const id1 = makeRandomProvingJobId(); await database.addProvingJob({ id: id1, @@ -929,7 +979,7 @@ describe.each([ }); await database.setProvingJobResult(id1, makeOutputsUri()); - const id2 = makeProvingJobId(); + const id2 = makeRandomProvingJobId(); await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, @@ -947,7 +997,7 @@ describe.each([ it('saves job when enqueued', async () => { await broker.start(); const job: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -963,7 +1013,7 @@ describe.each([ await broker.start(); jest.spyOn(database, 'addProvingJob').mockRejectedValue(new Error('db error')); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await expect( broker.enqueueProvingJob({ id, @@ -979,7 +1029,7 @@ describe.each([ await broker.start(); const job: ProvingJob = { - id: makeProvingJobId(), + id: makeRandomProvingJobId(), type: 
ProvingRequestType.BASE_PARITY, epochNumber: 1, inputsUri: makeInputsUri(), @@ -996,7 +1046,7 @@ describe.each([ it('saves result even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobResult').mockRejectedValue(new Error('db error')); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -1010,7 +1060,7 @@ describe.each([ it('saves job error', async () => { await broker.start(); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); jest.spyOn(database, 'setProvingJobError'); await broker.enqueueProvingJob({ @@ -1029,7 +1079,7 @@ describe.each([ it('saves job error even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobError').mockRejectedValue(new Error('db error')); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, @@ -1042,7 +1092,7 @@ describe.each([ it('does not save job result if job is unknown', async () => { await broker.start(); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); jest.spyOn(database, 'setProvingJobResult'); jest.spyOn(database, 'addProvingJob'); @@ -1055,7 +1105,7 @@ describe.each([ it('does not save job error if job is unknown', async () => { await broker.start(); - const id = makeProvingJobId(); + const id = makeRandomProvingJobId(); jest.spyOn(database, 'setProvingJobError'); jest.spyOn(database, 'addProvingJob'); @@ -1068,12 +1118,12 @@ describe.each([ it('cleans up old jobs periodically', async () => { await broker.start(); - jest.spyOn(database, 'deleteProvingJobAndResult'); - const id1 = 'epoch1' as ProvingJobId; // makeProvingJobId(); // epoch 1 - const id2 = 'epoch2' as ProvingJobId; //makeProvingJobId(); // 2 - const id3 = 'epoch3' as ProvingJobId; //makeProvingJobId(); // 3 - const id4 = 'epoch4' as 
ProvingJobId; //makeProvingJobId(); // 4 - const id5 = 'epoch5' as ProvingJobId; //makeProvingJobId(); // 4 + jest.spyOn(database, 'deleteAllProvingJobsOlderThanEpoch'); + const id1 = makeRandomProvingJobId(1); // makeProvingJobId(); // epoch 1 + const id2 = makeRandomProvingJobId(2); //makeProvingJobId(); // 2 + const id3 = makeRandomProvingJobId(3); //makeProvingJobId(); // 3 + const id4 = makeRandomProvingJobId(4); //makeProvingJobId(); // 4 + const id5 = makeRandomProvingJobId(5); //makeProvingJobId(); // 4 await sleep(10); await broker.enqueueProvingJob({ @@ -1094,8 +1144,9 @@ describe.each([ await broker.reportProvingJobSuccess(id2, '' as ProofUri); // nothing got cleaned up yet. The broker first needs to advance to the next epoch - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).not.toHaveBeenCalled(); + await sleep(brokerIntervalMs * 2); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(1); + expect(database.deleteAllProvingJobsOlderThanEpoch).not.toHaveBeenCalledWith(2); await sleep(10); await broker.enqueueProvingJob({ @@ -1106,9 +1157,10 @@ describe.each([ }); // we got a job for epoch 3, we can clean up jobs from epoch 1 - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); - expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id2); + await sleep(brokerIntervalMs * 2); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(1); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(2); + expect(database.deleteAllProvingJobsOlderThanEpoch).not.toHaveBeenCalledWith(3); await sleep(10); await broker.enqueueProvingJob({ @@ -1119,8 +1171,11 @@ describe.each([ }); // once we advance to epoch 4 we can clean up finished jobs for epoch 2 - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); + await sleep(brokerIntervalMs * 2); + 
expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(1); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(2); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(3); + expect(database.deleteAllProvingJobsOlderThanEpoch).not.toHaveBeenCalledWith(4); await sleep(10); await broker.enqueueProvingJob({ @@ -1130,16 +1185,13 @@ describe.each([ inputsUri: '' as ProofUri, }); - // advancing to epoch 5 does not automatically clean up unfinished jobs for epoch 3 - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id3); - - await broker.cancelProvingJob(id3); // now job 3 is settled (aborted) - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id3); // and we can clean it up - - await broker.cancelProvingJob(id4); - await broker.cancelProvingJob(id5); + // advancing to epoch 5 should clean up jobs for epoch 3 + await sleep(brokerIntervalMs * 2); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(1); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(2); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(3); + expect(database.deleteAllProvingJobsOlderThanEpoch).toHaveBeenCalledWith(4); + expect(database.deleteAllProvingJobsOlderThanEpoch).not.toHaveBeenCalledWith(5); }); }); @@ -1173,15 +1225,3 @@ describe.each([ ); } }); - -function makeProvingJobId(): ProvingJobId { - return randomBytes(8).toString('hex') as ProvingJobId; -} - -function makeInputsUri(): ProofUri { - return randomBytes(8).toString('hex') as ProofUri; -} - -function makeOutputsUri(): ProofUri { - return randomBytes(8).toString('hex') as ProofUri; -} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 7ffbb5244ed..d51277fe62c 100644 --- 
a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -9,10 +9,9 @@ import { type ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; -import { asyncPool } from '@aztec/foundation/async-pool'; import { createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; -import { PriorityMemoryQueue } from '@aztec/foundation/queue'; +import { PriorityMemoryQueue, SerialQueue } from '@aztec/foundation/queue'; import { Timer } from '@aztec/foundation/timer'; import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; @@ -89,21 +88,24 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr private instrumentation: ProvingBrokerInstrumentation; public readonly tracer: Tracer; - private maxParallelCleanUps: number; + private completedJobNotifications: ProvingJobId[] = []; /** * The broker keeps track of the highest epoch its seen. * This information is used for garbage collection: once it reaches the next epoch, it can start pruning the database of old state. - * This clean up pass is only done against _settled_ jobs. This pass will not cancel jobs that are in-progress or in-queue. - * It is a client responsibility to cancel jobs if they are no longer necessary. + * It is important that this value is initialised to zero. 
This ensures that we don't delete any old jobs until the current + * process instance receives a job request informing it of the actual current highest epoch * Example: - * proving epoch 11 - the broker will wipe all setlled jobs for epochs 9 and lower - * finished proving epoch 11 and got first job for epoch 12 -> the broker will wipe all setlled jobs for epochs 10 and lower - * reorged back to end of epoch 10 -> epoch 11 is skipped and epoch 12 starts -> the broker will wipe all setlled jobs for epochs 10 and lower + * proving epoch 11 - the broker will wipe all jobs for epochs 9 and lower + * finished proving epoch 11 and got first job for epoch 12 -> the broker will wipe all settled jobs for epochs 10 and lower + * reorged back to end of epoch 10 -> epoch 11 is skipped and epoch 12 starts -> the broker will wipe all settled jobs for epochs 10 and lower */ private epochHeight = 0; private maxEpochsToKeepResultsFor = 1; + private requestQueue: SerialQueue = new SerialQueue(); + private started = false; + public constructor( private database: ProvingBrokerDatabase, client: TelemetryClient, @@ -112,7 +114,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr timeoutIntervalMs = 10_000, maxRetries = 3, maxEpochsToKeepResultsFor = 1, - maxParallelCleanUps = 20, }: ProofRequestBrokerConfig = {}, private logger = createLogger('prover-client:proving-broker'), ) { @@ -122,7 +123,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr this.jobTimeoutMs = jobTimeoutMs; this.maxRetries = maxRetries; this.maxEpochsToKeepResultsFor = maxEpochsToKeepResultsFor; - this.maxParallelCleanUps = maxParallelCleanUps; } private measureQueueDepth: MonitorCallback = (type: ProvingRequestType) => { @@ -142,6 +142,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr }; public start(): Promise { + if (this.started) { + this.logger.info('Proving Broker already started'); + return 
Promise.resolve(); + } + this.logger.info('Proving Broker started'); for (const [item, result] of this.database.allProvingJobs()) { this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`, { provingJobId: item.id, @@ -161,24 +166,71 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr this.cleanupPromise.start(); + this.requestQueue.start(); + this.instrumentation.monitorQueueDepth(this.measureQueueDepth); this.instrumentation.monitorActiveJobs(this.countActiveJobs); + this.started = true; + return Promise.resolve(); } public async stop(): Promise { + if (!this.started) { + this.logger.warn('ProvingBroker not started'); + return Promise.resolve(); + } + await this.requestQueue.cancel(); await this.cleanupPromise.stop(); } - public async enqueueProvingJob(job: ProvingJob): Promise { + public enqueueProvingJob(job: ProvingJob): Promise { + return this.requestQueue.put(() => this.#enqueueProvingJob(job)); + } + + public cancelProvingJob(id: ProvingJobId): Promise { + return this.requestQueue.put(() => this.#cancelProvingJob(id)); + } + + public getProvingJobStatus(id: ProvingJobId): Promise { + return this.requestQueue.put(() => this.#getProvingJobStatus(id)); + } + + public getCompletedJobs(ids: ProvingJobId[]): Promise { + return this.requestQueue.put(() => this.#getCompletedJobs(ids)); + } + + public getProvingJob(filter?: ProvingJobFilter): Promise<{ job: ProvingJob; time: number } | undefined> { + return this.requestQueue.put(() => this.#getProvingJob(filter)); + } + + public reportProvingJobSuccess(id: ProvingJobId, value: ProofUri): Promise { + return this.requestQueue.put(() => this.#reportProvingJobSuccess(id, value)); + } + + public reportProvingJobError(id: ProvingJobId, err: string, retry = false): Promise { + return this.requestQueue.put(() => this.#reportProvingJobError(id, err, retry)); + } + + public reportProvingJobProgress( + id: ProvingJobId, + startedAt: number, + filter?: ProvingJobFilter, + ): 
Promise<{ job: ProvingJob; time: number } | undefined> { + return this.requestQueue.put(() => this.#reportProvingJobProgress(id, startedAt, filter)); + } + + async #enqueueProvingJob(job: ProvingJob): Promise { + // We return the job status at the start of this call + const jobStatus = await this.#getProvingJobStatus(job.id); if (this.jobsCache.has(job.id)) { const existing = this.jobsCache.get(job.id); assert.deepStrictEqual(job, existing, 'Duplicate proving job ID'); this.logger.debug(`Duplicate proving job id=${job.id} epochNumber=${job.epochNumber}. Ignoring`, { provingJobId: job.id, }); - return; + return jobStatus; } if (this.isJobStale(job)) { @@ -199,18 +251,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr this.jobsCache.delete(job.id); throw err; } + return jobStatus; } - public waitForJobToSettle(id: ProvingJobId): Promise { - const promiseWithResolvers = this.promises.get(id); - if (!promiseWithResolvers) { - this.logger.warn(`Job id=${id} not found`, { provingJobId: id }); - return Promise.resolve({ status: 'rejected', reason: `Job ${id} not found` }); - } - return promiseWithResolvers.promise; - } - - public async cancelProvingJob(id: ProvingJobId): Promise { + async #cancelProvingJob(id: ProvingJobId): Promise { if (!this.jobsCache.has(id)) { this.logger.warn(`Can't cancel a job that doesn't exist id=${id}`, { provingJobId: id }); return; @@ -219,31 +263,21 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr // notify listeners of the cancellation if (!this.resultsCache.has(id)) { this.logger.info(`Cancelling job id=${id}`, { provingJobId: id }); - await this.reportProvingJobError(id, 'Aborted', false); + await this.#reportProvingJobError(id, 'Aborted', false); } } - private async cleanUpProvingJobState(id: ProvingJobId): Promise { - if (!this.jobsCache.has(id)) { - this.logger.warn(`Can't clean up a job that doesn't exist id=${id}`, { provingJobId: id }); - return; - } - - if 
(!this.resultsCache.has(id)) { - this.logger.warn(`Can't cleanup busy proving job: id=${id}`, { provingJobId: id }); - return; + private cleanUpProvingJobState(ids: ProvingJobId[]) { + for (const id of ids) { + this.jobsCache.delete(id); + this.promises.delete(id); + this.resultsCache.delete(id); + this.inProgress.delete(id); + this.retries.delete(id); } - - this.logger.debug(`Cleaning up state for job id=${id}`, { provingJobId: id }); - await this.database.deleteProvingJobAndResult(id); - this.jobsCache.delete(id); - this.promises.delete(id); - this.resultsCache.delete(id); - this.inProgress.delete(id); - this.retries.delete(id); } - public getProvingJobStatus(id: ProvingJobId): Promise { + #getProvingJobStatus(id: ProvingJobId): Promise { const result = this.resultsCache.get(id); if (result) { return Promise.resolve(result); @@ -252,7 +286,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const item = this.jobsCache.get(id); if (!item) { - this.logger.warn(`Proving job id=${id} not found`, { provingJobId: id }); return Promise.resolve({ status: 'not-found' }); } @@ -260,8 +293,15 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } } + #getCompletedJobs(ids: ProvingJobId[]): Promise { + const completedJobs = ids.filter(id => this.resultsCache.has(id)); + const notifications = this.completedJobNotifications; + this.completedJobNotifications = []; + return Promise.resolve(notifications.concat(completedJobs)); + } + // eslint-disable-next-line require-await - async getProvingJob( + async #getProvingJob( filter: ProvingJobFilter = { allowList: [] }, ): Promise<{ job: ProvingJob; time: number } | undefined> { const allowedProofs: ProvingRequestType[] = @@ -299,7 +339,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return undefined; } - async reportProvingJobError(id: ProvingJobId, err: string, retry = false): Promise { + async #reportProvingJobError(id: 
ProvingJobId, err: string, retry = false): Promise { const info = this.inProgress.get(id); const item = this.jobsCache.get(id); const retries = this.retries.get(id) ?? 0; @@ -351,6 +391,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const result: ProvingJobSettledResult = { status: 'rejected', reason: String(err) }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.completedJobNotifications.push(id); this.instrumentation.incRejectedJobs(item.type); if (info) { @@ -369,7 +410,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } } - reportProvingJobProgress( + #reportProvingJobProgress( id: ProvingJobId, startedAt: number, filter?: ProvingJobFilter, @@ -377,12 +418,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const job = this.jobsCache.get(id); if (!job) { this.logger.warn(`Proving job id=${id} does not exist`, { provingJobId: id }); - return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); + return filter ? this.#getProvingJob(filter) : Promise.resolve(undefined); } if (this.resultsCache.has(id)) { this.logger.warn(`Proving job id=${id} has already been completed`, { provingJobId: id }); - return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); + return filter ? this.#getProvingJob(filter) : Promise.resolve(undefined); } const metadata = this.inProgress.get(id); @@ -420,13 +461,13 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } already being worked on by another agent. 
Sending new one`, { provingJobId: id }, ); - return this.getProvingJob(filter); + return this.#getProvingJob(filter); } else { return Promise.resolve(undefined); } } - async reportProvingJobSuccess(id: ProvingJobId, value: ProofUri): Promise { + async #reportProvingJobSuccess(id: ProvingJobId, value: ProofUri): Promise { const info = this.inProgress.get(id); const item = this.jobsCache.get(id); const retries = this.retries.get(id) ?? 0; @@ -459,6 +500,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const result: ProvingJobSettledResult = { status: 'fulfilled', value }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.completedJobNotifications.push(id); this.instrumentation.incResolvedJobs(item.type); if (info) { @@ -479,30 +521,32 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr @trackSpan('ProvingBroker.cleanupPass') private async cleanupPass() { - await this.cleanupStaleJobs(); - await this.reEnqueueExpiredJobs(); + this.cleanupStaleJobs(); + this.reEnqueueExpiredJobs(); + const oldestEpochToKeep = this.oldestEpochToKeep(); + if (oldestEpochToKeep > 0) { + await this.requestQueue.put(() => this.database.deleteAllProvingJobsOlderThanEpoch(oldestEpochToKeep)); + this.logger.trace(`Deleted all epochs older than ${oldestEpochToKeep}`); + } } - private async cleanupStaleJobs() { + private cleanupStaleJobs() { const jobIds = Array.from(this.jobsCache.keys()); const jobsToClean: ProvingJobId[] = []; for (const id of jobIds) { const job = this.jobsCache.get(id)!; - const isComplete = this.resultsCache.has(id); - if (isComplete && this.isJobStale(job)) { + if (this.isJobStale(job)) { jobsToClean.push(id); } } if (jobsToClean.length > 0) { - this.logger.info(`Cleaning up jobs=${jobsToClean.length}`); - await asyncPool(this.maxParallelCleanUps, jobsToClean, async jobId => { - await this.cleanUpProvingJobState(jobId); - }); + this.cleanUpProvingJobState(jobsToClean); + 
this.logger.info(`Cleaned up jobs=${jobsToClean.length}`); } } - private async reEnqueueExpiredJobs() { + private reEnqueueExpiredJobs() { const inProgressEntries = Array.from(this.inProgress.entries()); for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); @@ -515,15 +559,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const now = this.msTimeSource(); const msSinceLastUpdate = now - metadata.lastUpdatedAt; if (msSinceLastUpdate >= this.jobTimeoutMs) { - if (this.isJobStale(item)) { - // the job has timed out and it's also old, just cancel and move on - await this.cancelProvingJob(item.id); - } else { - this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`, { provingJobId: id }); - this.inProgress.delete(id); - this.enqueueJobInternal(item); - this.instrumentation.incTimedOutJobs(item.type); - } + this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`, { provingJobId: id }); + this.inProgress.delete(id); + this.enqueueJobInternal(item); + this.instrumentation.incTimedOutJobs(item.type); } } } @@ -541,7 +580,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } private isJobStale(job: ProvingJob) { - return job.epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor; + return job.epochNumber < this.oldestEpochToKeep(); + } + + private oldestEpochToKeep() { + return this.epochHeight - this.maxEpochsToKeepResultsFor; } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts index b5adf91cb89..a713431390a 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts @@ -11,10 +11,10 @@ export interface ProvingBrokerDatabase { addProvingJob(request: ProvingJob): Promise; /** - * Removes a proof 
request from the backend - * @param id - The ID of the proof request to remove + * Deletes all proving jobs belonging to epochs older than the given epoch + * @param epochNumber - The epoch number beyond which jobs should be deleted */ - deleteProvingJobAndResult(id: ProvingJobId): Promise; + deleteAllProvingJobsOlderThanEpoch(epochNumber: number): Promise; /** * Returns an iterator over all saved proving jobs @@ -36,4 +36,9 @@ export interface ProvingBrokerDatabase { * @param err - The error that occurred while processing the proof request */ setProvingJobError(id: ProvingJobId, err: string): Promise; + + /** + * Closes the database + */ + close(): Promise; } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/broker_persisted_database.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/broker_persisted_database.test.ts new file mode 100644 index 00000000000..afed1c71955 --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/broker_persisted_database.test.ts @@ -0,0 +1,286 @@ +import { + type ProofUri, + type ProverBrokerConfig, + type ProvingJob, + type ProvingJobSettledResult, + ProvingRequestType, +} from '@aztec/circuit-types'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; + +import { existsSync } from 'fs'; +import { mkdir, mkdtemp, rm } from 'fs/promises'; +import { tmpdir } from 'os'; +import { join } from 'path'; + +import { makeInputsUri, makeRandomProvingJobId } from '../fixtures.js'; +import { KVBrokerDatabase } from './persisted.js'; + +describe('ProvingBrokerPersistedDatabase', () => { + let db: KVBrokerDatabase; + let directory: string; + let config: ProverBrokerConfig; + + beforeEach(async () => { + directory = await mkdtemp(join(tmpdir(), 'proving-broker-database-test')); + config = { + proverBrokerDataMapSizeKB: 1024 * 1024 * 1024, // 1GB + proverBrokerDataDirectory: directory, + proverBrokerJobMaxRetries: 1, + 
proverBrokerJobTimeoutMs: 1000, + proverBrokerPollIntervalMs: 1000, + }; + db = await KVBrokerDatabase.new(config, new NoopTelemetryClient()); + }); + + afterEach(async () => { + await rm(directory, { recursive: true, force: true }); + }); + + it('can add a proving job', async () => { + const id = makeRandomProvingJobId(42); + await expect( + db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }), + ).resolves.not.toThrow(); + }); + + it('can add multiple proving jobs', async () => { + const numJobs = 5; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(42); + await expect( + db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }), + ).resolves.not.toThrow(); + } + }); + + it('can add a proving success', async () => { + // need to add the epoch via a new job + const id = makeRandomProvingJobId(42); + await db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + await expect(db.setProvingJobResult(id, 'Proof' as ProofUri)).resolves.not.toThrow(); + }); + + it('can add multiple proving successes', async () => { + // need to add the epoch via a new job + const id = makeRandomProvingJobId(42); + await db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + + const numJobs = 5; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(42); + await expect(db.setProvingJobResult(id, 'Proof' as ProofUri)).resolves.not.toThrow(); + } + }); + + it('can add a proving error', async () => { + // need to add the epoch via a new job + const id = makeRandomProvingJobId(42); + await db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + + await expect(db.setProvingJobError(id, 'Proof Failed')).resolves.not.toThrow(); + }); 
+ + it('can add multiple proving errors', async () => { + // need to add the epoch via a new job + const id = makeRandomProvingJobId(42); + await db.addProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + + const numJobs = 5; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(42); + await expect(db.setProvingJobError(id, 'Proof Failed')).resolves.not.toThrow(); + } + }); + + it('can add items over multiple epochs', async () => { + const numJobs = 5; + const startEpoch = 12; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(startEpoch + i); + await expect( + db.addProvingJob({ + id, + epochNumber: startEpoch + i, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }), + ).resolves.not.toThrow(); + await expect(db.setProvingJobResult(id, 'Proof' as ProofUri)).resolves.not.toThrow(); + await expect(db.setProvingJobError(id, 'Proof Failed')).resolves.not.toThrow(); + } + }); + + it('can retrieve items', async () => { + const numJobs = 10; + const startEpoch = 12; + const expectedJobs: [ProvingJob, ProvingJobSettledResult | undefined][] = []; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(startEpoch + i); + const job: ProvingJob = { + id, + epochNumber: startEpoch + i, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }; + await db.addProvingJob(job); + if (i == startEpoch + 2) { + expectedJobs.push([job, undefined]); + } else if (i % 2) { + await db.setProvingJobResult(id, `Proof ${id}` as ProofUri); + const result: ProvingJobSettledResult = { status: 'fulfilled', value: `Proof ${id}` as ProofUri }; + expectedJobs.push([job, result]); + } else { + await db.setProvingJobError(id, `Proof failed ${id}`); + const result: ProvingJobSettledResult = { status: 'rejected', reason: `Proof failed ${id}` }; + expectedJobs.push([job, result]); + } + } + const allJobs = Array.from(db.allProvingJobs()); + 
expect(allJobs.length).toBe(numJobs); + expectArrayEquivalence(expectedJobs, allJobs); + }); + + it('creates subdirectories for each epoch', async () => { + const numJobs = 10; + const startEpoch = 12; + const epochs = []; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(startEpoch + i); + await db.addProvingJob({ + id, + epochNumber: startEpoch + i, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + epochs.push(startEpoch + i); + expectSubdirectoriesExist(directory, epochs, true); + } + }); + + it('deletes all epochs before given value', async () => { + const numJobs = 10; + const startEpoch = 12; + const expectedJobs: [ProvingJob, ProvingJobSettledResult | undefined][] = []; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(startEpoch + i); + const job: ProvingJob = { + id, + epochNumber: startEpoch + i, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }; + await db.addProvingJob(job); + if (i == startEpoch + 2) { + expectedJobs.push([job, undefined]); + } else if (i % 2) { + await db.setProvingJobResult(id, `Proof ${id}` as ProofUri); + const result: ProvingJobSettledResult = { status: 'fulfilled', value: `Proof ${id}` as ProofUri }; + expectedJobs.push([job, result]); + } else { + await db.setProvingJobError(id, `Proof failed ${id}`); + const result: ProvingJobSettledResult = { status: 'rejected', reason: `Proof failed ${id}` }; + expectedJobs.push([job, result]); + } + } + const epochNumbers = expectedJobs.map(x => x[0].epochNumber); + expectSubdirectoriesExist(directory, epochNumbers, true); + const expectedJobsAfterEpoch14 = expectedJobs.filter(x => x[0].epochNumber > 14); + await db.deleteAllProvingJobsOlderThanEpoch(15); + const allJobs = Array.from(db.allProvingJobs()); + expect(allJobs.length).toBe(expectedJobsAfterEpoch14.length); + expectArrayEquivalence(expectedJobsAfterEpoch14, allJobs); + + expectSubdirectoriesExist( + directory, + 
epochNumbers.filter(x => x > 14), + true, + ); + expectSubdirectoriesExist( + directory, + epochNumbers.filter(x => x <= 14), + false, + ); + }); + + it('restores all persisted data', async () => { + const numJobs = 10; + const startEpoch = 12; + const expectedJobs: [ProvingJob, ProvingJobSettledResult | undefined][] = []; + for (let i = 0; i < numJobs; i++) { + const id = makeRandomProvingJobId(startEpoch + i); + const job: ProvingJob = { + id, + epochNumber: startEpoch + i, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }; + await db.addProvingJob(job); + if (i == startEpoch + 2) { + expectedJobs.push([job, undefined]); + } else if (i % 2) { + await db.setProvingJobResult(id, `Proof ${id}` as ProofUri); + const result: ProvingJobSettledResult = { status: 'fulfilled', value: `Proof ${id}` as ProofUri }; + expectedJobs.push([job, result]); + } else { + await db.setProvingJobError(id, `Proof failed ${id}`); + const result: ProvingJobSettledResult = { status: 'rejected', reason: `Proof failed ${id}` }; + expectedJobs.push([job, result]); + } + } + await db.close(); + + // Create a non epoch directory to ensure it gets ignored + const garbageDirectory = join(directory, 'NotAnEpoch'); + await mkdir(garbageDirectory, { recursive: true }); + + // Now create another instance + const secondDb = await KVBrokerDatabase.new(config, new NoopTelemetryClient()); + + // All data should be restored + const allJobs = Array.from(secondDb.allProvingJobs()); + expect(allJobs.length).toBe(numJobs); + expectArrayEquivalence(expectedJobs, allJobs); + }); +}); + +const expectArrayEquivalence = (actual: T[], expected: T[]) => { + expect(actual).toEqual(expect.arrayContaining(expected)); + expect(expected).toEqual(expect.arrayContaining(actual)); +}; + +const expectSubdirectoriesExist = (parent: string, epochNumbers: number[], success: boolean) => { + for (const epoch of epochNumbers) { + const directory = join(parent, epoch.toString()); + 
expect(existsSync(directory)).toBe(success); + } +}; diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/memory.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/memory.ts index 0a737aadd43..1a20ff8757d 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker_database/memory.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/memory.ts @@ -1,4 +1,10 @@ -import type { ProofUri, ProvingJob, ProvingJobId, ProvingJobSettledResult } from '@aztec/circuit-types'; +import { + type ProofUri, + type ProvingJob, + type ProvingJobId, + type ProvingJobSettledResult, + getEpochFromProvingJobId, +} from '@aztec/circuit-types'; import { type ProvingBrokerDatabase } from '../proving_broker_database.js'; @@ -29,15 +35,29 @@ export class InMemoryBrokerDatabase implements ProvingBrokerDatabase { return Promise.resolve(); } - deleteProvingJobAndResult(id: ProvingJobId): Promise { - this.jobs.delete(id); - this.results.delete(id); + deleteProvingJobs(ids: ProvingJobId[]): Promise { + for (const id of ids) { + this.jobs.delete(id); + this.results.delete(id); + } return Promise.resolve(); } + deleteAllProvingJobsOlderThanEpoch(epochNumber: number): Promise { + const toDelete = [ + ...Array.from(this.jobs.keys()).filter(x => getEpochFromProvingJobId(x) < epochNumber), + ...Array.from(this.results.keys()).filter(x => getEpochFromProvingJobId(x) < epochNumber), + ]; + return this.deleteProvingJobs(toDelete); + } + *allProvingJobs(): Iterable<[ProvingJob, ProvingJobSettledResult | undefined]> { for (const item of this.jobs.values()) { yield [item, this.results.get(item.id)] as const; } } + + close(): Promise { + return Promise.resolve(); + } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts index 660c003b407..156a00b878d 100644 --- 
a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts @@ -1,27 +1,35 @@ -import { type ProofUri, ProvingJob, type ProvingJobId, ProvingJobSettledResult } from '@aztec/circuit-types'; +import { + type ProofUri, + type ProverBrokerConfig, + ProvingJob, + type ProvingJobId, + ProvingJobSettledResult, + getEpochFromProvingJobId, +} from '@aztec/circuit-types'; import { jsonParseWithSchema, jsonStringify } from '@aztec/foundation/json-rpc'; -import { type AztecKVStore, type AztecMap } from '@aztec/kv-store'; +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { type AztecMap } from '@aztec/kv-store'; +import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; import { Attributes, LmdbMetrics, type TelemetryClient } from '@aztec/telemetry-client'; +import { mkdir, readdir } from 'fs/promises'; +import { join } from 'path'; + import { type ProvingBrokerDatabase } from '../proving_broker_database.js'; -export class KVBrokerDatabase implements ProvingBrokerDatabase { +class SingleEpochDatabase { private jobs: AztecMap; private jobResults: AztecMap; - private metrics: LmdbMetrics; - constructor(private store: AztecKVStore, client: TelemetryClient) { - this.metrics = new LmdbMetrics( - client.getMeter('KVBrokerDatabase'), - { - [Attributes.DB_DATA_TYPE]: 'prover-broker', - }, - () => store.estimateSize(), - ); + constructor(public readonly store: AztecLmdbStore) { this.jobs = store.openMap('proving_jobs'); this.jobResults = store.openMap('proving_job_results'); } + estimateSize() { + return this.store.estimateSize(); + } + async addProvingJob(job: ProvingJob): Promise { await this.jobs.set(job.id, jsonStringify(job)); } @@ -35,13 +43,6 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { } } - deleteProvingJobAndResult(id: ProvingJobId): Promise { - return this.store.transaction(() => { - void this.jobs.delete(id); - void 
this.jobResults.delete(id); - }); - } - async setProvingJobError(id: ProvingJobId, reason: string): Promise { const result: ProvingJobSettledResult = { status: 'rejected', reason }; await this.jobResults.set(id, jsonStringify(result)); @@ -51,4 +52,125 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { const result: ProvingJobSettledResult = { status: 'fulfilled', value }; await this.jobResults.set(id, jsonStringify(result)); } + + delete() { + return this.store.delete(); + } + + close() { + return this.store.close(); + } +} + +export class KVBrokerDatabase implements ProvingBrokerDatabase { + private metrics: LmdbMetrics; + + private constructor( + private epochs: Map, + private config: ProverBrokerConfig, + client: TelemetryClient, + private logger: Logger, + ) { + this.metrics = new LmdbMetrics( + client.getMeter('KVBrokerDatabase'), + { + [Attributes.DB_DATA_TYPE]: 'prover-broker', + }, + () => this.estimateSize(), + ); + } + + private estimateSize() { + const sizes = Array.from(this.epochs.values()).map(x => x.estimateSize()); + return { + mappingSize: this.config.proverBrokerDataMapSizeKB, + numItems: sizes.reduce((prev, curr) => prev + curr.numItems, 0), + actualSize: sizes.reduce((prev, curr) => prev + curr.actualSize, 0), + }; + } + + public static async new( + config: ProverBrokerConfig, + client: TelemetryClient, + logger = createLogger('prover-client:proving-broker-database'), + ) { + const epochs: Map = new Map(); + const files = await readdir(config.proverBrokerDataDirectory!, { recursive: false, withFileTypes: true }); + for (const file of files) { + if (!file.isDirectory()) { + continue; + } + const fullDirectory = join(config.proverBrokerDataDirectory!, file.name); + const epochDirectory = file.name; + const epochNumber = parseInt(epochDirectory, 10); + if (!Number.isSafeInteger(epochNumber) || epochNumber < 0) { + logger.warn(`Found invalid epoch directory ${fullDirectory} when loading epoch databases, ignoring`); + continue; + } 
+ logger.info( + `Loading broker database for epoch ${epochNumber} from ${fullDirectory} with map size ${config.proverBrokerDataMapSizeKB}KB`, + ); + const db = AztecLmdbStore.open(fullDirectory, config.proverBrokerDataMapSizeKB, false); + const epochDb = new SingleEpochDatabase(db); + epochs.set(epochNumber, epochDb); + } + return new KVBrokerDatabase(epochs, config, client, logger); + } + + async close(): Promise { + for (const [_, v] of this.epochs) { + await v.close(); + } + } + + async deleteAllProvingJobsOlderThanEpoch(epochNumber: number): Promise { + const oldEpochs = Array.from(this.epochs.keys()).filter(e => e < epochNumber); + for (const old of oldEpochs) { + const db = this.epochs.get(old); + if (!db) { + continue; + } + this.logger.info(`Deleting broker database for epoch ${old}`); + await db.delete(); + this.epochs.delete(old); + } + } + + async addProvingJob(job: ProvingJob): Promise { + let epochDb = this.epochs.get(job.epochNumber); + if (!epochDb) { + const newEpochDirectory = join(this.config.proverBrokerDataDirectory!, job.epochNumber.toString()); + await mkdir(newEpochDirectory, { recursive: true }); + this.logger.info( + `Creating broker database for epoch ${job.epochNumber} at ${newEpochDirectory} with map size ${this.config.proverBrokerDataMapSizeKB}`, + ); + const db = AztecLmdbStore.open(newEpochDirectory, this.config.proverBrokerDataMapSizeKB, false); + epochDb = new SingleEpochDatabase(db); + this.epochs.set(job.epochNumber, epochDb); + } + await epochDb.addProvingJob(job); + } + + *allProvingJobs(): Iterable<[ProvingJob, ProvingJobSettledResult | undefined]> { + const iterators = Array.from(this.epochs.values()).map(x => x.allProvingJobs()); + for (const it of iterators) { + yield* it; + } + } + + async setProvingJobError(id: ProvingJobId, reason: string): Promise { + const epochDb = this.epochs.get(getEpochFromProvingJobId(id)); + if (!epochDb) { + return; + } + await epochDb.setProvingJobError(id, reason); + } + + async 
setProvingJobResult(id: ProvingJobId, value: ProofUri): Promise { + const epochDb = this.epochs.get(getEpochFromProvingJobId(id)); + if (!epochDb) { + return; + } + await epochDb.setProvingJobResult(id, value); + } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts index 0b08e0ad69c..cde1e4bcc59 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts @@ -42,13 +42,13 @@ export class ProvingJobController { } this.status = ProvingJobControllerStatus.PROVING; + this.promise = this.generateProof() .then( result => { if (this.status === ProvingJobControllerStatus.ABORTED) { return; } - this.status = ProvingJobControllerStatus.DONE; return this.onComplete(this.jobId, this.inputs.type, undefined, result); }, diff --git a/yarn-project/prover-client/src/proving_broker/rpc.ts b/yarn-project/prover-client/src/proving_broker/rpc.ts index 3dec6bedbdb..cabc716c988 100644 --- a/yarn-project/prover-client/src/proving_broker/rpc.ts +++ b/yarn-project/prover-client/src/proving_broker/rpc.ts @@ -6,7 +6,6 @@ import { type ProvingJobConsumer, ProvingJobId, type ProvingJobProducer, - ProvingJobSettledResult, ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; @@ -26,10 +25,10 @@ const GetProvingJobResponse = z.object({ }); export const ProvingJobProducerSchema: ApiSchemaFor = { - enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()), + enqueueProvingJob: z.function().args(ProvingJob).returns(ProvingJobStatus), getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus), cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), - waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult), + getCompletedJobs: z.function().args(z.array(ProvingJobId)).returns(z.array(ProvingJobId)), }; export 
const ProvingJobConsumerSchema: ApiSchemaFor = { diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index de87b6ef58c..0a8296a68b9 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -3,7 +3,6 @@ import { type ProvingJob, type ProvingJobId, type ProvingJobProducer, - type ProvingJobSettledResult, type ProvingJobStatus, type PublicInputsAndRecursiveProof, type ServerCircuitProver, @@ -83,7 +82,7 @@ export class TestBroker implements ProvingJobProducer { return this.proofStore; } - enqueueProvingJob(job: ProvingJob): Promise { + enqueueProvingJob(job: ProvingJob): Promise { return this.broker.enqueueProvingJob(job); } getProvingJobStatus(id: ProvingJobId): Promise { @@ -92,8 +91,9 @@ export class TestBroker implements ProvingJobProducer { cancelProvingJob(id: string): Promise { return this.broker.cancelProvingJob(id); } - waitForJobToSettle(id: ProvingJobId): Promise { - return this.broker.waitForJobToSettle(id); + + getCompletedJobs(ids: ProvingJobId[]): Promise { + return this.broker.getCompletedJobs(ids); } } diff --git a/yarn-project/prover-node/src/job/epoch-proving-job.ts b/yarn-project/prover-node/src/job/epoch-proving-job.ts index 1f22fe0477c..0e26ece0491 100644 --- a/yarn-project/prover-node/src/job/epoch-proving-job.ts +++ b/yarn-project/prover-node/src/job/epoch-proving-job.ts @@ -131,6 +131,8 @@ export class EpochProvingJob implements Traceable { await this.prover.setBlockCompleted(block.number, block.header); }); + const executionTime = timer.ms(); + this.progressState('awaiting-prover'); const { publicInputs, proof } = await this.prover.finaliseEpoch(); this.log.info(`Finalised proof for epoch ${epochNumber}`, { epochNumber, uuid: this.uuid, duration: timer.ms() }); @@ -143,7 +145,7 @@ export class EpochProvingJob implements Traceable { this.log.info(`Submitted proof for epoch`, { epochNumber, uuid: this.uuid }); 
this.state = 'completed'; - this.metrics.recordProvingJob(timer, epochSizeBlocks, epochSizeTxs); + this.metrics.recordProvingJob(executionTime, timer.ms(), epochSizeBlocks, epochSizeTxs); } catch (err: any) { if (err && err.name === 'HaltExecutionError') { this.log.warn(`Halted execution of epoch ${epochNumber} prover job`, { uuid: this.uuid, epochNumber }); @@ -154,6 +156,7 @@ export class EpochProvingJob implements Traceable { } finally { clearTimeout(this.deadlineTimeoutHandler); await this.cleanUp(this); + await this.prover.stop(); resolve(); } } diff --git a/yarn-project/prover-node/src/metrics.ts b/yarn-project/prover-node/src/metrics.ts index 98a6cf36608..9a08a65aea8 100644 --- a/yarn-project/prover-node/src/metrics.ts +++ b/yarn-project/prover-node/src/metrics.ts @@ -1,13 +1,18 @@ -import { type Timer } from '@aztec/foundation/timer'; import { type Histogram, Metrics, type TelemetryClient, ValueType } from '@aztec/telemetry-client'; export class ProverNodeMetrics { + proverEpochExecutionDuration: Histogram; provingJobDuration: Histogram; provingJobBlocks: Histogram; provingJobTransactions: Histogram; constructor(public readonly client: TelemetryClient, name = 'ProverNode') { const meter = client.getMeter(name); + this.proverEpochExecutionDuration = meter.createHistogram(Metrics.PROVER_NODE_EXECUTION_DURATION, { + description: 'Duration of execution of an epoch by the prover', + unit: 'ms', + valueType: ValueType.INT, + }); this.provingJobDuration = meter.createHistogram(Metrics.PROVER_NODE_JOB_DURATION, { description: 'Duration of proving job', unit: 'ms', @@ -23,9 +28,9 @@ export class ProverNodeMetrics { }); } - public recordProvingJob(timerOrMs: Timer | number, numBlocks: number, numTxs: number) { - const ms = Math.ceil(typeof timerOrMs === 'number' ? 
timerOrMs : timerOrMs.ms()); - this.provingJobDuration.record(ms); + public recordProvingJob(executionTimeMs: number, totalTimeMs: number, numBlocks: number, numTxs: number) { + this.proverEpochExecutionDuration.record(Math.ceil(executionTimeMs)); + this.provingJobDuration.record(Math.ceil(totalTimeMs)); this.provingJobBlocks.record(Math.floor(numBlocks)); this.provingJobTransactions.record(Math.floor(numTxs)); } diff --git a/yarn-project/sequencer-client/src/tx_validator/archive_cache.test.ts b/yarn-project/sequencer-client/src/tx_validator/archive_cache.test.ts new file mode 100644 index 00000000000..01484b2052f --- /dev/null +++ b/yarn-project/sequencer-client/src/tx_validator/archive_cache.test.ts @@ -0,0 +1,38 @@ +import { MerkleTreeId, type MerkleTreeReadOperations } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { ArchiveCache } from './archive_cache.js'; + +describe('ArchiveCache', () => { + let archiveCache: ArchiveCache; + let db: MockProxy; + let archives: Fr[]; + + beforeEach(() => { + db = mock(); + archiveCache = new ArchiveCache(db); + archives = [Fr.random(), Fr.random(), Fr.random()]; + }); + + it('checks archive existence against db', async () => { + db.findLeafIndices.mockResolvedValue([1n, 2n, undefined]); + await expect(archiveCache.getArchiveIndices(archives)).resolves.toEqual([1n, 2n, undefined]); + }); + + it('checks archive existence against db only on cache miss', async () => { + db.findLeafIndices.mockResolvedValueOnce([1n, 2n, undefined]); + let result = await archiveCache.getArchiveIndices(archives); + expect(db.findLeafIndices).toHaveBeenCalledWith(MerkleTreeId.ARCHIVE, archives); + expect(result).toEqual([1n, 2n, undefined]); + db.findLeafIndices.mockReset(); + + // asking again should only request one archive from the db + db.findLeafIndices.mockResolvedValueOnce([5n]); + result = await archiveCache.getArchiveIndices(archives); + // should 
only request the archive that was not found last time + expect(db.findLeafIndices).toHaveBeenCalledWith(MerkleTreeId.ARCHIVE, [archives[2]]); + expect(result).toEqual([1n, 2n, 5n]); + }); +}); diff --git a/yarn-project/sequencer-client/src/tx_validator/archive_cache.ts b/yarn-project/sequencer-client/src/tx_validator/archive_cache.ts new file mode 100644 index 00000000000..d887d87e678 --- /dev/null +++ b/yarn-project/sequencer-client/src/tx_validator/archive_cache.ts @@ -0,0 +1,27 @@ +import { MerkleTreeId, type MerkleTreeReadOperations } from '@aztec/circuit-types'; +import { type Fr } from '@aztec/circuits.js'; +import { type ArchiveSource } from '@aztec/p2p'; + +/** + * Implements an archive source by checking a DB and an in-memory collection. + * Intended for validating transactions as they are added to a block. + */ +export class ArchiveCache implements ArchiveSource { + archives: Map; + + constructor(private db: MerkleTreeReadOperations) { + this.archives = new Map(); + } + + public async getArchiveIndices(archives: Fr[]): Promise<(bigint | undefined)[]> { + const toCheckDb = archives.filter(n => !this.archives.has(n.toString())); + const dbHits = await this.db.findLeafIndices(MerkleTreeId.ARCHIVE, toCheckDb); + dbHits.forEach((x, index) => { + if (x !== undefined) { + this.archives.set(toCheckDb[index].toString(), x); + } + }); + + return archives.map(n => this.archives.get(n.toString())); + } +} diff --git a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts index 500e446360c..6e752d9971f 100644 --- a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts +++ b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts @@ -9,6 +9,7 @@ import { import { type AztecAddress, type ContractDataSource, Fr, type GasFees, type GlobalVariables } from '@aztec/circuits.js'; import { AggregateTxValidator, + BlockHeaderTxValidator, DataTxValidator, 
DoubleSpendTxValidator, MetadataTxValidator, @@ -17,6 +18,7 @@ import { import { ProtocolContractAddress } from '@aztec/protocol-contracts'; import { readPublicState } from '@aztec/simulator'; +import { ArchiveCache } from './archive_cache.js'; import { GasTxValidator, type PublicStateSource } from './gas_validator.js'; import { NullifierCache } from './nullifier_cache.js'; import { PhasesTxValidator } from './phases_validator.js'; @@ -40,6 +42,7 @@ export function createValidatorForAcceptingTxs( new DoubleSpendTxValidator(new NullifierCache(db)), new PhasesTxValidator(contractDataSource, setupAllowList), new GasTxValidator(new DatabasePublicStateSource(db), ProtocolContractAddress.FeeJuice, enforceFees, gasFees), + new BlockHeaderTxValidator(new ArchiveCache(db)), ]; if (verifier) { @@ -61,11 +64,13 @@ export function createValidatorsForBlockBuilding( nullifierCache: NullifierCache; } { const nullifierCache = new NullifierCache(db); + const archiveCache = new ArchiveCache(db); const publicStateSource = new DatabasePublicStateSource(db); return { preprocessValidator: preprocessValidator( nullifierCache, + archiveCache, publicStateSource, contractDataSource, enforceFees, @@ -87,6 +92,7 @@ class DatabasePublicStateSource implements PublicStateSource { function preprocessValidator( nullifierCache: NullifierCache, + archiveCache: ArchiveCache, publicStateSource: PublicStateSource, contractDataSource: ContractDataSource, enforceFees: boolean, @@ -99,6 +105,7 @@ function preprocessValidator( new DoubleSpendTxValidator(nullifierCache), new PhasesTxValidator(contractDataSource, setupAllowList), new GasTxValidator(publicStateSource, ProtocolContractAddress.FeeJuice, enforceFees, globalVariables.gasFees), + new BlockHeaderTxValidator(archiveCache), ); } diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index d68b39399c6..64e26377282 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ 
b/yarn-project/telemetry-client/src/metrics.ts @@ -41,7 +41,7 @@ export const MEMPOOL_PROVER_QUOTE_SIZE = 'aztec.mempool.prover_quote_size'; export const ARCHIVER_SYNC_DURATION = 'aztec.archiver.sync_duration'; export const ARCHIVER_L1_BLOCKS_SYNCED = 'aztec.archiver.l1_blocks_synced'; export const ARCHIVER_BLOCK_HEIGHT = 'aztec.archiver.block_height'; -export const ARCHIVER_BLOCK_SIZE = 'aztec.archiver.block_size'; +export const ARCHIVER_TX_COUNT = 'aztec.archiver.tx_count'; export const ARCHIVER_ROLLUP_PROOF_DELAY = 'aztec.archiver.rollup_proof_delay'; export const ARCHIVER_ROLLUP_PROOF_COUNT = 'aztec.archiver.rollup_proof_count'; export const ARCHIVER_PRUNE_COUNT = 'aztec.archiver.prune_count'; @@ -103,6 +103,7 @@ export const PROVING_QUEUE_DB_USED_SIZE = 'aztec.proving_queue.db.used_size'; export const PROVING_AGENT_IDLE = 'aztec.proving_queue.agent.idle'; +export const PROVER_NODE_EXECUTION_DURATION = 'aztec.prover_node.execution.duration'; export const PROVER_NODE_JOB_DURATION = 'aztec.prover_node.job_duration'; export const PROVER_NODE_JOB_BLOCKS = 'aztec.prover_node.job_blocks'; export const PROVER_NODE_JOB_TRANSACTIONS = 'aztec.prover_node.job_transactions'; diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 49f73203701..cf4a8c93dd9 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -847,7 +847,7 @@ __metadata: chai-as-promised: "npm:^8.0.1" idb: "npm:^8.0.0" jest: "npm:^29.5.0" - lmdb: "npm:^3.0.6" + lmdb: "npm:^3.2.0" mocha: "npm:^10.8.2" mocha-each: "npm:^2.0.1" ts-node: "npm:^10.9.1" @@ -3457,44 +3457,44 @@ __metadata: languageName: node linkType: hard -"@lmdb/lmdb-darwin-arm64@npm:3.0.8": - version: 3.0.8 - resolution: "@lmdb/lmdb-darwin-arm64@npm:3.0.8" +"@lmdb/lmdb-darwin-arm64@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-darwin-arm64@npm:3.2.0" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@lmdb/lmdb-darwin-x64@npm:3.0.8": - version: 3.0.8 - resolution: 
"@lmdb/lmdb-darwin-x64@npm:3.0.8" +"@lmdb/lmdb-darwin-x64@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-darwin-x64@npm:3.2.0" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@lmdb/lmdb-linux-arm64@npm:3.0.8": - version: 3.0.8 - resolution: "@lmdb/lmdb-linux-arm64@npm:3.0.8" +"@lmdb/lmdb-linux-arm64@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-linux-arm64@npm:3.2.0" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@lmdb/lmdb-linux-arm@npm:3.0.8": - version: 3.0.8 - resolution: "@lmdb/lmdb-linux-arm@npm:3.0.8" +"@lmdb/lmdb-linux-arm@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-linux-arm@npm:3.2.0" conditions: os=linux & cpu=arm languageName: node linkType: hard -"@lmdb/lmdb-linux-x64@npm:3.0.8": - version: 3.0.8 - resolution: "@lmdb/lmdb-linux-x64@npm:3.0.8" +"@lmdb/lmdb-linux-x64@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-linux-x64@npm:3.2.0" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@lmdb/lmdb-win32-x64@npm:3.0.8": - version: 3.0.8 - resolution: "@lmdb/lmdb-win32-x64@npm:3.0.8" +"@lmdb/lmdb-win32-x64@npm:3.2.0": + version: 3.2.0 + resolution: "@lmdb/lmdb-win32-x64@npm:3.2.0" conditions: os=win32 & cpu=x64 languageName: node linkType: hard @@ -14031,21 +14031,21 @@ __metadata: languageName: node linkType: hard -"lmdb@npm:^3.0.6": - version: 3.0.8 - resolution: "lmdb@npm:3.0.8" - dependencies: - "@lmdb/lmdb-darwin-arm64": "npm:3.0.8" - "@lmdb/lmdb-darwin-x64": "npm:3.0.8" - "@lmdb/lmdb-linux-arm": "npm:3.0.8" - "@lmdb/lmdb-linux-arm64": "npm:3.0.8" - "@lmdb/lmdb-linux-x64": "npm:3.0.8" - "@lmdb/lmdb-win32-x64": "npm:3.0.8" - msgpackr: "npm:^1.9.9" +"lmdb@npm:^3.2.0": + version: 3.2.0 + resolution: "lmdb@npm:3.2.0" + dependencies: + "@lmdb/lmdb-darwin-arm64": "npm:3.2.0" + "@lmdb/lmdb-darwin-x64": "npm:3.2.0" + "@lmdb/lmdb-linux-arm": "npm:3.2.0" + "@lmdb/lmdb-linux-arm64": "npm:3.2.0" + "@lmdb/lmdb-linux-x64": "npm:3.2.0" + "@lmdb/lmdb-win32-x64": "npm:3.2.0" + 
msgpackr: "npm:^1.11.2" node-addon-api: "npm:^6.1.0" node-gyp: "npm:latest" - node-gyp-build-optional-packages: "npm:5.1.1" - ordered-binary: "npm:^1.4.1" + node-gyp-build-optional-packages: "npm:5.2.2" + ordered-binary: "npm:^1.5.3" weak-lru-cache: "npm:^1.2.2" dependenciesMeta: "@lmdb/lmdb-darwin-arm64": @@ -14062,7 +14062,7 @@ __metadata: optional: true bin: download-lmdb-prebuilds: bin/download-prebuilds.js - checksum: 10/f72a304ef2f438f9155cf7ff1cdfc7c2fa0679e53274dc16aa8e9371590cf0e528c12a283fc30f3804ac0985d5287940230d4e1ccb26ea98eb883f9da710657a + checksum: 10/fcda059ca8475e85e883b26e02b4e18549d019258b0db244ec7f487207af6570dc777325e7825dfc9572c0ed536d0f2b482183796ccf98f36f5c1264ea18ed60 languageName: node linkType: hard @@ -15038,15 +15038,15 @@ __metadata: languageName: node linkType: hard -"msgpackr@npm:^1.9.9": - version: 1.10.1 - resolution: "msgpackr@npm:1.10.1" +"msgpackr@npm:^1.11.2": + version: 1.11.2 + resolution: "msgpackr@npm:1.11.2" dependencies: msgpackr-extract: "npm:^3.0.2" dependenciesMeta: msgpackr-extract: optional: true - checksum: 10/9fc78c78435a773ed919720ab4f276f6781c6feac0e88233783d00deb64ef1c1caf75781f4a5c675bb535292bb9a67f2262ffc15b64cdc16b176b5220997d564 + checksum: 10/7602f1e91e5ba13f4289ec9cab0d3f3db87d4ed323bebcb40a0c43ba2f6153192bffb63a5bb4755faacb6e0985f307c35084f40eaba1c325b7035da91381f01a languageName: node linkType: hard @@ -15269,16 +15269,16 @@ __metadata: languageName: node linkType: hard -"node-gyp-build-optional-packages@npm:5.1.1": - version: 5.1.1 - resolution: "node-gyp-build-optional-packages@npm:5.1.1" +"node-gyp-build-optional-packages@npm:5.2.2": + version: 5.2.2 + resolution: "node-gyp-build-optional-packages@npm:5.2.2" dependencies: detect-libc: "npm:^2.0.1" bin: node-gyp-build-optional-packages: bin.js node-gyp-build-optional-packages-optional: optional.js node-gyp-build-optional-packages-test: build-test.js - checksum: 
10/96dbeeba03fe5b9e86e1dc4491d7932cbf4c23f4ef8e63fb83bbbdcaf4553d8cbd5f23b9bc3632cb76a0739524f4b64f829daa5b608ebd72285ffdb03a9bdd81 + checksum: 10/f448a328cf608071dc8cc4426ac5be0daec4788e4e1759e9f7ffcd286822cc799384edce17a8c79e610c4bbfc8e3aff788f3681f1d88290e0ca7aaa5342a090f languageName: node linkType: hard @@ -15624,10 +15624,10 @@ __metadata: languageName: node linkType: hard -"ordered-binary@npm:^1.4.1": - version: 1.5.1 - resolution: "ordered-binary@npm:1.5.1" - checksum: 10/9b407e20ba90d4fc44938746295b3d301dcfa26983a88482e028e96479cd30dca6da33c052070bef034aa89280ff2befb75bbe4663f1f8210a12ce5586de2290 +"ordered-binary@npm:^1.5.3": + version: 1.5.3 + resolution: "ordered-binary@npm:1.5.3" + checksum: 10/52dae0dc08a0c77a16ae456e6b5fe98e6201add839e3b8b35617056f3fc0b96b8e866012d58d30aef933f964390fe5457c3d178117720378f9d7a90c1ca24e5f languageName: node linkType: hard