-
Notifications
You must be signed in to change notification settings - Fork 3
/
sr-main.c
372 lines (332 loc) · 14.6 KB
/
sr-main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
/*
* This is statsd-router: metrics router for statsd cluster.
*
* Statsd (https://github.com/etsy/statsd/) is a very convenient way of collecting metrics.
* Statsd-router can be used to scale statsd. It accepts metrics and routes them across
* several statsd instances in such a way that each metric is always processed by one and
* the same statsd instance. This is done in order not to corrupt data when using graphite
* as the backend.
*
* Author: Kirill Timofeev <[email protected]>
*
* Enjoy :-)!
*
*/
#pragma GCC diagnostic ignored "-Wstrict-aliasing"
#include "sr-main.h"
#include <unistd.h>
/*
 * libev write-readiness callback: sends the oldest queued buffer of a
 * downstream as one datagram.
 *
 * The watcher pointer is cast back to the downstream it belongs to
 * (NOTE(review): this relies on the ev_io being the first member of
 * struct downstream_s - confirm in sr-main.h).
 *
 * Whatever sendto() returns, the buffer is marked empty and the flush index
 * moves on, so a failed send drops that buffer instead of retrying it.
 * When the flush index catches up with the active index there is nothing
 * left to send and the watcher is stopped.
 */
void ds_flush_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    ssize_t bytes_send;
    struct downstream_s *ds = (struct downstream_s *)watcher;
    int flush_buffer_idx = ds->flush_buffer_idx;

    if (EV_ERROR & revents) {
        log_msg(WARN, "%s: invalid event %s", __func__, strerror(errno));
        return;
    }

    bytes_send = sendto(watcher->fd,
                        ds->buffer + flush_buffer_idx * DOWNSTREAM_BUF_SIZE,
                        ds->buffer_length[flush_buffer_idx],
                        0,
                        (struct sockaddr *)&(ds->sa_in_data),
                        sizeof(ds->sa_in_data));
    // report the failure immediately, before any other call can overwrite errno
    if (bytes_send < 0) {
        log_msg(WARN, "%s: sendto() failed %s", __func__, strerror(errno));
    }

    // release the buffer and advance to the next one in the ring
    ds->buffer_length[flush_buffer_idx] = 0;
    ds->flush_buffer_idx = (flush_buffer_idx + 1) % DOWNSTREAM_BUF_NUM;
    if (ds->flush_buffer_idx == ds->active_buffer_idx) {
        // every queued buffer has been handled, no need to watch the socket
        ev_io_stop(loop, watcher);
    }
}
/*
 * Queues the active buffer of a downstream for sending and makes the next
 * buffer of the ring the active one.
 *
 * If the next buffer still holds unsent data the ring is full: the content
 * of the active buffer is discarded (its length is reset) and nothing is
 * queued. Otherwise the packet/traffic counters are updated, the active
 * buffer length is recorded for the flush callback, and - when no flush was
 * pending before this call - a write watcher is registered on the
 * downstream's outgoing socket.
 */
