Skip to content

Commit

Permalink
feat: remove deprecated ledger field for event message (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Mar 15, 2023
1 parent a7cedb1 commit 0e72f6b
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 294 deletions.
6 changes: 0 additions & 6 deletions components/ledger/pkg/bus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ type EventMessage struct {
Version string `json:"version"`
Type string `json:"type"`
Payload any `json:"payload"`
// TODO: deprecated in future version
Ledger string `json:"ledger"`
}

type CommittedTransactions struct {
Expand All @@ -42,7 +40,6 @@ func newEventCommittedTransactions(txs CommittedTransactions) EventMessage {
Version: EventVersion,
Type: EventTypeCommittedTransactions,
Payload: txs,
Ledger: txs.Ledger,
}
}

Expand All @@ -60,7 +57,6 @@ func newEventSavedMetadata(metadata SavedMetadata) EventMessage {
Version: EventVersion,
Type: EventTypeSavedMetadata,
Payload: metadata,
Ledger: metadata.Ledger,
}
}

Expand All @@ -76,7 +72,6 @@ func newEventUpdatedMapping(mapping UpdatedMapping) EventMessage {
Version: EventVersion,
Type: EventTypeUpdatedMapping,
Payload: mapping,
Ledger: mapping.Ledger,
}
}

Expand All @@ -93,6 +88,5 @@ func newEventRevertedTransaction(tx RevertedTransaction) EventMessage {
Version: EventVersion,
Type: EventTypeRevertedTransaction,
Payload: tx,
Ledger: tx.Ledger,
}
}
278 changes: 139 additions & 139 deletions components/search/benthos/streams/ledger_ingestion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,154 +6,154 @@ input:

