-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_io.c
133 lines (97 loc) · 3.04 KB
/
thread_io.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*!
* \file thread_io.c
* \brief Implementation of functions related to the I/O operations performed by all
* working threads.
* \author Henrique Nascimento Gouveia <[email protected]>
*/
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/mman.h>
#include "internal.h"
#include "daemon.h"
#include "resultio.h"
#include "requestio.h"
/*!
 * Cancellation clean up handler of the working threads.
 *
 * \param arg argument handed to the handler when it was registered. It is interpreted
 *            as a pointer to the daemon structure; a NULL value makes the handler a
 *            no-op.
 *
 * \details Unlocks the request I/O queue mutex of the daemon. The handler does not
 *          check whether the calling thread currently owns that mutex; that is assumed
 *          to be guaranteed by whoever registers it.
 */
static void thread_cleanup_routine(void *arg)
{
  struct akwbs_daemon *owner = (struct akwbs_daemon *)arg;

  /* Nothing to release when no daemon structure was supplied. */
  if (owner != NULL)
    pthread_mutex_unlock(&owner->request_io_queue_mutex);
}
/*!
 * Set up the calling thread.
 *
 * \return AKWBS_SUCCESS on success setting up this thread.
 *         AKWBS_ERROR on error while trying to set up this thread.
 *
 * \details The set up is done by enabling cancellation for the calling thread and
 *          selecting the deferred cancellation type. The thread is not detached here.
 */
static int setup_io_thread(void)
{
  /*
   * The pthread_* functions report failure by returning a non-zero error number;
   * they never return -1, so the result must be compared against 0.
   */
  if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
    return AKWBS_ERROR;

  if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
    return AKWBS_ERROR;

  return AKWBS_SUCCESS;
}
/*!
 * Main routine of working threads.
 *
 * \param arg argument to this routine: a pointer to the daemon structure. The thread
 *            exits immediately when it is NULL.
 *
 * \return Nothing is ever returned to the caller; the thread ends through
 *         pthread_exit(NULL) or through cancellation.
 *
 * \details Each iteration takes the request I/O queue mutex, reads one request message
 *          from the read end of the request I/O queue (waiting on the queue condition
 *          variable while the read reports EAGAIN), releases the mutex, performs the
 *          requested I/O with akwbs_do_io() and sends the outcome through the write
 *          end of the result I/O queue.
 *
 *          The clean up handler that unlocks the mutex is registered only while the
 *          mutex is held. pthread_exit() and cancellation both run the registered
 *          handlers, so keeping it registered outside the locked region would unlock
 *          a mutex this thread does not own.
 */
void *akwbs_thread_io_routine(void *arg)
{
  struct akwbs_daemon *daemon_p = NULL;
  struct akwbs_request_io_msg msg;
  struct akwbs_result_io result_msg;

  if (arg == NULL)
    pthread_exit(NULL);

  daemon_p = (struct akwbs_daemon *)arg;

  if (setup_io_thread() == AKWBS_ERROR)
    pthread_exit(NULL);

  while (1)
  {
    ssize_t ret = 0;

    bzero(&msg, sizeof(struct akwbs_request_io_msg));
    bzero(&result_msg, sizeof(struct akwbs_result_io));

    /* pthread functions return 0 on success and an error number otherwise. */
    if (pthread_mutex_lock(&daemon_p->request_io_queue_mutex) != 0)
      pthread_exit(NULL);

    /*
     * From here until the matching pop the mutex is owned by this thread, so every
     * exit path (pthread_exit() or cancellation inside read()/pthread_cond_wait())
     * releases it exactly once through the handler.
     */
    pthread_cleanup_push(thread_cleanup_routine, daemon_p);

    while ((ret = read(daemon_p->request_io_queue[AKWBS_READ_INDEX],
                       &msg,
                       sizeof(struct akwbs_request_io_msg))) == -1)
    {
      /* Any failure other than "no data available yet" is fatal for this thread. */
      if (errno != EAGAIN)
        pthread_exit(NULL);

      /* Queue is empty: sleep until a producer signals the condition variable. */
      if (pthread_cond_wait(&daemon_p->request_io_queue_cond,
                            &daemon_p->request_io_queue_mutex) != 0)
        pthread_exit(NULL);
    }

    /*
     * A return of 0 means end of file on the queue and a short count means a
     * truncated message; neither yields a usable request, so stop instead of
     * acting on a zeroed or partial one.
     */
    if (ret != (ssize_t)sizeof(struct akwbs_request_io_msg))
      pthread_exit(NULL);

    /* Unregister the handler and run it, which unlocks the mutex. */
    pthread_cleanup_pop(1);

    akwbs_do_io(msg.fd, msg.address, &msg.bytes, &msg.offset, msg.type);

    result_msg.bytes_read = msg.bytes;
    result_msg.connection_fd = msg.sd;

    posix_madvise(msg.address, msg.bytes, POSIX_MADV_SEQUENTIAL);

    akwbs_result_io_send_msg(&result_msg, daemon_p->result_io_queue[AKWBS_WRITE_INDEX]);
  }

  /* Not reached: the loop above only ends through pthread_exit() or cancellation. */
  return NULL;
}