void ds_schedule_flush(struct downstream_s *ds, struct ev_loop *loop) {
    // equal indexes mean that nothing is waiting to be sent, so the
    // write watcher is not running and has to be started by this call
    const int queue_was_idle = (ds->active_buffer_idx == ds->flush_buffer_idx);
    const int next_idx = (ds->active_buffer_idx + 1) % DOWNSTREAM_BUF_NUM;
    struct ev_io *io = (struct ev_io *)ds;

    if (ds->buffer_length[next_idx] > 0) {
        log_msg(WARN, "%s: previous flush is not completed, loosing data.", __func__);
        ds->active_buffer_length = 0;
        return;
    }

    // statistics for the buffer that is being queued
    ds->downstream_traffic_counter += ds->active_buffer_length;
    ds->downstream_packet_counter++;

    // hand the filled buffer over to the flush side
    ds->buffer_length[ds->active_buffer_idx] = ds->active_buffer_length;

    // continue collecting data in the next buffer of the ring
    ds->active_buffer_idx = next_idx;
    ds->active_buffer = ds->buffer + next_idx * DOWNSTREAM_BUF_SIZE;
    ds->active_buffer_length = 0;

    if (!queue_was_idle) {
        return;
    }
    ev_io_init(io, ds_flush_cb, *ds->socket_out, EV_WRITE);
    ev_io_start(loop, io);
}
void push_to_downstream(struct downstream_s *ds, char *line, int length, struct ev_loop *loop) {
// check if we new data would fit in buffer
if (ds->active_buffer_length + length > DOWNSTREAM_BUF_SIZE) {
// buffer is full, let's flush data
ds_schedule_flush(ds, loop);
}
// let's add new data to buffer
memcpy(ds->active_buffer + ds->active_buffer_length, line, length);
// update buffer length
ds->active_buffer_length += length;
}
/*
 * Picks a downstream for a metric line from the hash of its name and pushes
 * the line there.
 *
 * Candidates are tried one by one: `hash % i` selects a slot among the `i`
 * downstreams not tried yet. The first candidate whose health client reports
 * it alive receives the line. A dead candidate has its active buffer length
 * reset to 0, is moved to the end of the untried range so that it is not
 * selected again, and the hash is re-mixed before the next attempt.
 *
 * Returns 0 when the line was pushed, 1 when no downstream is alive
 * (including the case of a non-positive downstream_num).
 */
int find_downstream(char *line, unsigned long hash, int length, int downstream_num, struct downstream_s *downstream, struct ev_loop *loop) {
    int i, j, k;

    log_msg(TRACE, "%s: hash = %lx, length = %d, line = %.*s", __func__, hash, length, length, line);

    // a variable length array must have a positive size, so bail out early
    if (downstream_num <= 0) {
        log_msg(WARN, "%s: all downstreams are dead", __func__);
        return 1;
    }

    // array to store downstreams for consistent hashing
    int ds_index[downstream_num];

    // array is ordered before reshuffling
    for (i = 0; i < downstream_num; i++) {
        ds_index[i] = i;
    }
    // we have at most downstream_num downstreams to cycle through
    for (i = downstream_num; i > 0; i--) {
        j = hash % i;
        k = ds_index[j];
        // k is downstream number for this metric, is it alive?
        if ((downstream + k)->health_client->alive) {
            log_msg(TRACE, "%s: pushing to downstream %d", __func__, k);
            push_to_downstream(downstream + k, line, length, loop);
            return 0;
        } else {
            // data collected for a dead downstream is discarded
            (downstream + k)->active_buffer_length = 0;
        }
        // swap the dead candidate with the last untried slot;
        // the next iteration only looks at slots 0 .. i - 2
        if (j != i - 1) {
            ds_index[j] = ds_index[i - 1];
            ds_index[i - 1] = k;
        }
        // quasi random number sequence, distribution is bad without this trick
        hash = (hash * 7 + 5) / 3;
    }
    log_msg(WARN, "%s: all downstreams are dead", __func__);
    return 1;
}
/*
 * sdbm hash (http://www.cse.yorku.ca/~oz/hash.html) of a metric name.
 *
 * Scans at most `length` bytes of `s` and hashes everything that precedes
 * the first ':'.
 *
 * Returns 0 and stores the hash in *result when a ':' was found.
 * Returns 1 and leaves *result untouched when there is no ':' within
 * `length` bytes.
 */
int hash(char *s, int length, unsigned long *result) {
    unsigned long acc = 0;
    int pos = 0;

    while (pos < length) {
        char ch = s[pos++];

        if (ch == ':') {
            *result = acc;
            return 0;
        }
        // 65599 == (1 << 6) + (1 << 16) - 1, the sdbm multiplier
        acc = acc * 65599UL + ch;
    }
    return 1;
}
/*
 * Processes a single metric line of `length` bytes.
 *
 * The part of the line before the first ':' is hashed and the line is
 * handed to find_downstream() for routing. A line without ':' is not a
 * valid statsd metric: its last byte is overwritten with a terminating
 * zero so that it can be logged as a string, and it is not routed.
 *
 * Returns 0 when the line was passed on for routing (the result of
 * find_downstream() is not taken into account), 1 for an invalid line.
 */
