Skip to content

Commit

Permalink
add concurrency, down to 4.4s
Browse files Browse the repository at this point in the history
  • Loading branch information
dannyvankooten committed Jan 4, 2024
1 parent 31282f3 commit fe6d42d
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CFLAGS+=-g -O2 -march=native -Wall -Wextra -Wpedantic -std=c99 -Wformat=2 -Wconversion -Wtrampolines \
-Wimplicit-fallthrough
-Wimplicit-fallthrough -D_XOPEN_SOURCE=500
LIBS+=-lm

all: bin/ bin/create-sample bin/analyze
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ This will create a 12 GB file with 1B rows named `measurements.txt` in your curr
### Run the challenge:

```
time bin/analyze measurements.txt
time bin/analyze measurements.txt >/dev/null
real 0m29.071s
user 0m23.790s
sys 0m2.962s
real 0m4.464s
user 0m34.472s
sys 0m6.938s
```
225 changes: 138 additions & 87 deletions analyze.c
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

struct result {
char *city;
int count, sum, min, max;
};

#define HCAP 1987

// parses a floating point number as an integer
Expand All @@ -28,85 +25,83 @@ static inline void parse_number(int *dest, char *s, char **endptr) {

// parse characteristic
while (*s >= '0' && *s <= '9') {
n = (n * 10) + (*s - '0');
s++;
n = (n * 10) + (*s++ - '0');
}

// skip separator
s++;

// parse mantissa
while (*s >= '0' && *s <= '9') {
d = (d * 10) + (*s - '0');
s++;
d = (d * 10) + (*s++ - '0');
}

*dest = mod * (n * 10 + d);
*dest = mod * n * 10 + d;
*endptr = s;
}

// hash returns the fnv-1a hash of the first n bytes in data
static inline unsigned int hash(const unsigned char *data, size_t n) {
const unsigned int fnv_prime = 0x811C9DC5;
unsigned int hash = 0;

for (size_t i = 0; i < n; data++, i++) {
hash *= fnv_prime;
hash *= 0x811C9DC5; // prime
hash ^= (*data);
}

return hash;
}

// cmp returns -1 if city property of a is lexically smaller than city property
// of b
struct Group {
unsigned int count;
int sum, min, max;
char *city;
};

// cmp compares the city property of the result that both pointers point
static inline int cmp(const void *ptr_a, const void *ptr_b) {
return strcmp(((struct result *)ptr_a)->city, ((struct result *)ptr_b)->city);
return strcmp(((struct Group *)ptr_a)->city, ((struct Group *)ptr_b)->city);
}

