diff --git a/Makefile b/Makefile index ecf3071..491c3c7 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CFLAGS+=-g -O2 -march=native -Wall -Wextra -Wpedantic -std=c99 -Wformat=2 -Wconversion -Wtrampolines \ --Wimplicit-fallthrough +-Wimplicit-fallthrough -D_XOPEN_SOURCE=500 LIBS+=-lm all: bin/ bin/create-sample bin/analyze diff --git a/README.md b/README.md index 859dffc..9d87d44 100644 --- a/README.md +++ b/README.md @@ -25,9 +25,9 @@ This will create a 12 GB file with 1B rows named `measurements.txt` in your curr ### Run the challenge: ``` -time bin/analyze measurements.txt +time bin/analyze measurements.txt >/dev/null -real 0m29.071s -user 0m23.790s -sys 0m2.962s +real 0m4.464s +user 0m34.472s +sys 0m6.938s ``` diff --git a/analyze.c b/analyze.c index 83ea3d4..d5ad693 100644 --- a/analyze.c +++ b/analyze.c @@ -1,16 +1,13 @@ #include +#include #include #include #include #include #include +#include #include -struct result { - char *city; - int count, sum, min, max; -}; - #define HCAP 1987 // parses a floating point number as an integer @@ -28,8 +25,7 @@ static inline void parse_number(int *dest, char *s, char **endptr) { // parse characteristic while (*s >= '0' && *s <= '9') { - n = (n * 10) + (*s - '0'); - s++; + n = (n * 10) + (*s++ - '0'); } // skip separator @@ -37,76 +33,75 @@ static inline void parse_number(int *dest, char *s, char **endptr) { // parse mantissa while (*s >= '0' && *s <= '9') { - d = (d * 10) + (*s - '0'); - s++; + d = (d * 10) + (*s++ - '0'); } - *dest = mod * (n * 10 + d); + *dest = mod * n * 10 + d; *endptr = s; } // hash returns the fnv-1a hash of the first n bytes in data static inline unsigned int hash(const unsigned char *data, size_t n) { - const unsigned int fnv_prime = 0x811C9DC5; unsigned int hash = 0; for (size_t i = 0; i < n; data++, i++) { - hash *= fnv_prime; + hash *= 0x811C9DC5; // prime hash ^= (*data); } return hash; } -// cmp returns -1 if city property of a is lexically smaller than city property -// of b +struct Group { + unsigned int count; + int sum, min, max; + char *city; +}; + +// cmp compares the city property of the result that both pointers point static inline int cmp(const void *ptr_a, const void *ptr_b) { - return strcmp(((struct result *)ptr_a)->city, ((struct result *)ptr_b)->city); + return strcmp(((struct Group *)ptr_a)->city, ((struct Group *)ptr_b)->city); } -int main(int argc, char **argv) { - char *file = "measurements.txt"; - if (argc > 1) { - file = argv[1]; - } - - struct stat sb; - int fd = open(file, O_RDONLY); - if (!fd) { - perror("error opening file"); - exit(EXIT_FAILURE); - } +struct ResultSet { + char labels[420][32]; + struct Group groups[420]; + int n; +}; - if (fstat(fd, &sb) == -1) { - perror("error getting file size"); - exit(EXIT_FAILURE); - } +struct Chunk { + char *data; + size_t start; + size_t end; + struct ResultSet result; +}; - char cities[413][28] = {{0}}; - struct result results[413]; - int nresults = 0; +static void *process_chunk(void *data) { + struct Chunk *ch = (struct Chunk *)data; - // mmap entire file into memory - char *data = mmap(NULL, (size_t)sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0); - if (!data) { - perror("error mmapping file"); - exit(EXIT_FAILURE); + // skip start forward until SOF or after next newline + if (ch->start > 0) { + while (ch->data[ch->start - 1] != '\n') { + ch->start++; + } } - // initialize map to -1 - int *map = malloc(HCAP * sizeof(int)); - if (!map) { - perror("error allocating memory for hashmap"); - exit(EXIT_FAILURE); + while (ch->data[ch->end] != 0x0 && ch->data[ch->end - 1] != '\n') { + ch->end++; } + + // initialize hashmap + // will hold indexes into our cities and results array + int map[HCAP]; memset(map, -1, HCAP * sizeof(int)); - char *s = data; + char *s = &ch->data[ch->start]; char *start, *pos; unsigned int h; int measurement; size_t len; int c; + while (1) { start = s; pos = strchr(s, ';'); @@ -116,64 +111,125 @@ int main(int argc, char **argv) { // probe map until free spot or match h = hash((unsigned char *)start, len) % HCAP; c = map[h]; - while (c >= 0 && memcmp(cities[c], start, len) != 0) { - h = (h == HCAP - 1) ? 0 : h + 1; + while (c >= 0 && memcmp(ch->result.labels[c], start, len) != 0) { + if (h++ == HCAP) { + h = 0; + } c = map[h]; } if (c < 0) { - map[h] = nresults; - char *city = cities[nresults]; - memcpy(city, start, len); - city[len] = 0x0; - - results[nresults].city = city; - results[nresults].sum = measurement; - results[nresults].max = measurement; - results[nresults].min = measurement; - results[nresults].count = 1; - - nresults++; + memcpy(ch->result.labels[ch->result.n], start, len); + ch->result.labels[ch->result.n][len] = 0x0; + ch->result.groups[ch->result.n].city = ch->result.labels[ch->result.n]; + ch->result.groups[ch->result.n].count = 1; + ch->result.groups[ch->result.n].sum = measurement; + ch->result.groups[ch->result.n].min = measurement; + ch->result.groups[ch->result.n].max = measurement; + map[h] = ch->result.n++; } else { - - results[c].sum += measurement; - results[c].count += 1; - if (measurement < results[c].min) { - results[c].min = measurement; - } else if (measurement > results[c].max) { - results[c].max = measurement; + ch->result.groups[c].count += 1; + ch->result.groups[c].sum += measurement; + if (measurement < ch->result.groups[c].min) { + ch->result.groups[c].min = measurement; + } else if (measurement > ch->result.groups[c].max) { + ch->result.groups[c].max = measurement; } } // skip newline - s++; - - if (*s == 0x0) { + // break out of loop if EOF + if (*++s == 0x0 || s == &ch->data[ch->end]) { break; } } - char buf[128]; - char *output = malloc(1 << 14 * sizeof(char)); - if (!output) { - perror("error allocating memory for output string"); + return (void *)ch; +} + +int main(int argc, char **argv) { + char *file = "measurements.txt"; + if (argc > 1) { + file = argv[1]; + } + + int fd = open(file, O_RDONLY); + if (!fd) { + perror("error opening file"); exit(EXIT_FAILURE); } - s = output; + struct stat sb; + if (fstat(fd, &sb) == -1) { + perror("error getting file size"); + exit(EXIT_FAILURE); + } + + // mmap entire file into memory + size_t sz = (size_t)sb.st_size; + char *data = mmap(NULL, sz, PROT_READ, MAP_PRIVATE, fd, 0); + if (!data || data == MAP_FAILED) { + perror("error mmapping file"); + exit(EXIT_FAILURE); + } + + // distribute work among N worker threads + const int nworkers = 12; + pthread_t workers[nworkers]; + struct Chunk chunks[nworkers]; + for (int i = 0; i < nworkers; i++) { + chunks[i].result.n = 0; + chunks[i].data = data; + chunks[i].start = sz / (size_t)nworkers * (size_t)i; + chunks[i].end = sz / (size_t)nworkers * ((size_t)i + 1); + pthread_create(&workers[i], NULL, process_chunk, &chunks[i]); + } + + // wait for all threads to finish + for (int i = 0; i < nworkers; i++) { + pthread_join(workers[i], NULL); + } + + // merge results + struct ResultSet result = chunks[0].result; + for (int i = 1; i < nworkers; i++) { + for (int j = 0; j < chunks[i].result.n; j++) { + // get index of label + for (int k = 0; k < result.n; k++) { + if (strcmp(result.labels[k], chunks[i].result.labels[j]) == 0) { + result.groups[k].count += chunks[i].result.groups[j].count; + result.groups[k].sum += chunks[i].result.groups[j].sum; + if (chunks[i].result.groups[j].min < result.groups[k].min) { + result.groups[k].min = chunks[i].result.groups[j].min; + } + if (chunks[i].result.groups[j].max < result.groups[k].max) { + result.groups[k].max = chunks[i].result.groups[j].max; + } + break; + } + } + } + } + + // sort results alphabetically + qsort(result.groups, (size_t)result.n, sizeof(struct Group), cmp); + + // prepare output string + char buf[128]; + char output[1 << 14]; + char *s = output; *s++ = '{'; - qsort(results, (size_t)nresults, sizeof(struct result), cmp); - for (int i = 0; i < nresults; i++) { + for (int i = 0; i < result.n; i++) { size_t n = (size_t)sprintf( - buf, "%s=%.1f/%.1f/%.1f", results[i].city, - (double)results[i].min / 10.0, - ((double)results[i].sum / (double)results[i].count) / 10.0, - (double)results[i].max / 10.0); + buf, "%s=%.1f/%.1f/%.1f", result.groups[i].city, + (float)result.groups[i].min / 10.0, + ((float)result.groups[i].sum / (float)result.groups[i].count) / 10.0, + (float)result.groups[i].max / 10.0); // copy buf to output memcpy(s, buf, n); - if (i < nresults - 1) { + if (i < result.n - 1) { memcpy(s + n, ", ", 2); n += 2; } @@ -184,12 +240,7 @@ int main(int argc, char **argv) { *s = 0x0; puts(output); - free(map); - free(output); - munmap(data, sb.st_size); + munmap(data, sz); close(fd); exit(EXIT_SUCCESS); } - -// TODO: Concurrency, process in chunks where each chunk works on a separate map -// TODO: SIMD