From 0e72f6b2f0186fd4fa27e0399829f7ac836c8658 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Wed, 15 Mar 2023 11:45:50 +0100 Subject: [PATCH] feat: remove deprecated ledger field for event message (#150) --- components/ledger/pkg/bus/message.go | 6 - .../benthos/streams/ledger_ingestion.yaml | 278 +++++++++--------- .../streams/ledger_reindex_accounts.yaml | 89 +++--- .../streams/ledger_reindex_transactions.yaml | 119 ++++---- .../streams/ledger_reindex_volumes.yaml | 87 +++--- 5 files changed, 285 insertions(+), 294 deletions(-) diff --git a/components/ledger/pkg/bus/message.go b/components/ledger/pkg/bus/message.go index c49004d96..77e139b30 100644 --- a/components/ledger/pkg/bus/message.go +++ b/components/ledger/pkg/bus/message.go @@ -22,8 +22,6 @@ type EventMessage struct { Version string `json:"version"` Type string `json:"type"` Payload any `json:"payload"` - // TODO: deprecated in future version - Ledger string `json:"ledger"` } type CommittedTransactions struct { @@ -42,7 +40,6 @@ func newEventCommittedTransactions(txs CommittedTransactions) EventMessage { Version: EventVersion, Type: EventTypeCommittedTransactions, Payload: txs, - Ledger: txs.Ledger, } } @@ -60,7 +57,6 @@ func newEventSavedMetadata(metadata SavedMetadata) EventMessage { Version: EventVersion, Type: EventTypeSavedMetadata, Payload: metadata, - Ledger: metadata.Ledger, } } @@ -76,7 +72,6 @@ func newEventUpdatedMapping(mapping UpdatedMapping) EventMessage { Version: EventVersion, Type: EventTypeUpdatedMapping, Payload: mapping, - Ledger: mapping.Ledger, } } @@ -93,6 +88,5 @@ func newEventRevertedTransaction(tx RevertedTransaction) EventMessage { Version: EventVersion, Type: EventTypeRevertedTransaction, Payload: tx, - Ledger: tx.Ledger, } } diff --git a/components/search/benthos/streams/ledger_ingestion.yaml b/components/search/benthos/streams/ledger_ingestion.yaml index 92e442733..a5a05ea6f 100644 --- a/components/search/benthos/streams/ledger_ingestion.yaml +++ b/components/search/benthos/streams/ledger_ingestion.yaml @@ -6,154 +6,154 @@ input: pipeline: processors: - - switch_event_type: - events: - - label: COMMITTED_TRANSACTIONS - processors: - - bloblang: | - map account { - root = this.map_each(v -> v.value.map_each(v2 -> { - "action": "upsert", - "id": v.key, - "document": { - "data": { - "address": v.key - }, - "indexed": { - "address": v.key - }, - "kind": "ACCOUNT" + - switch_event_type: + events: + - label: COMMITTED_TRANSACTIONS + processors: + - bloblang: | + map account { + root = this.map_each(v -> v.value.map_each(v2 -> { + "action": "upsert", + "id": v.key, + "document": { + "data": { + "address": v.key + }, + "indexed": { + "address": v.key + }, + "kind": "ACCOUNT" + } + }).values()).values().flatten() } - }).values()).values().flatten() - } - map volumes { - root = this.map_each(v -> v.value.map_each(v2 -> { - "action": "index", - "id": "%s-%s".format(v.key, v2.key), - "document": { + map volumes { + root = this.map_each(v -> v.value.map_each(v2 -> { + "action": "index", + "id": "%s-%s".format(v.key, v2.key), + "document": { + "data": { + "name": v2.key, + "input": v2.value.input, + "output": v2.value.output, + "account": v.key + }, + "indexed": { + "account": v.key, + "name": v2.key + }, + "kind": "ASSET" + } + }).values()).values().flatten() + } + + map tx { + root = { + "action": "index", + "id": "%s".format(this.txid), + "document": { "data": { - "name": v2.key, - "input": v2.value.input, - "output": v2.value.output, - "account": v.key + "postings": this.postings, + "reference": 
this.reference, + "txid": this.txid, + "timestamp": this.timestamp, + "metadata": if this.metadata { this.metadata } else {{}} }, "indexed": { - "account": v.key, - "name": v2.key + "reference": this.reference, + "txid": this.txid, + "timestamp": this.timestamp, + "asset": this.postings.map_each(p -> p.asset), + "source": this.postings.map_each(p -> p.source), + "destination": this.postings.map_each(p -> p.destination), + "amount": this.postings.map_each(p -> if p.asset.contains("/") { + [ + p.amount, + p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) + ] + } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)) }, - "kind": "ASSET" + "kind": "TRANSACTION" + } } - }).values()).values().flatten() - } - - map tx { - root = { - "action": "index", - "id": "%s".format(this.txid), - "document": { - "data": { - "postings": this.postings, - "reference": this.reference, - "txid": this.txid, - "timestamp": this.timestamp, - "metadata": if this.metadata { this.metadata } else {{}} - }, - "indexed": { - "reference": this.reference, - "txid": this.txid, - "timestamp": this.timestamp, - "asset": this.postings.map_each(p -> p.asset), - "source": this.postings.map_each(p -> p.source), - "destination": this.postings.map_each(p -> p.destination), - "amount": this.postings.map_each(p -> if p.asset.contains("/") { - [ - p.amount, - p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) - ] - } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)) - }, - "kind": "TRANSACTION" - } - } - } + } - map committedTransactions { - root = [ - this.payload.transactions.map_each(t -> t.apply("tx")).map_each(t -> t.assign({ - "id": "TRANSACTION-%s-%s".format(this.ledger, t.id) - })), - this.payload.volumes.apply("volumes"). - sort(v -> v.right.id > v.left.id). - map_each(t -> t.assign({ - "id": "ASSET-%s-%s".format(this.ledger, t.id) - })), - this.payload.volumes.apply("account"). - sort(v -> v.right.id > v.left.id). - map_each(t -> t.assign({ - "id": "ACCOUNT-%s-%s".format(this.ledger, t.id) - })), - ].flatten().map_each(t -> t.merge({ - "document": { - "when": this.date, - "ledger": this.ledger, - "data": { - "ledger": this.ledger - }, - "indexed": { - "ledger": this.ledger - } - }, - })) - } + map committedTransactions { + root = [ + this.payload.transactions.map_each(t -> t.apply("tx")).map_each(t -> t.assign({ + "id": "TRANSACTION-%s-%s".format(this.payload.ledger, t.id) + })), + this.payload.volumes.apply("volumes"). + sort(v -> v.right.id > v.left.id). + map_each(t -> t.assign({ + "id": "ASSET-%s-%s".format(this.payload.ledger, t.id) + })), + this.payload.volumes.apply("account"). + sort(v -> v.right.id > v.left.id). 
+ map_each(t -> t.assign({ + "id": "ACCOUNT-%s-%s".format(this.payload.ledger, t.id) + })), + ].flatten().map_each(t -> t.merge({ + "document": { + "when": this.date, + "ledger": this.payload.ledger, + "data": { + "ledger": this.payload.ledger + }, + "indexed": { + "ledger": this.payload.ledger + } + }, + })) + } - root = this.apply("committedTransactions") - - unarchive: - format: json_array - - bloblang: | - meta action = this.action - meta id = this.id - root = this.document - - label: SAVED_METADATA - processors: - - bloblang: | - meta targetType = this.payload.targetType - meta targetId = this.payload.targetId - meta ledger = this.ledger - meta id = "%s-%s-%s".format(this.payload.targetType, this.ledger, this.payload.targetId) - meta action = "upsert" - root = this - - get_doc: - id: ${!meta("id")} - - catch: - - log: - level: INFO - message: Document not found, assume it is an account - - bloblang: | - root = this.assign({ - "_doc": { - "data": { - "address": meta("targetId"), - "metadata": {} - }, - "indexed": { - "address": meta("targetId") - }, - "kind": "ACCOUNT", # If not found, so, this is an account - } - }) - - bloblang: | - root = this._doc.assign({ - "data": { - "metadata": this.payload.metadata, - "ledger": this.ledger - }, - "indexed": { - "ledger": this.ledger - }, - "ledger": this.ledger, - "when": this.date - }) + root = this.apply("committedTransactions") + - unarchive: + format: json_array + - bloblang: | + meta action = this.action + meta id = this.id + root = this.document + - label: SAVED_METADATA + processors: + - bloblang: | + meta targetType = this.payload.targetType + meta targetId = this.payload.targetId + meta ledger = this.payload.ledger + meta id = "%s-%s-%s".format(this.payload.targetType, this.payload.ledger, this.payload.targetId) + meta action = "upsert" + root = this + - get_doc: + id: ${!meta("id")} + - catch: + - log: + level: INFO + message: Document not found, assume it is an account + - bloblang: | + root = this.assign({ + "_doc": { + "data": { + "address": meta("targetId"), + "metadata": {} + }, + "indexed": { + "address": meta("targetId") + }, + "kind": "ACCOUNT", # If not found, so, this is an account + } + }) + - bloblang: | + root = this._doc.assign({ + "data": { + "metadata": this.payload.metadata, + "ledger": this.payload.ledger + }, + "indexed": { + "ledger": this.payload.ledger + }, + "ledger": this.payload.ledger, + "when": this.date + }) output: resource: elasticsearch diff --git a/components/search/benthos/streams/ledger_reindex_accounts.yaml b/components/search/benthos/streams/ledger_reindex_accounts.yaml index f90e73510..53a23699b 100644 --- a/components/search/benthos/streams/ledger_reindex_accounts.yaml +++ b/components/search/benthos/streams/ledger_reindex_accounts.yaml @@ -4,51 +4,50 @@ input: pipeline: processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as accounts_count from "${! meta("ledger") }".accounts' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.accounts_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select address, metadata - from "${! meta("ledger") }".accounts - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! 
meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "metadata": this.metadata.parse_json() - }) - - bloblang: | - meta action = "upsert" - meta id = "ACCOUNT-%s-%d".format(meta("ledger"), this.address) - root = { - "data": { - "address": this.address, - "ledger": meta("ledger"), - "metadata": this.metadata - }, - "indexed": { - "address": this.address, - "ledger": meta("ledger") - }, - "kind": "ACCOUNT", - "ledger": meta("ledger") - } + - bloblang: | + meta ledger = this.payload.ledger + meta batchSize = 100 + - postgres_query: + service: ledger + query: 'select count(*) as accounts_count from "${! meta("ledger") }".accounts' + - unarchive: + format: json_array + - bloblang: | + meta loopCount = (this.accounts_count.number() / meta("batchSize").number()).ceil() + meta loopIndex = 0 + - while: + check: 'meta("loopIndex") < meta("loopCount")' + processors: + - postgres_query: + service: ledger + query: | + select address, metadata + from "${! meta("ledger") }".accounts + offset ${! meta("loopIndex").number() * meta("batchSize").number() } + limit ${! meta("batchSize") } + - bloblang: meta loopIndex = meta("loopIndex").number() + 1 + - unarchive: + format: json_array + - bloblang: | + root = this.assign({ + "metadata": this.metadata.parse_json() + }) + - bloblang: | + meta action = "upsert" + meta id = "ACCOUNT-%s-%d".format(meta("ledger"), this.address) + root = { + "data": { + "address": this.address, + "ledger": meta("ledger"), + "metadata": this.metadata + }, + "indexed": { + "address": this.address, + "ledger": meta("ledger") + }, + "kind": "ACCOUNT", + "ledger": meta("ledger") + } output: resource: elasticsearch diff --git a/components/search/benthos/streams/ledger_reindex_transactions.yaml b/components/search/benthos/streams/ledger_reindex_transactions.yaml index b6707da3c..a40067461 100644 --- a/components/search/benthos/streams/ledger_reindex_transactions.yaml +++ b/components/search/benthos/streams/ledger_reindex_transactions.yaml @@ -4,66 +4,65 @@ input: pipeline: processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as transactions_count from "${! meta("ledger") }".transactions' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.transactions_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select id, timestamp, reference, metadata, postings - from "${! meta("ledger") }".transactions - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! 
meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "postings": this.postings.parse_json(), - "metadata": this.metadata.parse_json() - }) - - bloblang: | - meta action = "upsert" - meta id = "TRANSACTION-%s-%d".format(meta("ledger"), this.id) - root = { - "data": { - "postings": this.postings, - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "metadata": if this.metadata { this.metadata } else {{}}, - "ledger": meta("ledger") - }, - "indexed": { - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "asset": this.postings.map_each(p -> p.asset), - "source": this.postings.map_each(p -> p.source), - "destination": this.postings.map_each(p -> p.destination), - "amount": this.postings.map_each(p -> if p.asset.contains("/") { - [ - p.amount, - p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) - ] - } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)), - "ledger": meta("ledger") - }, - "kind": "TRANSACTION", - "ledger": meta("ledger") - } + - bloblang: | + meta ledger = this.payload.ledger + meta batchSize = 100 + - postgres_query: + service: ledger + query: 'select count(*) as transactions_count from "${! meta("ledger") }".transactions' + - unarchive: + format: json_array + - bloblang: | + meta loopCount = (this.transactions_count.number() / meta("batchSize").number()).ceil() + meta loopIndex = 0 + - while: + check: 'meta("loopIndex") < meta("loopCount")' + processors: + - postgres_query: + service: ledger + query: | + select id, timestamp, reference, metadata, postings + from "${! meta("ledger") }".transactions + offset ${! meta("loopIndex").number() * meta("batchSize").number() } + limit ${! 
meta("batchSize") } + - bloblang: meta loopIndex = meta("loopIndex").number() + 1 + - unarchive: + format: json_array + - bloblang: | + root = this.assign({ + "postings": this.postings.parse_json(), + "metadata": this.metadata.parse_json() + }) + - bloblang: | + meta action = "upsert" + meta id = "TRANSACTION-%s-%d".format(meta("ledger"), this.id) + root = { + "data": { + "postings": this.postings, + "reference": this.reference, + "txid": this.id, + "timestamp": this.timestamp, + "metadata": if this.metadata { this.metadata } else {{}}, + "ledger": meta("ledger") + }, + "indexed": { + "reference": this.reference, + "txid": this.id, + "timestamp": this.timestamp, + "asset": this.postings.map_each(p -> p.asset), + "source": this.postings.map_each(p -> p.source), + "destination": this.postings.map_each(p -> p.destination), + "amount": this.postings.map_each(p -> if p.asset.contains("/") { + [ + p.amount, + p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) + ] + } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)), + "ledger": meta("ledger") + }, + "kind": "TRANSACTION", + "ledger": meta("ledger") + } output: resource: elasticsearch diff --git a/components/search/benthos/streams/ledger_reindex_volumes.yaml b/components/search/benthos/streams/ledger_reindex_volumes.yaml index 7cb7f41a2..870047e8d 100644 --- a/components/search/benthos/streams/ledger_reindex_volumes.yaml +++ b/components/search/benthos/streams/ledger_reindex_volumes.yaml @@ -4,50 +4,49 @@ input: pipeline: processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as volumes_count from "${! meta("ledger") }".volumes' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.volumes_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select account, asset, input, output - from "${! meta("ledger") }".volumes - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - meta action = "upsert" - meta id = "ASSET-%s-%s-%s".format(meta("ledger"), this.account, this.asset) - root = { - "data": { - "name": this.asset, - "input": this.input, - "output": this.output, - "account": this.account, - "ledger": meta("ledger") - }, - "indexed": { - "account": this.account, - "name": this.asset, - "ledger": meta("ledger") - }, - "kind": "ASSET", - "ledger": meta("ledger") - } + - bloblang: | + meta ledger = this.payload.ledger + meta batchSize = 100 + - postgres_query: + service: ledger + query: 'select count(*) as volumes_count from "${! meta("ledger") }".volumes' + - unarchive: + format: json_array + - bloblang: | + meta loopCount = (this.volumes_count.number() / meta("batchSize").number()).ceil() + meta loopIndex = 0 + - while: + check: 'meta("loopIndex") < meta("loopCount")' + processors: + - postgres_query: + service: ledger + query: | + select account, asset, input, output + from "${! meta("ledger") }".volumes + offset ${! meta("loopIndex").number() * meta("batchSize").number() } + limit ${! 
meta("batchSize") } + - bloblang: meta loopIndex = meta("loopIndex").number() + 1 + - unarchive: + format: json_array + - bloblang: | + meta action = "upsert" + meta id = "ASSET-%s-%s-%s".format(meta("ledger"), this.account, this.asset) + root = { + "data": { + "name": this.asset, + "input": this.input, + "output": this.output, + "account": this.account, + "ledger": meta("ledger") + }, + "indexed": { + "account": this.account, + "name": this.asset, + "ledger": meta("ledger") + }, + "kind": "ASSET", + "ledger": meta("ledger") + } output: resource: elasticsearch