-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlibmapreduce.c
executable file
·162 lines (135 loc) · 4.43 KB
/
libmapreduce.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/** @file libmapreduce.c */
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <sys/wait.h>
#include "libmapreduce.h"
#include "libds/libds.h"
/**
 * Maximum number of data bytes read_from_fd() keeps in a per-fd buffer.
 * Each buffer must be allocated with one extra byte (BUFFER_SIZE + 1) so
 * the contents can always be NUL-terminated.
 */
static const int BUFFER_SIZE = 2048;
/**
 * Adds the key-value pair to the mapreduce data structure. This may
 * require a reduce() operation.
 *
 * Ownership of both strings passes to this function: they were
 * malloc()'d by read_from_fd() and must be free()'d here or by whatever
 * structure ends up holding them.
 *
 * The storage/reduce logic is not implemented yet. Until it is, the pair
 * is released immediately so that it is not leaked.
 *
 * @param key
 *     The key of the key-value pair (heap-allocated, owned by callee).
 * @param value
 *     The value of the key-value pair (heap-allocated, owned by callee).
 * @param mr
 *     The pass-through mapreduce data structure (from read_from_fd()).
 */
static void process_key_value(const char *key, const char *value, mapreduce_t *mr)
{
    /* TODO: insert (key, value) into mr, reducing with any existing value
     * for the same key; remove the free() calls below once the pair is
     * actually stored. */
    (void)mr;

    /* The strings are malloc()'d objects that merely travel through a
     * const-qualified pointer, so dropping the qualifier to free them is
     * well-defined. */
    free((char *)key);
    free((char *)value);
}
/**
 * Helper function. Reads up to BUFFER_SIZE bytes from a file descriptor
 * into a buffer and calls process_key_value() for each and every complete
 * key-value pair that is now present in the buffer.
 *
 * Each key-value must be in a "Key: Value" format, identical to MP1, and
 * each pair must be terminated by a newline ('\n'). A newline-terminated
 * line that does not contain ": " is discarded.
 *
 * Each unique file descriptor must have a unique buffer and the buffer
 * must be of size (BUFFER_SIZE + 1). Therefore, if you have two
 * unique file descriptors, you must have two buffers that each have
 * been malloc()'d to size (BUFFER_SIZE + 1). The buffer must hold a
 * NUL-terminated string (initially empty) on the first call.
 *
 * Note that read_from_fd() makes a read() call and will block if the
 * fd does not have data ready to be read.
 *
 * @param fd
 *     File descriptor to read from.
 * @param buffer
 *     A unique buffer associated with the fd. This buffer may have
 *     a partial key-value pair between calls to read_from_fd() and
 *     must not be modified outside the context of read_from_fd().
 * @param mr
 *     Pass-through mapreduce_t structure (to process_key_value()).
 *
 * @retval 1
 *     Data was available and was read successfully.
 * @retval 0
 *     The file descriptor fd has been closed, no more data to read.
 * @retval -1
 *     The call to read() produced an error, a single line does not fit
 *     in BUFFER_SIZE bytes, or memory for a key/value could not be
 *     allocated.
 */