int process_data_line(char *line, int length, int downstream_num, struct downstream_s *downstream, struct ev_loop *loop) {
    unsigned long name_hash = 0;
    const int name_found = (hash(line, length, &name_hash) == 0);

    if (name_found) {
        find_downstream(line, name_hash, length, downstream_num, downstream, loop);
        return 0;
    }

    line[length - 1] = 0;
    log_msg(WARN, "%s: invalid metric %s", __func__, line);
    return 1;
}
/*
 * libev read-readiness callback for the incoming UDP socket.
 *
 * Reads one datagram, makes sure it ends with '\n' and processes it line by
 * line. At most DATA_BUF_SIZE - 1 bytes are received so that one byte always
 * stays free for the appended newline. Lines of 5 bytes or less, or of
 * DOWNSTREAM_BUF_SIZE bytes or more (newline included), are logged and
 * skipped; all other lines go to process_data_line().
 *
 * The downstream array and its size are taken from the ev_io_ds_s the
 * watcher is embedded in.
 */
void udp_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char buffer[DATA_BUF_SIZE];
    ssize_t bytes_in_buffer;
    char *buffer_ptr = buffer;
    char *delimiter_ptr = buffer;
    int line_length = 0;
    int downstream_num = ((struct ev_io_ds_s *)watcher)->downstream_num;
    struct downstream_s *downstream = ((struct ev_io_ds_s *)watcher)->downstream;

    if (EV_ERROR & revents) {
        log_msg(WARN, "%s: invalid event %s", __func__, strerror(errno));
        return;
    }

    bytes_in_buffer = recv(watcher->fd, buffer, DATA_BUF_SIZE - 1, 0);
    if (bytes_in_buffer < 0) {
        log_msg(WARN, "%s: recv() failed %s", __func__, strerror(errno));
        return;
    }
    if (bytes_in_buffer == 0) {
        // empty datagram, nothing to process
        return;
    }

    // terminate the last line so that the loop below sees every line
    if (buffer[bytes_in_buffer - 1] != '\n') {
        buffer[bytes_in_buffer++] = '\n';
    }
    // the precision argument of %.*s has to be an int
    log_msg(TRACE, "%s: got packet %.*s", __func__, (int)bytes_in_buffer, buffer);

    while ((delimiter_ptr = memchr(buffer_ptr, '\n', bytes_in_buffer)) != NULL) {
        // the line includes its trailing newline
        delimiter_ptr++;
        line_length = (int)(delimiter_ptr - buffer_ptr);
        // minimum metrics line should look like X:1|c\n
        // so lines with length less than 6 can be ignored
        if (line_length > 5 && line_length < DOWNSTREAM_BUF_SIZE) {
            // if line has valid length let's process it
            process_data_line(buffer_ptr, line_length, downstream_num, downstream, loop);
        } else {
            log_msg(WARN, "%s: invalid length %d of metric %.*s", __func__, line_length, line_length, buffer_ptr);
        }
        // advance to the start of the next line
        buffer_ptr = delimiter_ptr;
        bytes_in_buffer -= line_length;
    }
}
// this function cycles through downstreams and flushes them on scheduled basis
void ds_flush_timer_cb(struct ev_loop *loop, struct ev_periodic *p, int revents) {
int i;
struct downstream_s *downstream = ((struct ev_periodic_ds_s *)p)->downstream;
int downstream_num = ((struct ev_periodic_ds_s *)p)->downstream_num;
for (i = 0; i < downstream_num; i++) {
if ((downstream + i)->active_buffer_length > 0) {
ds_schedule_flush(downstream + i, loop);
}
}
}
void ping_cb(struct ev_loop *loop, struct ev_periodic *p, int revents) {
int i = 0;
int n = 0;
int count = 0;
char buffer[METRIC_SIZE];
struct downstream_s *ds;
int packets = 0;
int traffic = 0;
int downstream_num = ((struct ev_periodic_ds_s *)p)->downstream_num;
struct downstream_s *downstream = ((struct ev_periodic_ds_s *)p)->downstream;
char *alive_downstream_metric_name = ((struct ev_periodic_ds_s *)p)->string;
for (i = 0; i < downstream_num; i++) {
ds = downstream + i;
if (ds->health_client->alive) {
push_to_downstream(ds, ds->per_downstream_counter_metric, ds->per_downstream_counter_metric_length, loop);
count++;
}
traffic = ds->downstream_traffic_counter;
packets = ds->downstream_packet_counter;
ds->downstream_traffic_counter = 0;
ds->downstream_packet_counter = 0;
n = sprintf(buffer, "%s:%d|c\n%s:%d|c\n",
ds->downstream_traffic_counter_metric, traffic,
ds->downstream_packet_counter_metric, packets);
process_data_line(buffer, n, downstream_num, downstream, loop);
}
n = sprintf(buffer, "%s:%d|g\n", alive_downstream_metric_name, count);
process_data_line(buffer, n, downstream_num, downstream, loop);
}
/*
 * Entry point of a data pipe worker thread; `args` points to the
 * thread_config_s of this thread.
 *
 * The thread
 *   1. creates a UDP socket with SO_REUSEPORT and binds it to the configured
 *      data port on all interfaces;
 *   2. creates common->socket_out_num outgoing UDP sockets;
 *   3. creates its own event loop;
 *   4. stores the sockets in thread_config and assigns an outgoing socket to
 *      each of its downstreams (round robin over the outgoing sockets);
 *   5. registers the UDP reader, the periodic downstream flush timer and the
 *      periodic ping timer, then runs the loop.
 *
 * The downstreams of this thread are the slice of common->downstream that
 * starts at index * downstream_num.
 *
 * Nothing is stored in thread_config until every resource has been created;
 * on any setup failure the sockets and the socket array created so far are
 * released. Always returns NULL.
 */