int main(int argc, char **argv) {
char *file = "measurements.txt";
if (argc > 1) {
file = argv[1];
}

struct stat sb;
int fd = open(file, O_RDONLY);
if (!fd) {
perror("error opening file");
exit(EXIT_FAILURE);
}
struct ResultSet {
char labels[420][32];
struct Group groups[420];
int n;
};

if (fstat(fd, &sb) == -1) {
perror("error getting file size");
exit(EXIT_FAILURE);
}
struct Chunk {
char *data;
size_t start;
size_t end;
struct ResultSet result;
};

char cities[413][28] = {{0}};
struct result results[413];
int nresults = 0;
static void *process_chunk(void *data) {
struct Chunk *ch = (struct Chunk *)data;

// mmap entire file into memory
char *data = mmap(NULL, (size_t)sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
if (!data) {
perror("error mmapping file");
exit(EXIT_FAILURE);
// skip start forward until SOF or after next newline
if (ch->start > 0) {
while (ch->data[ch->start - 1] != '\n') {
ch->start++;
}
}

// initialize map to -1
int *map = malloc(HCAP * sizeof(int));
if (!map) {
perror("error allocating memory for hashmap");
exit(EXIT_FAILURE);
while (ch->data[ch->end] != 0x0 && ch->data[ch->end - 1] != '\n') {
ch->end++;
}

// initialize hashmap
// will hold indexes into our cities and results array
int map[HCAP];
memset(map, -1, HCAP * sizeof(int));

char *s = data;
char *s = &ch->data[ch->start];
char *start, *pos;
unsigned int h;
int measurement;
size_t len;
int c;

while (1) {
start = s;
pos = strchr(s, ';');
Expand All @@ -116,64 +111,125 @@ int main(int argc, char **argv) {
// probe map until free spot or match
h = hash((unsigned char *)start, len) % HCAP;
c = map[h];
while (c >= 0 && memcmp(cities[c], start, len) != 0) {
h = (h == HCAP - 1) ? 0 : h + 1;
while (c >= 0 && memcmp(ch->result.labels[c], start, len) != 0) {
if (h++ == HCAP) {
h = 0;
}
c = map[h];
}

if (c < 0) {
map[h] = nresults;
char *city = cities[nresults];
memcpy(city, start, len);
city[len] = 0x0;

results[nresults].city = city;
results[nresults].sum = measurement;
results[nresults].max = measurement;
results[nresults].min = measurement;
results[nresults].count = 1;

nresults++;
memcpy(ch->result.labels[ch->result.n], start, len);
ch->result.labels[ch->result.n][len] = 0x0;
ch->result.groups[ch->result.n].city = ch->result.labels[ch->result.n];
ch->result.groups[ch->result.n].count = 1;
ch->result.groups[ch->result.n].sum = measurement;
ch->result.groups[ch->result.n].min = measurement;
ch->result.groups[ch->result.n].max = measurement;
map[h] = ch->result.n++;
} else {

results[c].sum += measurement;
results[c].count += 1;
if (measurement < results[c].min) {
results[c].min = measurement;
} else if (measurement > results[c].max) {
results[c].max = measurement;
ch->result.groups[c].count += 1;
ch->result.groups[c].sum += measurement;
if (measurement < ch->result.groups[c].min) {
ch->result.groups[c].min = measurement;
} else if (measurement > ch->result.groups[c].max) {
ch->result.groups[c].max = measurement;
}
}

// skip newline
s++;

if (*s == 0x0) {
// break out of loop if EOF
if (*++s == 0x0 || s == &ch->data[ch->end]) {
break;
}
}

char buf[128];
char *output = malloc(1 << 14 * sizeof(char));
if (!output) {
perror("error allocating memory for output string");
return (void *)ch;
}

int main(int argc, char **argv) {
char *file = "measurements.txt";
if (argc > 1) {
file = argv[1];
}

int fd = open(file, O_RDONLY);
if (!fd) {
perror("error opening file");
exit(EXIT_FAILURE);
}

s = output;
struct stat sb;
if (fstat(fd, &sb) == -1) {
perror("error getting file size");
exit(EXIT_FAILURE);
}

// mmap entire file into memory
size_t sz = (size_t)sb.st_size;
char *data = mmap(NULL, sz, PROT_READ, MAP_PRIVATE, fd, 0);
if (!data || data == MAP_FAILED) {
perror("error mmapping file");
exit(EXIT_FAILURE);
}

// distribute work among N worker threads
const int nworkers = 12;
pthread_t workers[nworkers];
struct Chunk chunks[nworkers];
for (int i = 0; i < nworkers; i++) {
chunks[i].result.n = 0;
chunks[i].data = data;
chunks[i].start = sz / (size_t)nworkers * (size_t)i;
chunks[i].end = sz / (size_t)nworkers * ((size_t)i + 1);
pthread_create(&workers[i], NULL, process_chunk, &chunks[i]);
}

// wait for all threads to finish
for (int i = 0; i < nworkers; i++) {
pthread_join(workers[i], NULL);
}

// merge results
struct ResultSet result = chunks[0].result;
for (int i = 1; i < nworkers; i++) {
for (int j = 0; j < chunks[i].result.n; j++) {
// get index of label
for (int k = 0; k < result.n; k++) {
if (strcmp(result.labels[k], chunks[i].result.labels[j]) == 0) {
result.groups[k].count += chunks[i].result.groups[j].count;
result.groups[k].sum += chunks[i].result.groups[j].sum;
if (chunks[i].result.groups[j].min < result.groups[k].min) {
result.groups[k].min = chunks[i].result.groups[j].min;
}
if (chunks[i].result.groups[j].max < result.groups[k].max) {
result.groups[k].max = chunks[i].result.groups[j].max;
}
break;
}
}
}
}

// sort results alphabetically
qsort(result.groups, (size_t)result.n, sizeof(struct Group), cmp);

// prepare output string
char buf[128];
char output[1 << 14];
char *s = output;
*s++ = '{';
qsort(results, (size_t)nresults, sizeof(struct result), cmp);
for (int i = 0; i < nresults; i++) {
for (int i = 0; i < result.n; i++) {
size_t n = (size_t)sprintf(
buf, "%s=%.1f/%.1f/%.1f", results[i].city,
(double)results[i].min / 10.0,
((double)results[i].sum / (double)results[i].count) / 10.0,
(double)results[i].max / 10.0);
buf, "%s=%.1f/%.1f/%.1f", result.groups[i].city,
(float)result.groups[i].min / 10.0,
((float)result.groups[i].sum / (float)result.groups[i].count) / 10.0,
(float)result.groups[i].max / 10.0);

// copy buf to output
memcpy(s, buf, n);

if (i < nresults - 1) {
if (i < result.n - 1) {
memcpy(s + n, ", ", 2);
n += 2;
}
Expand All @@ -184,12 +240,7 @@ int main(int argc, char **argv) {
*s = 0x0;
puts(output);

free(map);
free(output);
munmap(data, sb.st_size);
munmap(data, sz);
close(fd);
exit(EXIT_SUCCESS);
}

// TODO: Concurrency, process in chunks where each chunk works on a separate map
// TODO: SIMD

0 comments on commit fe6d42d

Please sign in to comment.