pipeline:
processors:
- switch_event_type:
events:
- label: COMMITTED_TRANSACTIONS
processors:
- bloblang: |
map account {
root = this.map_each(v -> v.value.map_each(v2 -> {
"action": "upsert",
"id": v.key,
"document": {
"data": {
"address": v.key
},
"indexed": {
"address": v.key
},
"kind": "ACCOUNT"
- switch_event_type:
events:
- label: COMMITTED_TRANSACTIONS
processors:
- bloblang: |
map account {
root = this.map_each(v -> v.value.map_each(v2 -> {
"action": "upsert",
"id": v.key,
"document": {
"data": {
"address": v.key
},
"indexed": {
"address": v.key
},
"kind": "ACCOUNT"
}
}).values()).values().flatten()
}
}).values()).values().flatten()
}
map volumes {
root = this.map_each(v -> v.value.map_each(v2 -> {
"action": "index",
"id": "%s-%s".format(v.key, v2.key),
"document": {
map volumes {
root = this.map_each(v -> v.value.map_each(v2 -> {
"action": "index",
"id": "%s-%s".format(v.key, v2.key),
"document": {
"data": {
"name": v2.key,
"input": v2.value.input,
"output": v2.value.output,
"account": v.key
},
"indexed": {
"account": v.key,
"name": v2.key
},
"kind": "ASSET"
}
}).values()).values().flatten()
}
map tx {
root = {
"action": "index",
"id": "%s".format(this.txid),
"document": {
"data": {
"name": v2.key,
"input": v2.value.input,
"output": v2.value.output,
"account": v.key
"postings": this.postings,
"reference": this.reference,
"txid": this.txid,
"timestamp": this.timestamp,
"metadata": if this.metadata { this.metadata } else {{}}
},
"indexed": {
"account": v.key,
"name": v2.key
"reference": this.reference,
"txid": this.txid,
"timestamp": this.timestamp,
"asset": this.postings.map_each(p -> p.asset),
"source": this.postings.map_each(p -> p.source),
"destination": this.postings.map_each(p -> p.destination),
"amount": this.postings.map_each(p -> if p.asset.contains("/") {
[
p.amount,
p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset)
]
} else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v))
},
"kind": "ASSET"
"kind": "TRANSACTION"
}
}
}).values()).values().flatten()
}
map tx {
root = {
"action": "index",
"id": "%s".format(this.txid),
"document": {
"data": {
"postings": this.postings,
"reference": this.reference,
"txid": this.txid,
"timestamp": this.timestamp,
"metadata": if this.metadata { this.metadata } else {{}}
},
"indexed": {
"reference": this.reference,
"txid": this.txid,
"timestamp": this.timestamp,
"asset": this.postings.map_each(p -> p.asset),
"source": this.postings.map_each(p -> p.source),
"destination": this.postings.map_each(p -> p.destination),
"amount": this.postings.map_each(p -> if p.asset.contains("/") {
[
p.amount,
p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset)
]
} else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v))
},
"kind": "TRANSACTION"
}
}
}
}
map committedTransactions {
root = [
this.payload.transactions.map_each(t -> t.apply("tx")).map_each(t -> t.assign({
"id": "TRANSACTION-%s-%s".format(this.ledger, t.id)
})),
this.payload.volumes.apply("volumes").
sort(v -> v.right.id > v.left.id).
map_each(t -> t.assign({
"id": "ASSET-%s-%s".format(this.ledger, t.id)
})),
this.payload.volumes.apply("account").
sort(v -> v.right.id > v.left.id).
map_each(t -> t.assign({
"id": "ACCOUNT-%s-%s".format(this.ledger, t.id)
})),
].flatten().map_each(t -> t.merge({
"document": {
"when": this.date,
"ledger": this.ledger,
"data": {
"ledger": this.ledger
},
"indexed": {
"ledger": this.ledger
}
},
}))
}
map committedTransactions {
root = [
this.payload.transactions.map_each(t -> t.apply("tx")).map_each(t -> t.assign({
"id": "TRANSACTION-%s-%s".format(this.payload.ledger, t.id)
})),
this.payload.volumes.apply("volumes").
sort(v -> v.right.id > v.left.id).
map_each(t -> t.assign({
"id": "ASSET-%s-%s".format(this.payload.ledger, t.id)
})),
this.payload.volumes.apply("account").
sort(v -> v.right.id > v.left.id).
map_each(t -> t.assign({
"id": "ACCOUNT-%s-%s".format(this.payload.ledger, t.id)
})),
].flatten().map_each(t -> t.merge({
"document": {
"when": this.date,
"ledger": this.payload.ledger,
"data": {
"ledger": this.payload.ledger
},
"indexed": {
"ledger": this.payload.ledger
}
},
}))
}
root = this.apply("committedTransactions")
- unarchive:
format: json_array
- bloblang: |
meta action = this.action
meta id = this.id
root = this.document
- label: SAVED_METADATA
processors:
- bloblang: |
meta targetType = this.payload.targetType
meta targetId = this.payload.targetId
meta ledger = this.ledger
meta id = "%s-%s-%s".format(this.payload.targetType, this.ledger, this.payload.targetId)
meta action = "upsert"
root = this
- get_doc:
id: ${!meta("id")}
- catch:
- log:
level: INFO
message: Document not found, assume it is an account
- bloblang: |
root = this.assign({
"_doc": {
"data": {
"address": meta("targetId"),
"metadata": {}
},
"indexed": {
"address": meta("targetId")
},
"kind": "ACCOUNT", # If not found, so, this is an account
}
})
- bloblang: |
root = this._doc.assign({
"data": {
"metadata": this.payload.metadata,
"ledger": this.ledger
},
"indexed": {
"ledger": this.ledger
},
"ledger": this.ledger,
"when": this.date
})
root = this.apply("committedTransactions")
- unarchive:
format: json_array
- bloblang: |
meta action = this.action
meta id = this.id
root = this.document
- label: SAVED_METADATA
processors:
- bloblang: |
meta targetType = this.payload.targetType
meta targetId = this.payload.targetId
meta ledger = this.payload.ledger
meta id = "%s-%s-%s".format(this.payload.targetType, this.payload.ledger, this.payload.targetId)
meta action = "upsert"
root = this
- get_doc:
id: ${!meta("id")}
- catch:
- log:
level: INFO
message: Document not found, assume it is an account
- bloblang: |
root = this.assign({
"_doc": {
"data": {
"address": meta("targetId"),
"metadata": {}
},
"indexed": {
"address": meta("targetId")
},
"kind": "ACCOUNT", # If not found, so, this is an account
}
})
- bloblang: |
root = this._doc.assign({
"data": {
"metadata": this.payload.metadata,
"ledger": this.payload.ledger
},
"indexed": {
"ledger": this.payload.ledger
},
"ledger": this.payload.ledger,
"when": this.date
})
output:
resource: elasticsearch
Loading

0 comments on commit 0e72f6b

Please sign in to comment.