void *data_pipe_thread(void *args) {
    struct thread_config_s *thread_config = (struct thread_config_s *)args;
    struct sockaddr_in addr;
    struct ev_loop *loop = NULL;
    struct ev_io_ds_s socket_watcher;
    struct ev_periodic_ds_s ds_flush_timer_watcher;
    struct ev_periodic_ds_s ping_timer_watcher;
    ev_tstamp ds_flush_timer_at = 0.0;
    ev_tstamp ping_timer_at = 0.0;
    ev_tstamp downstream_flush_interval = thread_config->common->downstream_flush_interval;
    int downstream_num = thread_config->common->downstream_num;
    struct downstream_s *downstream = thread_config->common->downstream + thread_config->index * downstream_num;
    int *socket_out = NULL;
    int socket_out_opened = 0;
    int socket_in = -1;
    int optval = 1;
    int i = 0;

    socket_in = socket(PF_INET, SOCK_DGRAM, 0);
    if (socket_in < 0 ) {
        log_msg(ERROR, "%s: socket_in socket() error %s", __func__, strerror(errno));
        return NULL;
    }
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(thread_config->common->data_port);
    addr.sin_addr.s_addr = INADDR_ANY;
    if (setsockopt(socket_in, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) != 0) {
        log_msg(ERROR, "%s: setsockopt() failed %s", __func__, strerror(errno));
        goto fail;
    }
    if (bind(socket_in, (struct sockaddr*) &addr, sizeof(addr)) != 0) {
        log_msg(ERROR, "%s: bind() failed %s", __func__, strerror(errno));
        goto fail;
    }

    socket_out = (int *)malloc(thread_config->common->socket_out_num * sizeof(int));
    if (socket_out == NULL) {
        log_msg(ERROR, "%s: malloc() failed %s", __func__, strerror(errno));
        goto fail;
    }
    for (i = 0; i < thread_config->common->socket_out_num; i++) {
        *(socket_out + i) = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if (*(socket_out + i) < 0 ) {
            log_msg(ERROR, "%s: socket_out socket() error %s", __func__, strerror(errno));
            goto fail;
        }
        // remember how many sockets have to be closed on failure
        socket_out_opened++;
    }

    loop = ev_loop_new(0);
    if (loop == NULL) {
        log_msg(ERROR, "%s: ev_loop_new() failed", __func__);
        goto fail;
    }

    // setup is complete, publish the sockets
    thread_config->socket_in = socket_in;
    thread_config->socket_out = socket_out;
    for (i = 0; i < downstream_num; i++) {
        (downstream + i)->socket_out = thread_config->socket_out + (i % thread_config->common->socket_out_num);
    }

    socket_watcher.downstream_num = downstream_num;
    socket_watcher.downstream = downstream;
    ev_io_init((struct ev_io *)&socket_watcher, udp_read_cb, socket_in, EV_READ);
    ev_io_start(loop, (struct ev_io *)&socket_watcher);

    ds_flush_timer_watcher.downstream_num = downstream_num;
    ds_flush_timer_watcher.downstream = downstream;
    ev_periodic_init ((struct ev_periodic *)(&ds_flush_timer_watcher), ds_flush_timer_cb, ds_flush_timer_at, downstream_flush_interval, 0);
    ev_periodic_start (loop, (struct ev_periodic *)(&ds_flush_timer_watcher));

    ping_timer_watcher.downstream_num = downstream_num;
    ping_timer_watcher.downstream = downstream;
    ping_timer_watcher.string = thread_config->alive_downstream_metric_name;
    ev_periodic_init((struct ev_periodic *)&ping_timer_watcher, ping_cb, ping_timer_at, thread_config->common->downstream_ping_interval, 0);
    ev_periodic_start (loop, (struct ev_periodic *)&ping_timer_watcher);

    ev_loop(loop, 0);
    log_msg(ERROR, "%s: ev_loop() exited", __func__);
    return NULL;

fail:
    // release everything that was created before the failure
    for (i = 0; i < socket_out_opened; i++) {
        close(*(socket_out + i));
    }
    free(socket_out);
    close(socket_in);
    return NULL;
}
int main(int argc, char *argv[]) {
struct ev_loop *loop = ev_loop_new(0);
struct sockaddr_in addr;
struct ev_io_control control_socket_watcher;
struct ev_periodic_health_client_s ds_health_check_timer_watcher;
int i;
int optval = 1;
int control_socket = -1;
ev_tstamp ds_health_check_timer_at = 0.0;
struct sr_config_s config;
if (argc != 2) {
fprintf(stdout, "Usage: %s config.file\n", argv[0]);
exit(1);
}
if (init_config(argv[1], &config) != 0) {
log_msg(ERROR, "%s: init_config() failed", __func__);
exit(1);
}
control_socket = socket(PF_INET, SOCK_STREAM, 0);
if (control_socket < 0 ) {
log_msg(ERROR, "%s: socket() error %s", __func__, strerror(errno));
return(1);
}
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(config.control_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(control_socket, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
log_msg(ERROR, "%s: bind() failed %s", __func__, strerror(errno));
return(1);
}
setsockopt(control_socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
if (listen(control_socket, 4096) < 0) {
log_msg(ERROR, "%s: listen() error %s", __func__, strerror(errno));
return(1);
}
config.control_socket = control_socket;
control_socket_watcher.health_response = config.health_check_response_buf;
control_socket_watcher.health_response_len = &config.health_check_response_buf_length;
ev_io_init((struct ev_io *)&control_socket_watcher, control_accept_cb, control_socket, EV_READ);
ev_io_start(loop, (struct ev_io *)&control_socket_watcher);
ds_health_check_timer_watcher.downstream_num = config.downstream_num;
ds_health_check_timer_watcher.health_client = config.health_client;
ev_periodic_init((struct ev_periodic *)&ds_health_check_timer_watcher, ds_health_check_timer_cb, ds_health_check_timer_at, config.downstream_health_check_interval, 0);
ev_periodic_start(loop, (struct ev_periodic *)&ds_health_check_timer_watcher);
for (i = 0; i < config.threads_num; i++) {
(config.thread_config + i)->index = i;
(config.thread_config + i)->common = &config;
pthread_create(&(config.thread_config + i)->thread, NULL, data_pipe_thread, (void *)(config.thread_config + i));
}
ev_loop(loop, 0);
log_msg(ERROR, "%s: ev_loop() exited", __func__);
return(0);
}