forked from zetamarkets/zeta-transaction-indexer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
409 lines (369 loc) · 15.1 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
import {
ConfirmedSignatureInfo,
Connection,
PublicKey,
TransactionSignature,
} from "@solana/web3.js";
import { sendMessage } from "./utils/sqs";
import { SolanaRPC } from "./utils/rpc";
import { MAX_SIGNATURE_BATCH_SIZE, DEBUG_MODE } from "./utils/constants";
import { sleep } from "@zetamarkets/sdk/dist/utils";
import {
readFrontfillCheckpoint,
readBackfillCheckpoint,
writeFrontfillCheckpoint,
writeBackfillCheckpoint,
} from "./utils/dynamodb";
import { ConfirmedSignatureInfoShort } from "./utils/types";
// Module-wide RPC client shared by the indexing loop and refreshConnection().
// It is built from the RPC_URL_1 and RPC_URL_2 environment variables; the third
// argument is presumably the commitment level (refreshConnection reads it back
// as `rpc.commitmentOrConfig`) — confirm against ./utils/rpc.
const rpc = new SolanaRPC(process.env.RPC_URL_1, process.env.RPC_URL_2, "finalized");
/**
 * Renders a block time (seconds since the Unix epoch) as an ISO-8601 string for log output.
 *
 * `Number()` mirrors the coercion an inline `blockTime * 1000` would perform:
 * `null` becomes 0 (the epoch) and `undefined` becomes NaN. An invalid date is
 * reported as "unknown" instead of letting `toISOString()` throw a RangeError,
 * so a log statement can never abort an indexing run.
 */
function formatBlockTimeForLog(blockTime: number | null | undefined): string {
  const date = new Date(Number(blockTime) * 1000);
  return Number.isNaN(date.getTime()) ? "unknown" : date.toISOString();
}

/**
 * Pages through the signatures of `address`, newest page first, forwarding each
 * page to SQS (unless DEBUG_MODE is set) and maintaining the DynamoDB checkpoints.
 *
 * Each page is requested with `before = top.signature` and `until = bottom.signature`
 * and then reversed in place, so index 0 is treated as the oldest entry of the page
 * and the last index as the newest. After a page is processed the `before` cursor
 * moves to the page's oldest signature; the loop ends at the first page that is
 * still empty after the retries.
 *
 * Two modes, selected by `backfill_complete`:
 * - falsy (backfill): the backfill checkpoint is written after every page and is
 *   marked complete when the empty page is reached. The frontfill checkpoint is
 *   written right after the first page when neither `before` nor `until` was given,
 *   or at the end if none is stored.
 * - truthy (frontfill): signatures that share block time and slot with the old top
 *   or with the previous page's bottom, and that were already seen in this call,
 *   are removed before the page is forwarded. The frontfill checkpoint is written
 *   once the empty page is reached.
 *
 * NOTE(review): `writeFrontfillCheckpoint`, `writeBackfillCheckpoint` and `sendMessage`
 * are called without awaiting their result. If they return promises, failures surface
 * as unhandled rejections and writes may complete out of order — confirm against
 * ./utils/dynamodb and ./utils/sqs.
 * NOTE(review): the page size is the literal 1000 although MAX_SIGNATURE_BATCH_SIZE is
 * imported by this file — confirm whether the constant was meant to be used.
 *
 * @param address Account whose signatures are indexed.
 * @param before Initial `before` cursor; `undefined` starts from the newest signature.
 * @param until Signature passed as `until` on every request (the old top when frontfilling).
 * @param backfill_complete Truthy selects frontfill mode, falsy selects backfill mode.
 * @param old_top_block_time Block time stored with the old top; used by the frontfill de-duplication.
 * @param old_top_slot Slot stored with the old top; used by the frontfill de-duplication.
 * @returns The final local pointers `{ bottom, top }`. After at least one non-empty page,
 *   `top` is the oldest entry of the last processed page and `bottom` is rebuilt from
 *   `until`, `old_top_block_time` and `old_top_slot`.
 */