static int read_from_fd(int fd, char *buffer, mapreduce_t *mr)
{
    /* Length of the partial line left over from the previous call. */
    size_t offset = strlen(buffer);

    /* A full buffer with no newline means the current line can never be
     * completed. Without this check read() would be asked for 0 bytes,
     * return 0, and the caller would mistake that for end-of-file. */
    if (offset >= (size_t)BUFFER_SIZE)
    {
        fprintf(stderr, "error in read: line exceeds buffer size.\n");
        return -1;
    }

    /* Read bytes from the underlying stream, appending to the leftover. */
    ssize_t bytes_read = read(fd, buffer + offset, (size_t)BUFFER_SIZE - offset);
    if (bytes_read == 0)
        return 0;
    if (bytes_read < 0)
    {
        fprintf(stderr, "error in read.\n");
        return -1;
    }

    /* end always points at the terminating NUL of the buffered data. */
    char *end = buffer + offset + (size_t)bytes_read;
    *end = '\0';

    /* Loop through each "key: value\n" line currently in the buffer. */
    char *line;
    while ((line = strchr(buffer, '\n')) != NULL)
    {
        char *next = line + 1;

        /* Terminate the line so the ": " search cannot run into the
         * following line. */
        *line = '\0';

        /* Find the key/value split. */
        char *split = strstr(buffer, ": ");
        if (split != NULL)
        {
            const char *value_start = split + 2;
            size_t key_len = (size_t)(split - buffer);
            size_t value_len = (size_t)(line - value_start);

            char *key = malloc(key_len + 1);
            char *value = malloc(value_len + 1);
            if (key == NULL || value == NULL)
            {
                free(key);
                free(value);
                /* Put the newline back so the buffer is left intact. */
                *line = '\n';
                fprintf(stderr, "error in read_from_fd: out of memory.\n");
                return -1;
            }

            memcpy(key, buffer, key_len);
            key[key_len] = '\0';
            /* value_len + 1 also copies the NUL written at *line. */
            memcpy(value, value_start, value_len + 1);

            /* Ownership of key and value passes to process_key_value(). */
            process_key_value(key, value, mr);
        }
        /* else: malformed line (no ": "); fall through and drop it. */

        /* Shift the unprocessed remainder, including its terminator, to
         * the front of the buffer. This must happen for malformed lines
         * too, otherwise the data behind them would be lost. */
        size_t remaining = (size_t)(end - next);
        memmove(buffer, next, remaining + 1);
        end = buffer + remaining;
    }

    return 1;
}
/**
 * Sets up a mapreduce_t for use with the given map and reduce callbacks.
 *
 * Placeholder: the body performs no work yet and leaves mr untouched.
 *
 * @param mr
 *     The mapreduce data structure to initialize.
 * @param mymap
 *     The map() callback.
 * @param myreduce
 *     The reduce() callback.
 */
void mapreduce_init(mapreduce_t *mr,
                    void (*mymap)(int, const char *),
                    const char *(*myreduce)(const char *, const char *))
{
    /* Nothing is recorded yet; mark the arguments as deliberately unused. */
    (void)mr;
    (void)mymap;
    (void)myreduce;
}
/**
 * Launches the map() processes, one per entry of the values array.
 * (See the MP description for full details.)
 *
 * Placeholder: the body performs no work yet.
 *
 * @param mr
 *     The mapreduce data structure.
 * @param values
 *     The array of input values handed to the map() processes.
 */
void mapreduce_map_all(mapreduce_t *mr, const char **values)
{
    /* No processes are started yet; arguments are deliberately unused. */
    (void)mr;
    (void)values;
}
/**
 * Waits until every reduce() operation has finished.
 * (See the MP description for full details.)
 *
 * Placeholder: the body performs no work yet and returns immediately.
 *
 * @param mr
 *     The mapreduce data structure.
 */
void mapreduce_reduce_all(mapreduce_t *mr)
{
    /* Nothing to wait for yet; the argument is deliberately unused. */
    (void)mr;
}
/**
 * Looks up the current value stored for a key.
 * (See the MP description for full details.)
 *
 * Placeholder: no lookup is performed yet.
 *
 * @param mr
 *     The mapreduce data structure.
 * @param result_key
 *     The key whose value is requested.
 *
 * @return Always NULL in the current placeholder implementation.
 */
const char *mapreduce_get_value(mapreduce_t *mr, const char *result_key)
{
    const char *found = NULL;

    /* No lookup yet; arguments are deliberately unused. */
    (void)mr;
    (void)result_key;

    return found;
}
/**
 * Tears down the mapreduce data structure.
 *
 * Placeholder: the body performs no work yet and releases nothing.
 *
 * @param mr
 *     The mapreduce data structure to destroy.
 */
void mapreduce_destroy(mapreduce_t *mr)
{
    /* Nothing to release yet; the argument is deliberately unused. */
    (void)mr;
}