-
Notifications
You must be signed in to change notification settings - Fork 5
/
queryS3Express.ts
127 lines (101 loc) · 4.29 KB
/
queryS3Express.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import { APIGatewayEvent, Context } from 'aws-lambda';
import DuckDB from 'duckdb';
import { metricScope, Unit } from 'aws-embedded-metrics';
import Logger from '../lib/logger';
import { filterQuery } from '../lib/queryFilter';
import { getAWSSecretQuery } from '../lib/awsSecret';
// JSON.stringify cannot serialize bigint values natively; this toJSON hook
// makes every bigint serialize as its decimal string representation.
(BigInt.prototype as any).toJSON = function (this: bigint): string {
  return this.toString();
};
// Module-wide logger; the handler derives a per-request child logger from it.
const loggerFactory = new Logger({ name: 'duckdb-sync-logger' });
const logger = loggerFactory.getInstance();

// In-memory DuckDB database, configured to allow unsigned extensions.
const duckDB = new DuckDB.Database(':memory:', { allow_unsigned_extensions: 'true' });

// Connection on which the `query` helper runs every statement.
const connection = duckDB.connect();

// Flipped to true by the handler once the one-time DuckDB setup has completed.
let isInitialized = false;
/**
 * Promise wrapper around the callback-based `connection.all`.
 *
 * The statement is passed through `filterQuery` before it is executed.
 *
 * @param query SQL text to run.
 * @param isRemoteQuery Forwarded to `filterQuery` as its second argument; defaults to `true`.
 * @returns A promise that resolves with the result handed to the callback, or
 *          rejects with the error reported by `connection.all`.
 */
const query = (query: string, isRemoteQuery: boolean = true) => {
  return new Promise((resolve, reject) => {
    connection.all(filterQuery(query, isRemoteQuery), (err, res) => {
      if (err) {
        // Stop here: the promise is settled, so falling through to resolve() would be a misleading no-op.
        reject(err);
        return;
      }
      resolve(res);
    });
  });
};
/**
 * SIGTERM listener: logs the shutdown steps at debug level and then exits
 * the process with status code 0.
 */
async function handleSigterm(): Promise<void> {
  logger.debug('[runtime] SIGTERM received');
  logger.debug('[runtime] cleaning up');
  // Add your cleanup code here!
  logger.debug('[runtime] exiting');
  process.exit(0);
}

// Register the shutdown hook
process.on('SIGTERM', handleSigterm);
/**
 * Pulls the SQL text out of the raw request body.
 *
 * @param rawBody Value of `event.body`.
 * @returns The `query` property of the parsed JSON body.
 * @throws Error when the body is absent/empty, when the parsed JSON is not an
 *         object with an own `query` property, or when `query` is not a string.
 *         `JSON.parse` failures propagate unchanged.
 */
const extractQuery = (rawBody: string | null): string => {
  if (!rawBody) {
    throw new Error('No body present!');
  }
  const body: unknown = JSON.parse(rawBody);
  // Guard against `null` and primitives, on which a direct hasOwnProperty call would fail or mislead
  if (
    typeof body !== 'object' ||
    body === null ||
    !Object.prototype.hasOwnProperty.call(body, 'query')
  ) {
    throw new Error('Missing query property in request body!');
  }
  const { query: sql } = body as { query: unknown };
  if (typeof sql !== 'string') {
    throw new Error('Query property must be a string!');
  }
  return sql;
};

/**
 * Derives the client-facing error text from whatever was thrown.
 *
 * @param err Value caught by the handler.
 * @returns The thrown string, or the non-empty string `message` of a thrown
 *          object; otherwise 'Unknown error encountered'.
 */
const toErrorMessage = (err: unknown): string => {
  if (typeof err === 'string' && err) {
    return err;
  }
  if (typeof err === 'object' && err !== null) {
    const { message } = err as { message?: unknown };
    if (typeof message === 'string' && message) {
      return message;
    }
  }
  return 'Unknown error encountered';
};

/**
 * Lambda handler: runs the SQL statement found in the `query` property of the
 * JSON request body against the module-level DuckDB connection.
 *
 * While `isInitialized` is false, the DuckDB setup (home directory, local
 * extensions, cache settings, AWS secret) is executed before the query and the
 * flag is set once all setup statements have succeeded.
 *
 * Emits the metrics `InitialSetupDuration`, `AWSSetupDuration` (setup only) and
 * `QueryDuration`, all in milliseconds.
 *
 * @param event Incoming event; only `event.body` is read for the request.
 * @param context Invocation context; `awsRequestId` tags logs and metrics.
 * @returns `{ statusCode: 200, body }` with the JSON-serialized query result, or
 *          `{ statusCode: 400, body }` with `{ error }` for any failure.
 */
// eslint-disable-next-line import/prefer-default-export
export const handler = metricScope(metrics => async (event: APIGatewayEvent, context: Context) => {
  // Setup logger
  const requestLogger = logger.child({ requestId: context.awsRequestId });
  requestLogger.debug({ event, context });

  // Setup metrics
  metrics.putDimensions({ Service: 'QueryService' });
  metrics.setProperty('RequestId', context.awsRequestId);

  try {
    // Validate the request before doing any (potentially slow) setup work
    const sql = extractQuery(event.body);

    // Check if DuckDB has been initialized
    if (!isInitialized) {
      const initialSetupStartTimestamp = Date.now();

      // Load home directory
      await query(`SET home_directory='/tmp';`, false);

      // Install and load local extensions, one after another in this fixed order
      const extensionDir = '/opt/nodejs/node_modules/duckdb/extensions';
      for (const extension of ['aws', 'httpfs', 'arrow']) {
        const extensionPath = `${extensionDir}/${extension}.duckdb_extension`;
        await query(`INSTALL '${extensionPath}';`, false);
        await query(`LOAD '${extensionPath}';`, false);
      }

      // Whether or not the global http metadata is used to cache HTTP metadata, see https://github.com/duckdb/duckdb/pull/5405
      await query(`SET enable_http_metadata_cache=true;`, false);

      // Whether or not object cache is used to cache e.g. Parquet metadata
      await query(`SET enable_object_cache=true;`, false);

      requestLogger.debug({ message: 'Initial setup done!' });
      metrics.putMetric('InitialSetupDuration', Date.now() - initialSetupStartTimestamp, Unit.Milliseconds);

      const awsSetupStartTimestamp = Date.now();

      // Set AWS credentials, endpoint and region
      await query(getAWSSecretQuery(), false);

      requestLogger.debug({ message: 'AWS setup done!' });
      metrics.putMetric('AWSSetupDuration', Date.now() - awsSetupStartTimestamp, Unit.Milliseconds);

      // Store initialization (only reached when every setup statement succeeded)
      isInitialized = true;
    }

    // Track query start timestamp
    const queryStartTimestamp = Date.now();

    // Run query
    const queryResult = await query(sql);
    requestLogger.debug({ queryResult });
    metrics.putMetric('QueryDuration', Date.now() - queryStartTimestamp, Unit.Milliseconds);

    return {
      statusCode: 200,
      body: JSON.stringify(queryResult),
    };
  } catch (err: unknown) {
    requestLogger.error(err);
    return {
      statusCode: 400,
      body: JSON.stringify({
        error: toErrorMessage(err),
      }),
    };
  }
});