forked from ElektraInitiative/libelektra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish.c
403 lines (367 loc) · 9.7 KB
/
publish.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
/**
* @file
*
* @brief
*
* @copyright BSD License (see LICENSE.md or https://www.libelektra.org)
*/
#include "zeromqsend.h"
#include <kdbhelper.h>
#include <kdblogger.h>
#include <time.h> // clock_gettime()
#include <unistd.h> // usleep()
/** wait inside loop while waiting for messages (10ms) */
#define ELEKTRA_ZEROMQSEND_LOOPDELAY_NS (10 * 1000 * 1000)
/** first byte of a subscription message */
#define ELEKTRA_ZEROMQSEND_SUBSCRIPTION_MESSAGE '\x01'
#define ELEKTRA_ZEROMQSEND_MONITOR_ENDPOINT "inproc://zmqpublish-monitor"
/**
* Receive and return events from a ZeroMQ monitor socket.
*
* @param monitor monitor socket
* @return ZMQ_EVENT_ number
* @retval 0 if receiving interrupted or no message available
* @retval -1 on invalid message
*/
static int getMonitorEvent (void * monitor)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1)
{
// presumably interrupted or no message available
return 0;
}
if (!zmq_msg_more (&msg))
{
ELEKTRA_LOG_WARNING ("Invalid monitor message received!");
return -1;
}
uint8_t * data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
// Second frame in message contains event address
// We receive it to clear the buffer, since we are only
// interested in the event number
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1)
{
// presumably interrupted
return 0;
}
if (zmq_msg_more (&msg))
{
ELEKTRA_LOG_WARNING ("Invalid monitor message received!");
return -1;
}
return event;
}
static struct timespec ts_diff (struct timespec now, struct timespec start)
{
struct timespec diff;
if ((now.tv_nsec - start.tv_nsec) < 0)
{
diff.tv_sec = now.tv_sec - start.tv_sec - 1;
diff.tv_nsec = 1000000000 + now.tv_nsec - start.tv_nsec;
}
else
{
diff.tv_sec = now.tv_sec - start.tv_sec;
diff.tv_nsec = now.tv_nsec - start.tv_nsec;
}
return diff;
}
/**
* Wait for connection message from ZeroMQ monitor socket.
*
* @param monitorSocket socket
* @retval 1 when connected
* @retval -1 on timeout
* @retval 0 on other errors
*/
static int waitForConnection (void * monitorSocket, long connectTimeout)
{
struct timespec wait;
struct timespec start;
struct timespec now;
struct timespec diff;
time_t startFallback = -1;
long timeoutSec = (connectTimeout / (1000));
long timeoutNsec = (connectTimeout % (1000)) * (1000 * 1000);
if (clock_gettime (CLOCK_MONOTONIC, &start) == -1)
{
ELEKTRA_LOG_WARNING ("Using slower fallback for timeout detection");
startFallback = time (NULL);
// minimum timeout is 1 second when using the fallback
if (timeoutSec == 0)
{
timeoutSec = 1;
}
}
// wait for connection established event
int connected = 0;
while (!connected)
{
wait.tv_sec = 0;
wait.tv_nsec = ELEKTRA_ZEROMQSEND_LOOPDELAY_NS;
while (!nanosleep (&wait, &wait) && errno == EINTR)
;
int event = getMonitorEvent (monitorSocket);
int timeout = 0;
if (startFallback == -1)
{
clock_gettime (CLOCK_MONOTONIC, &now);
diff = ts_diff (now, start);
timeout = diff.tv_sec >= timeoutSec && diff.tv_nsec >= timeoutNsec;
}
else
{
timeout = time (NULL) - startFallback >= timeoutSec;
}
if (timeout)
{
ELEKTRA_LOG_WARNING ("connection timed out. could not publish notification");
zmq_close (monitorSocket);
return -1;
}
switch (event)
{
case ZMQ_EVENT_CONNECTED:
// we do not need the publisher monitor anymore
zmq_close (monitorSocket);
connected = 1;
break;
case -1:
// abort, inconsistencies detected
ELEKTRA_LOG_WARNING ("Cannot monitor connection events");
return 0;
break;
case 0:
// no message available or interrupted, try again
break;
default:
// other ZMQ event, ignore
break;
}
}
return 1;
}
/**
* Wait for first subscription message on ZeroMQ socket.
*
* @param socket socket
* @retval 1 on success
* @retval -2 on timeout
* @retval 0 on other errors
*/
static int waitForSubscription (void * socket, long subscribeTimeout)
{
struct timespec start;
struct timespec now;
struct timespec wait;
struct timespec diff;
time_t startFallback = -1;
long timeoutSec = (subscribeTimeout / (1000));
long timeoutNsec = (subscribeTimeout % (1000)) * (1000 * 1000);
if (clock_gettime (CLOCK_MONOTONIC, &start) == -1)
{
ELEKTRA_LOG_WARNING ("Using slower fallback for timeout detection");
startFallback = time (NULL);
// minimum timeout is 1 second when using the fallback
if (timeoutSec == 0)
{
timeoutSec = 1;
}
}
// wait until we receive the first subscription message
zmq_msg_t message;
zmq_msg_init (&message);
int hasSubscriber = 0;
int lastErrno = 0;
do
{
wait.tv_sec = 0;
wait.tv_nsec = ELEKTRA_ZEROMQSEND_LOOPDELAY_NS;
while (!nanosleep (&wait, &wait) && errno == EINTR)
;
lastErrno = 0;
int result = zmq_msg_recv (&message, socket, ZMQ_DONTWAIT);
if (result == -1)
{
lastErrno = zmq_errno ();
}
int timeout = 0;
if (startFallback == -1)
{
clock_gettime (CLOCK_MONOTONIC, &now);
diff = ts_diff (now, start);
timeout = diff.tv_sec >= timeoutSec && diff.tv_nsec >= timeoutNsec;
}
else
{
timeout = time (NULL) - startFallback >= timeoutSec;
}
if (timeout)
{
ELEKTRA_LOG_WARNING ("subscribing timed out. could not publish notification");
zmq_msg_close (&message);
return -2;
}
if (result == -1)
{
if (lastErrno != EAGAIN)
{
ELEKTRA_LOG_WARNING ("receiving failed %s", zmq_strerror (lastErrno));
zmq_msg_close (&message);
return 0;
}
}
else
{
// we have received a message subscription or unsubscription message
char * messageData = zmq_msg_data (&message);
if (messageData[0] == ELEKTRA_ZEROMQSEND_SUBSCRIPTION_MESSAGE)
{
hasSubscriber = 1;
}
}
} while (lastErrno == EAGAIN && !hasSubscriber);
zmq_msg_close (&message);
return 1;
}
/**
* @internal
* Connect to ZeroMq SUB or XSUB socket bound at endpoint.
*
* @param data plugin data
* @retval 1 on success
* @retval 0 on error
*/
int elektraZeroMqSendConnect (ElektraZeroMqSendPluginData * data)
{
// create zmq context
if (!data->zmqContext)
{
data->zmqContext = zmq_ctx_new ();
if (data->zmqContext == NULL)
{
ELEKTRA_LOG_WARNING ("zmq_ctx_new failed %s", zmq_strerror (zmq_errno ()));
return 0;
}
}
if (!data->zmqPublisher)
{
// create publish socket
data->zmqPublisher = zmq_socket (data->zmqContext, ZMQ_XPUB);
if (data->zmqPublisher == NULL)
{
ELEKTRA_LOG_WARNING ("zmq_socket failed %s", zmq_strerror (zmq_errno ()));
zmq_close (data->zmqPublisher);
return 0;
}
// setup socket monitor
if (zmq_socket_monitor (data->zmqPublisher, ELEKTRA_ZEROMQSEND_MONITOR_ENDPOINT, ZMQ_EVENT_CONNECTED) == -1)
{
ELEKTRA_LOG_WARNING ("creating socket monitor failed: %s", zmq_strerror (zmq_errno ()));
return 0;
}
data->zmqPublisherMonitor = zmq_socket (data->zmqContext, ZMQ_PAIR);
if (zmq_connect (data->zmqPublisherMonitor, ELEKTRA_ZEROMQSEND_MONITOR_ENDPOINT) != 0)
{
ELEKTRA_LOG_WARNING ("connecting to socket monitor failed: %s", zmq_strerror (zmq_errno ()));
return 0;
}
// connect to endpoint
int result = zmq_connect (data->zmqPublisher, data->endpoint);
if (result != 0)
{
ELEKTRA_LOG_WARNING ("zmq_connect error: %s", zmq_strerror (zmq_errno ()));
zmq_close (data->zmqPublisher);
data->zmqPublisher = NULL;
return 0;
}
}
return 1;
}
/**
* Publish notification on ZeroMq connection.
*
* @param changeType type of change
* @param keyName name of changed key
* @param data plugin data
* @retval 1 on success
* @retval -1 on connection timeout
* @retval -2 on subscription timeout
* @retval 0 on other errors
*/
int elektraZeroMqSendPublish (const char * changeType, const char * keyName, ElektraZeroMqSendPluginData * data)
{
if (!elektraZeroMqSendConnect (data))
{
ELEKTRA_LOG_WARNING ("could not connect to endpoint");
return 0;
}
// wait for subscription message
if (!data->hasSubscriber)
{
// NOTE zmq_connect() returns before a connection is established since
// ZeroMq asynchronously does that in the background.
// All notifications sent before the connection is established and and the
// socket has a subscriber are lost since ZMQ_(X)PUB sockets handle message
// filtering: Without subscribers all messages are discarded.
// Therefore we monitor the socket for until the connection is established
// and then wait for the first subscription message.
// A ZMQ_XPUB socket instead of a ZMQ_PUB socket allows us to receive
// subscription messages
int result = waitForConnection (data->zmqPublisherMonitor, data->connectTimeout);
if (result != 1)
{
return result;
}
result = waitForSubscription (data->zmqPublisher, data->subscribeTimeout);
if (result == 1)
{
data->hasSubscriber = 1;
}
else
{
return result;
}
}
// send notification
if (!elektraZeroMqSendNotification (data->zmqPublisher, changeType, keyName))
{
ELEKTRA_LOG_WARNING ("could not send notification");
return 0;
}
return 1;
}
/**
* @internal
* Send notification over ZeroMq socket.
*
* zmq_send() asynchronous.
* Processing already handled in a thread created by ZeroMq.
*
* @param socket ZeroMq socket
* @param changeType type of change
* @param keyName name of changed key
* @retval 1 on success
* @retval 0 on error
*/
int elektraZeroMqSendNotification (void * socket, const char * changeType, const char * keyName)
{
unsigned int size;
// Send change type
size = zmq_send (socket, changeType, elektraStrLen (changeType), ZMQ_SNDMORE);
if (size != elektraStrLen (changeType))
{
return 0;
}
size = zmq_send (socket, keyName, elektraStrLen (keyName), 0);
if (size != elektraStrLen (keyName))
{
return 0;
}
return 1;
}