async function indexSignaturesForAddress(
  address: PublicKey,
  before?: TransactionSignature,
  until?: TransactionSignature,
  backfill_complete?: boolean,
  old_top_block_time?: number,
  old_top_slot?: number,
) {
  let sigs: ConfirmedSignatureInfo[];
  let top: ConfirmedSignatureInfoShort = {
    signature: before,
    blockTime: undefined,
    slot: undefined,
  };
  let bottom: ConfirmedSignatureInfoShort = {
    signature: until,
    blockTime: old_top_block_time,
    slot: old_top_slot,
  };

  // Fetches one page for the current cursors and flips it so index 0 is the oldest entry.
  const fetchPage = async () => {
    const page = await rpc.getSignaturesForAddressWithRetries(address, {
      before: top.signature,
      until: bottom.signature,
      limit: 1000,
    });
    page.reverse();
    return page;
  };

  // A backfill that starts with no cursors at all has no frontfill checkpoint to rely on,
  // so one is written as soon as the first page is known.
  let backfill_frontfill_checkpoint_write =
    !backfill_complete && before === undefined && until === undefined;
  if (backfill_frontfill_checkpoint_write) {
    console.info("Not top defined in backfill, write front fill checkpoint immediately");
  }

  let firstFlag = true;
  let newTop: ConfirmedSignatureInfoShort;
  let prev_bottom: ConfirmedSignatureInfoShort = {
    signature: undefined,
    blockTime: undefined,
    slot: undefined,
  };
  // Holds the block time / slot currently being tracked plus one entry per signature
  // already seen at that position. The two bookkeeping keys are why the log below
  // subtracts 2 from the key count.
  let simultaneousBottoms: Record<string, unknown> = {
    blockTime: undefined,
    slot: undefined,
  };

  while (true) {
    sigs = await fetchPage();

    // An empty page is retried up to 4 more times with a growing delay before it is
    // accepted as the end of the data.
    let attempt = 1;
    const backoff = 1.35;
    let sleepTime = 1000;
    while (sigs.length < 1 && attempt < 5) {
      console.warn(`[WARN] No signatures found, retrying in ${sleepTime}ms...`);
      await sleep(sleepTime);
      console.warn(`[WARN] No signatures found, retrying ${attempt} time(s)...`);
      sigs = await fetchPage();
      attempt += 1;
      sleepTime *= backoff;
    }

    // ======== FRONTFILL ONLY ========
    // Signatures that share block time and slot with a page boundary can be returned again
    // on a later request, so already-recorded ones are dropped from the bottom of the page.
    if (sigs.length > 0 && backfill_complete) {
      // Captured up front because sigs[0] itself may be spliced out below.
      const temp_bottom = {
        signature: sigs[0].signature,
        blockTime: sigs[0].blockTime,
        slot: sigs[0].slot,
      };
      console.info(`[INFO] Previous Bottom: ${prev_bottom.signature}, Blocktime: ${prev_bottom.blockTime}, Slot: ${prev_bottom.slot}`);
      console.info(`[INFO] Current Bottom : ${temp_bottom.signature}, Blocktime: ${temp_bottom.blockTime}, Slot: ${temp_bottom.slot}`);
      console.info(`[INFO] Final Bottom : ${until}, Blocktime: ${old_top_block_time}, Slot: ${old_top_slot}`);

      // True when the two oldest entries of this page share block time and slot.
      let checkFlag = false;
      if (
        sigs.length > 1 &&
        temp_bottom.blockTime == sigs[1].blockTime &&
        temp_bottom.slot == sigs[1].slot
      ) {
        console.info(`[INFO] Check Flag: True`);
        checkFlag = true;
      }

      const matchesFinalBottom =
        temp_bottom.blockTime === old_top_block_time && temp_bottom.slot === old_top_slot;
      const matchesPrevBottom =
        temp_bottom.blockTime === prev_bottom.blockTime && temp_bottom.slot === prev_bottom.slot;

      if (matchesFinalBottom || matchesPrevBottom || checkFlag) {
        console.info("[INFO] Found bottom of frontfill with MATCHING blocktime and slot!");
        const simultaneousBottomIndexes: number[] = [];

        // Start a fresh record when this bottom differs from both the tracked position
        // and the final bottom.
        if (
          (temp_bottom.blockTime != simultaneousBottoms["blockTime"] && temp_bottom.slot != simultaneousBottoms["slot"]) &&
          (temp_bottom.blockTime != old_top_block_time && temp_bottom.slot != old_top_slot)
        ) {
          console.info("[INFO] Resetting simultaneousBottoms");
          simultaneousBottoms = {
            blockTime: temp_bottom.blockTime,
            slot: temp_bottom.slot,
          };
        }

        for (let i = 0; i < sigs.length; i += 1) {
          const sig = sigs[i];
          const sharesBottomSlot =
            (sig.blockTime === old_top_block_time && sig.slot === old_top_slot) ||
            (sig.blockTime === prev_bottom.blockTime && sig.slot === temp_bottom.slot);
          if (!sharesBottomSlot) {
            continue;
          }
          if (simultaneousBottoms[sig.signature] === undefined) {
            // First sighting: remember it so a repeat can be recognised later.
            simultaneousBottoms[sig.signature] = sig;
          } else {
            // Already recorded: mark this index for removal.
            simultaneousBottomIndexes.push(i);
          }
        }
        console.info(`[INFO] Bottom TXs / Repeated Indexes Length: ${Object.keys(simultaneousBottoms).length - 2} / ${simultaneousBottomIndexes.length}`);

        // Remove from the largest index down so the remaining indexes stay valid.
        simultaneousBottomIndexes.reverse();
        console.info(`[INFO] SIGS LENGTH (BEFORE): ${sigs.length}`);
        for (let j = 0; j < simultaneousBottomIndexes.length; j += 1) {
          const index = simultaneousBottomIndexes[j];
          // Secondary check before removing the entry.
          if (simultaneousBottoms[sigs[index].signature] != undefined) {
            sigs.splice(index, 1);
          }
        }
        console.info(`[INFO] SIGS LENGTH (AFTER): ${sigs.length}`);
      }
      // Remember this page's bottom for the next iteration.
      prev_bottom = temp_bottom;
    }

    // ======== BOTH FRONTFILL & BACKFILL ========
    if (sigs.length === 0) {
      // Nothing (left) to index: finalise the checkpoints and stop.
      console.info(`[INFO] Signature list is empty ${sigs}`);
      if (firstFlag) {
        // No page was ever processed in this call, so there is no new top to record.
        console.info("[INFO] First Flag is true AND no new sigs found, skipping write checkpoints.");
      } else if (backfill_complete) {
        // Frontfill: the newest signature of this run becomes the stored top.
        writeFrontfillCheckpoint(
          process.env.CHECKPOINT_TABLE_NAME,
          newTop.signature,
          newTop.blockTime,
          newTop.slot,
        );
      } else {
        // Backfill: mark the backfill as complete.
        writeBackfillCheckpoint(
          process.env.CHECKPOINT_TABLE_NAME,
          undefined,
          undefined,
          true,
        );
        // A backfill started with explicit cursors may have run without any frontfill
        // checkpoint existing; write one in that case.
        const { old_top } = await readFrontfillCheckpoint(
          process.env.CHECKPOINT_TABLE_NAME
        );
        if (old_top == undefined) {
          console.log("[INFO] Writing FF checkpoint after backfill complete since none was found...")
          writeFrontfillCheckpoint(
            process.env.CHECKPOINT_TABLE_NAME,
            newTop.signature,
            newTop.blockTime,
            newTop.slot,
          );
        }
      }
      break;
    }

    const oldest = sigs[0];
    const newest = sigs[sigs.length - 1];

    // Top and bottom of the current page.
    top = {
      signature: newest.signature,
      blockTime: newest.blockTime,
      slot: newest.slot,
    };
    bottom = {
      signature: oldest.signature,
      blockTime: oldest.blockTime,
      slot: oldest.slot,
    };

    // The newest entry of the first non-empty page is the new top for this whole run.
    if (firstFlag) {
      newTop = top;
      firstFlag = false;
    }

    console.info(
      `[INFO] Indexed [${sigs.length}] txs: ${oldest.signature} (${formatBlockTimeForLog(oldest.blockTime)}) - ${newest.signature} (${formatBlockTimeForLog(newest.blockTime)})`
    );

    if (backfill_frontfill_checkpoint_write) {
      console.info(`[INFO] Writing Frontfill Checkpoint IMMEDIATELY after first backfill run`);
      writeFrontfillCheckpoint(
        process.env.CHECKPOINT_TABLE_NAME,
        newTop.signature,
        newTop.blockTime,
        newTop.slot,
      );
      backfill_frontfill_checkpoint_write = false;
    }

    // Push the page's signatures to SQS.
    if (!DEBUG_MODE) {
      sendMessage(
        sigs.map((s) => s.signature),
        process.env.SQS_QUEUE_URL
      );
    }

    // The next request continues below this page: its oldest entry becomes the `before`
    // cursor and the lower bound is restored to the caller's `until`.
    top = bottom;
    bottom = {
      signature: until,
      blockTime: old_top_block_time,
      slot: old_top_slot,
    };

    if (!backfill_complete) {
      // Persist backfill progress so an interrupted run can resume from this cursor.
      writeBackfillCheckpoint(
        process.env.CHECKPOINT_TABLE_NAME,
        top.signature,
        undefined,
        false,
      );
    }

    // Sanity check on the page boundaries. The original second operand was
    // `!sigs[sigs.length - 1] == undefined`, which compares a boolean with undefined
    // and so could never be true.
    if (sigs[0] == undefined || sigs[sigs.length - 1] == undefined) {
      console.error("[ERROR] Null signature detected");
    }
  }

  return { bottom: bottom, top: top };
}
/**
 * Replaces the shared client's `connection` with a newly constructed `Connection`
 * built from the client's own `nodeUrl` and `commitmentOrConfig`.
 */
