-
Notifications
You must be signed in to change notification settings - Fork 32
/
contract_watcher.go
272 lines (236 loc) · 9.58 KB
/
contract_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package watcher
import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jpillora/backoff"
"github.com/pkg/errors"
"github.com/synapsecns/sanguine/core/observer"
"github.com/synapsecns/sanguine/ethergo/chain/chainwatcher"
"github.com/teivah/onecontext"
)
// ContractFilterer is the filterer object.
type ContractFilterer interface {
bind.ContractFilterer
GetBigChainID() *big.Int
}
// contractWatcherImpl handles listening for logs on a contract and registering listeners/
// producers to listen for different events.
type contractWatcherImpl struct {
// ctx is the overriding context for the contract contractWatcher
//nolint: containedctx
ctx context.Context
// observer handles registering/deregistering listeners
// in the futures these should be wrapped to avoid type issues downstream
// see: https://pkg.go.dev/github.com/eapache/channels#Wrap (and: https://stackoverflow.com/a/25594926/1011803)
// generic typing in go solves this
//nolint: staticcheck
observer *observer.StringObserver
// producers is used to register producers on the channel
producers map[common.Address]interface{}
// producerLock locks the producer map
producerLock sync.RWMutex
// eventCount is the total number of events observed on all contracts
eventCount uint64
// client is the client used for interacting with the chain
client ContractFilterer
// heightWatcher is the block height watcher
heightWatcher chainwatcher.BlockHeightWatcher
// requiredConfirmations is how many confirmations we wait before finality
requiredConfirmations uint
}
// NewContractWatcher creates a new contract contractWatcher. A contract contractWatcher defines two types:
// a listener, and a producer. A producer is responsible for emitting events on a contract to
// listeners. There can only be one producer per contract. A listener processes these events.
// there can be (theoretically) unlimited listeners per producer. Listeners are responsible for
// registering and unregistering themselves.
// TODO: consider replacing with: https://pkg.go.dev/github.com/cryptoriums/[email protected]/pkg/tracker/events#New
func NewContractWatcher(ctx context.Context, contractListener ContractFilterer, heightWatcher chainwatcher.BlockHeightWatcher, requiredConfirmations uint) chainwatcher.ContractWatcher {
return &contractWatcherImpl{
ctx: ctx,
observer: observer.NewStringObserver(),
producers: make(map[common.Address]interface{}),
client: contractListener,
heightWatcher: heightWatcher,
requiredConfirmations: requiredConfirmations,
}
}
// ListenOnContract listens on a contract. The method here uses a string contract to remain chain agnostic.
func (c *contractWatcherImpl) ListenOnContract(ctx context.Context, contract string, eventLog chan interface{}) error {
ctx, _ = onecontext.Merge(ctx, c.ctx)
logger.Debugf("listening on contract %s", contract)
contractAddress := common.HexToAddress(contract)
// create the listener
c.addListener(contractAddress, eventLog)
// create the producer if it doesn't exist
err := c.createProducerIfNotExists(ctx, contractAddress)
if err != nil {
return err
}
return nil
}
// createProducerIfNotExists creates a producer for listening to events on contract address in the contract watcher.
func (c *contractWatcherImpl) createProducerIfNotExists(ctx context.Context, contractAddress common.Address) error {
// generate a merged context
ctx, cancel := onecontext.Merge(ctx, c.ctx)
producedEvents := make(chan types.Log)
err := c.addProducer(ctx, contractAddress, producedEvents)
// subscription already exists
//nolint:nilerr
if err != nil {
cancel()
return nil
}
heightSubscription := c.heightWatcher.Subscribe()
// TODO This belongs in a new create producer function that checks is hasProducer is true first.
// ideally w/ some kind of retries as well
go func() {
// prevent context leaks
// on every new height
defer c.heightWatcher.Unsubscribe(heightSubscription)
for {
select {
case <-ctx.Done():
return
case height := <-heightSubscription:
logs, _ := c.filterBlocks(ctx, contractAddress, height, heightSubscription)
for _, log := range logs {
select {
case <-ctx.Done():
return
case producedEvents <- log:
continue
}
}
}
}
}()
return nil
}
// filterBlocks is a block filterer with a backoff
// returns false for didFilter if the filter was not completed. Keeps retrying in case of an error.
func (c *contractWatcherImpl) filterBlocks(ctx context.Context, contractAddress common.Address, receivedHeight uint64, heightSubscription <-chan uint64) (logs []types.Log, didFilter bool) {
// backoff in the case of an error
b := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 1 * time.Second,
Max: 30 * time.Second,
}
// get the new most recent finalized hegiht (chain tip - required confirmations)
startHeight := receivedHeight - uint64(c.requiredConfirmations)
// make sure we didn't overflow the uint64
if startHeight > receivedHeight {
return []types.Log{}, false
}
endHeight := c.getFilterEndHeight(receivedHeight, heightSubscription)
// timeout is 0 the first time and set by backoff on subsequent requests
timeout := time.Duration(0)
for {
select {
case <-ctx.Done():
return []types.Log{}, false
case <-time.After(timeout):
var err error
logs, err = c.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: big.NewInt(int64(startHeight)),
ToBlock: big.NewInt(int64(endHeight)),
Addresses: []common.Address{contractAddress},
})
if err != nil {
timeout = b.Duration()
logger.Errorf("got error %v when fetching logs from %d to %d on address %s will retry in %f seconds", err, startHeight, endHeight, contractAddress, timeout.Seconds())
continue
}
return logs, true
}
}
}
// getFilterEndHeight gets the latest height from a channel by draining the channel for 1 ms
// due to latency in getLogs/subsequent calls, or other machine induce latency
// producers can fall behind other nodes on occasion. Without a mechanism to depressurize these channels
// these producers can permanently fall behind. This function attempts to drain the channel of all current messages
// so we can get logs for the range. Ideally, this pressure build ups never actually occur.
//
// at the end, as with the start height required confirmations are subtracted so this does not need to be done by the caller.
func (c *contractWatcherImpl) getFilterEndHeight(startHeight uint64, heightChan <-chan uint64) (endHeight uint64) {
// populate the default
endHeight = startHeight
OUTER:
for {
select {
case newHeight := <-heightChan:
endHeight = newHeight
default:
break OUTER
}
}
return endHeight - uint64(c.requiredConfirmations)
}
// emit emits an event to a contract channel.
func (c *contractWatcherImpl) emit(address common.Address, event types.Log) {
c.observer.Emit(address.String(), event)
}
// addListener adds a listener to the observer for contract address to receive events
// on ch channel. wraps observer.addListener() to prevent exposing Emit() by fully
// extending observer
// TODO: this should not be an interface type.
func (c *contractWatcherImpl) addListener(address common.Address, ch chan interface{}) {
c.observer.AddListener(address.String(), ch)
}
// RemoveListener removes a listener from the observer for the contract address
// on a ch channel. wraps observer.RemoveListener to prevent exposing Emit() by fully
// extending observer
// TODO: this should not be an interface type.
func (c *contractWatcherImpl) RemoveListener(address common.Address, ch chan interface{}) {
c.observer.RemoveListener(address.String(), ch)
}
var errProducerAlreadyExists = errors.New("producer already exists")
// addProducer attempts to add a new producer to the contract contractWatcher. Note, since only
// one producer can be registered per address an error will be returned if the producer doe snot
// exist
// Any cancellations on the contractWatcherImpl context or the passed in context will cause the producer to cancel and be removed
// a message will be sent to errorChan on complete.
func (c *contractWatcherImpl) addProducer(ctx context.Context, address common.Address, producedEvents <-chan types.Log) error {
c.producerLock.Lock()
defer c.producerLock.Unlock()
if !c.hasProducer(address) {
logger.Debugf("adding contract watcher producer for %s", address)
c.producers[address] = struct{}{}
go func() {
// create the combined context of the producer and the contract contractWatcher
ctx, cancel := onecontext.Merge(ctx, c.ctx)
defer cancel()
for {
select {
// add an error to the error chan if the context is done
case <-ctx.Done():
logger.Warnf("contract watcher for contract %s finished", address.String())
return
// Emit any produced events to all listeners
case event := <-producedEvents:
atomic.AddUint64(&c.eventCount, 1)
logger.Warnf("got event in tx %s for contract %s at height %d", event.TxHash, event.Address, event.BlockNumber)
c.emit(address, event)
}
}
}()
return nil
}
return fmt.Errorf("could not add producer %s, %w", address.String(), errProducerAlreadyExists)
}
// hasProducer returns whether or not the contract producer has a producer already
// any caller to this method should call the lock before calling this method.
func (c *contractWatcherImpl) hasProducer(address common.Address) bool {
if _, ok := c.producers[address]; ok {
return true
}
return false
}