export const refreshConnection = async (): Promise<void> => {
  const { nodeUrl, commitmentOrConfig } = rpc;
  rpc.connection = new Connection(nodeUrl, commitmentOrConfig);
};
/**
 * Service entry point. Starts two timers and then loops forever, alternating between
 * an indexing pass and a 10 second pause.
 *
 * - Watchdog (every 30 minutes): re-reads both checkpoints and exits the process with
 *   code 1 when none of their fields changed since the previous tick. The first tick
 *   compares against an all-`undefined` baseline.
 * - Connection refresh (every 5 minutes): calls `refreshConnection()`.
 * - RESET === "true": both checkpoints are cleared before indexing starts.
 * - Each pass reads the backfill checkpoint. When the backfill is complete (or
 *   FRONTFILL_ONLY === "true") it frontfills from the newest signature down to the
 *   stored old top; otherwise it backfills from the stored cursors.
 *
 * NOTE(review): with FRONTFILL_ONLY === "true" and no stored frontfill checkpoint, every
 * pass takes the "Backfilling Required" branch, because `backfill_complete` is forced
 * back to true on the next iteration — confirm this is the intended behaviour.
 */
const main = async () => {
  const checkpointTable = process.env.CHECKPOINT_TABLE_NAME;

  if (DEBUG_MODE) {
    console.info("[INFO] Running in debug mode, will not push to AWS buckets");
  }

  // Last checkpoint values seen by the watchdog.
  let prev_backfill = {
    incomplete_top: undefined,
    bottom_sig: undefined,
    backfill_complete: undefined
  };
  let prev_frontfill = {
    old_top: undefined,
    old_top_block_time: undefined,
    old_top_slot: undefined
  };

  // Watchdog: identical checkpoints on two consecutive ticks are treated as a stall.
  setInterval(async () => {
    const new_backfill = await readBackfillCheckpoint(checkpointTable);
    const new_frontfill = await readFrontfillCheckpoint(checkpointTable);
    const unchanged =
      new_backfill.incomplete_top === prev_backfill.incomplete_top &&
      new_backfill.bottom_sig === prev_backfill.bottom_sig &&
      new_backfill.backfill_complete === prev_backfill.backfill_complete &&
      new_frontfill.old_top === prev_frontfill.old_top &&
      new_frontfill.old_top_block_time === prev_frontfill.old_top_block_time &&
      new_frontfill.old_top_slot === prev_frontfill.old_top_slot;
    if (unchanged) {
      console.error("[ERROR] No change in checkpoints");
      // Kill container task
      process.exit(1);
    } else {
      console.info("[INFO] Checkpoints have changed, continuing...");
      console.info(`Backfill Checkpoint Changes: ${JSON.stringify(prev_backfill)} - ${JSON.stringify(new_backfill)}`);
      console.info(`Frontfill Checkpoint Changes: ${JSON.stringify(prev_frontfill)} - ${JSON.stringify(new_frontfill)}`);
      prev_backfill = new_backfill;
      prev_frontfill = new_frontfill;
    }
  }, 1000 * 60 * 30); // Check every 30 minutes

  // Periodic refresh of the rpc connection to prevent hangups.
  setInterval(async () => {
    console.info("%c[INFO] Refreshing rpc connection", "color: cyan");
    refreshConnection();
  }, 1000 * 60 * 5); // Refresh every 5 minutes

  if (process.env.RESET === "true") {
    console.info("[INFO] Resetting checkpoints...");
    // Both writes are issued back to back and then awaited, so the first checkpoint read
    // in the loop below cannot run ahead of the reset. Awaiting is a no-op if the writers
    // are synchronous (their implementation is not visible here).
    await Promise.all([
      writeFrontfillCheckpoint(checkpointTable, undefined, undefined, undefined),
      writeBackfillCheckpoint(checkpointTable, undefined, undefined, false),
    ]);
  }

  // Start indexing.
  while (true) {
    // Get pointers from storage.
    let { incomplete_top, bottom_sig, backfill_complete } = await readBackfillCheckpoint(
      checkpointTable
    );
    console.info(`[INFO] Incomplete Top: ${incomplete_top}, Bottom: ${bottom_sig}, Backfill Complete: ${backfill_complete}`);

    if (process.env.FRONTFILL_ONLY === "true") {
      // Frontfill only mode: behave as if the backfill had already finished.
      console.info("[INFO] Running in frontfill only mode...");
      backfill_complete = true;
    }

    if (backfill_complete) {
      // Frontfill: find where the old top was...
      const { old_top, old_top_block_time, old_top_slot } = await readFrontfillCheckpoint(
        checkpointTable
      );
      if (!old_top) {
        // Without an old top there is nothing to frontfill up to, so the stored
        // backfill checkpoint is flipped back to "not complete".
        console.error("[ERROR] Backfilling Required, Setting Backfill to false")
        const latest = await readBackfillCheckpoint(checkpointTable);
        writeBackfillCheckpoint(checkpointTable, latest.incomplete_top, latest.bottom_sig, false);
      } else {
        // ...and index from the newest signature down to the old top.
        console.info(`[INFO] Frontfilling: Indexing up until: ${old_top}`);
        await indexSignaturesForAddress(
          new PublicKey(process.env.PROGRAM_ID),
          undefined,
          old_top,
          backfill_complete,
          old_top_block_time,
          old_top_slot,
        );
      }
    } else {
      // Backfill from the stored cursors.
      console.info(`[INFO] No prior data, proceeding to backfill. Starting at: ${incomplete_top}`);
      await indexSignaturesForAddress(
        new PublicKey(process.env.PROGRAM_ID),
        incomplete_top,
        bottom_sig,
        backfill_complete,
        undefined,
        undefined,
      );
      console.info(`[INFO] Backfill Complete!`);
    }

    console.info("[INFO] Indexing up to date, waiting a few seconds...");
    await sleep(10000); // 10 seconds
  }
};
// Start the service; a rejection from main() is reported on stderr.
// NOTE(review): nothing terminates the process here, and the intervals registered in
// main() stay scheduled — confirm that relying on the watchdog to exit is intended.
main().catch((error) => console.error(error));