diff --git a/src/clib/pio_rearrange.c b/src/clib/pio_rearrange.c index cebf72d849d..498c493fea7 100644 --- a/src/clib/pio_rearrange.c +++ b/src/clib/pio_rearrange.c @@ -1211,6 +1211,7 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com /* Allocate arrays needed for this function. */ int *dest_ioproc = NULL; /* Destination IO task for each data element on compute task. */ PIO_Offset *dest_ioindex = NULL; /* Offset into IO task array for each data element. */ + PIO_Offset **gcoord_map = NULL; /* Global coordinate value for each data element. */ int sendcounts[ios->num_uniontasks]; /* Send counts for swapm call. */ int sdispls[ios->num_uniontasks]; /* Send displacements for swapm. */ int recvcounts[ios->num_uniontasks]; /* Receive counts for swapm. */ @@ -1238,6 +1239,15 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com if (!(dest_ioindex = malloc(maplen * sizeof(PIO_Offset)))) return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + + if (!(gcoord_map = malloc(maplen * sizeof(PIO_Offset*)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + + for (int i = 0; i < maplen; i++) + { + if (!(gcoord_map[i] = calloc(ndims, sizeof(PIO_Offset)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + } } /* Initialize the sc_info send and recv messages */ @@ -1361,6 +1371,18 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com LOG((3, "iomaplen[%d] = %d", i, sc_info_msg_recv[i * sc_info_msg_sz])); #endif /* PIO_ENABLE_LOGGING */ + /* Convert a 1-D index into a global coordinate value for each data element */ + for (int k = 0; k < maplen; k++) + { + /* The compmap array is 1 based but calculations are 0 based */ + LOG((3, "about to call idx_to_dim_list ndims = %d ", ndims)); + idx_to_dim_list(ndims, gdimlen, compmap[k] - 1, gcoord_map[k]); +#if PIO_ENABLE_LOGGING + for (int d = 0; d < ndims; d++) + LOG((3, "gcoord_map[%d][%d] = %lld", k, d, gcoord_map[k][d])); +#endif /* PIO_ENABLE_LOGGING */ + } + for (int i = 0; i < ios->num_iotasks; i++) { /* First entry in the sc_info msg is the iomaplen */ @@ -1383,23 +1405,19 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com * offset into the global data array. */ for (int k = 0; k < maplen; k++) { - PIO_Offset gcoord[ndims], lcoord[ndims]; - bool found = true; + /* An IO task has already been found for this element */ + if (dest_ioproc[k] >= 0) + continue; - /* The compmap array is 1 based but calculations are 0 based */ - LOG((3, "about to call idx_to_dim_list ndims = %d ", ndims)); - idx_to_dim_list(ndims, gdimlen, compmap[k] - 1, gcoord); - #if PIO_ENABLE_LOGGING - for (int d = 0; d < ndims; d++) - LOG((3, "gcoord[%d] = %lld", d, gcoord[d])); - #endif /* PIO_ENABLE_LOGGING */ + PIO_Offset lcoord[ndims]; + bool found = true; /* Find a destination for each entry in the compmap. */ for (int j = 0; j < ndims; j++) { - if (gcoord[j] >= start[j] && gcoord[j] < start[j] + count[j]) + if (gcoord_map[k][j] >= start[j] && gcoord_map[k][j] < start[j] + count[j]) { - lcoord[j] = gcoord[j] - start[j]; + lcoord[j] = gcoord_map[k][j] - start[j]; } else { @@ -1423,6 +1441,11 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com } } + for (int i = 0; i < maplen; i++) + free(gcoord_map[i]); + free(gcoord_map); + gcoord_map = NULL; + /* Check that a destination is found for each compmap entry. */ for (int k = 0; k < maplen; k++) if (dest_ioproc[k] < 0 && compmap[k] > 0) @@ -1461,6 +1484,278 @@ int box_rearrange_create(iosystem_desc_t *ios, int maplen, const PIO_Offset *com return PIO_NOERR; } + +/* The box_rearrange_create algorithm optimized for the case where many + * iotasks have iomaplen == 0 (holes) + */ +int box_rearrange_create_with_holes(iosystem_desc_t *ios, int maplen, const PIO_Offset *compmap, + const int *gdimlen, int ndims, io_desc_t *iodesc) +{ + int ret; + +#ifdef TIMING + GPTLstart("PIO:box_rearrange_create_with_holes"); +#endif + /* Check inputs. */ + pioassert(ios && maplen >= 0 && compmap && gdimlen && ndims > 0 && iodesc, + "invalid input", __FILE__, __LINE__); + LOG((1, "box_rearrange_create maplen = %d ndims = %d ios->num_comptasks = %d " + "ios->num_iotasks = %d", maplen, ndims, ios->num_comptasks, ios->num_iotasks)); + + /* Allocate arrays needed for this function. */ + int *dest_ioproc = NULL; /* Destination IO task for each data element on compute task. */ + PIO_Offset *dest_ioindex = NULL; /* Offset into IO task array for each data element. */ + PIO_Offset **gcoord_map = NULL; /* Global coordinate value for each data element. */ + int sendcounts[ios->num_uniontasks]; /* Send counts for swapm call. */ + int sdispls[ios->num_uniontasks]; /* Send displacements for swapm. */ + int recvcounts[ios->num_uniontasks]; /* Receive counts for swapm. */ + int rdispls[ios->num_uniontasks]; /* Receive displacements for swapm. */ + MPI_Datatype dtypes[ios->num_uniontasks]; /* Array of MPI_OFFSET types for swapm. */ + PIO_Offset iomaplen[ios->num_iotasks]; /* Gets the llen of all IO tasks. */ + + /* This is the box rearranger. */ + iodesc->rearranger = PIO_REARR_BOX; + + /* Number of elements of data on compute node. */ + iodesc->ndof = maplen; + + if (maplen > 0) + { + if (!(dest_ioproc = malloc(maplen * sizeof(int)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + + if (!(dest_ioindex = malloc(maplen * sizeof(PIO_Offset)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + + if (!(gcoord_map = malloc(maplen * sizeof(PIO_Offset*)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + + for (int i = 0; i < maplen; i++) + { + if (!(gcoord_map[i] = calloc(ndims, sizeof(PIO_Offset)))) + return pio_err(ios, NULL, PIO_ENOMEM, __FILE__, __LINE__); + } + } + + /* Initialize array values. */ + for (int i = 0; i < maplen; i++) + { + dest_ioproc[i] = -1; + dest_ioindex[i] = -1; + } + + /* Initialize arrays used in swapm. */ + for (int i = 0; i < ios->num_uniontasks; i++) + { + sendcounts[i] = 0; + sdispls[i] = 0; + recvcounts[i] = 0; + rdispls[i] = 0; + dtypes[i] = MPI_OFFSET; + } + + /* For IO tasks, determine llen, the length of the data array on + * the IO task. For computation tasks, llen will remain at 0. Also + * set up arrays for the allgather which will give every IO task a + * complete list of llens for each IO task. */ + LOG((3, "ios->ioproc = %d ios->num_uniontasks = %d", ios->ioproc, + ios->num_uniontasks)); + pioassert(iodesc->llen == 0, "error", __FILE__, __LINE__); + if (ios->ioproc) + { + /* Set up send counts for sending llen in all to all + * gather. We are sending to all tasks, IO and computation. */ + for (int i = 0; i < ios->num_comptasks; i++) + sendcounts[ios->compranks[i]] = 1; + for (int i = 0; i < ios->num_iotasks; i++) + sendcounts[ios->ioranks[i]] = 1; + + /* Determine llen, the lenght of the data array on this IO + * node, by multipliying the counts in the + * iodesc->firstregion. */ + iodesc->llen = 1; + for (int i = 0; i < ndims; i++) + { + iodesc->llen *= iodesc->firstregion->count[i]; + LOG((3, "iodesc->firstregion->start[%d] = %d iodesc->firstregion->count[%d] = %d", + i, iodesc->firstregion->start[i], i, iodesc->firstregion->count[i])); + } + LOG((2, "iodesc->llen = %d", iodesc->llen)); + } + + /* Determine whether fill values will be needed. */ + if ((ret = determine_fill(ios, iodesc, gdimlen, compmap))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + LOG((2, "iodesc->needsfill = %d ios->num_iotasks = %d", iodesc->needsfill, + ios->num_iotasks)); + + /* Set up receive counts and displacements to for an AllToAll + * gather of llen. */ + for (int i = 0; i < ios->num_iotasks; i++) + { + recvcounts[ios->ioranks[i]] = 1; + rdispls[ios->ioranks[i]] = i * SIZEOF_MPI_OFFSET; + LOG((3, "i = %d ios->ioranks[%d] = %d recvcounts[%d] = %d rdispls[%d] = %d", + i, i, ios->ioranks[i], ios->ioranks[i], recvcounts[ios->ioranks[i]], + ios->ioranks[i], rdispls[ios->ioranks[i]])); + } + + /* All-gather the llen to all tasks into array iomaplen. */ + LOG((3, "calling pio_swapm to allgather llen into array iomaplen, ndims = %d dtypes[0] = %d", + ndims, dtypes)); + if ((ret = pio_swapm(&iodesc->llen, sendcounts, sdispls, dtypes, iomaplen, recvcounts, + rdispls, dtypes, ios->union_comm, &iodesc->rearr_opts.io2comp))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + LOG((3, "iodesc->llen = %d", iodesc->llen)); +#if PIO_ENABLE_LOGGING + for (int i = 0; i < ios->num_iotasks; i++) + LOG((3, "iomaplen[%d] = %d", i, iomaplen[i])); +#endif /* PIO_ENABLE_LOGGING */ + + /* Convert a 1-D index into a global coordinate value for each data element */ + for (int k = 0; k < maplen; k++) + { + /* The compmap array is 1 based but calculations are 0 based */ + LOG((3, "about to call idx_to_dim_list ndims = %d ", ndims)); + idx_to_dim_list(ndims, gdimlen, compmap[k] - 1, gcoord_map[k]); +#if PIO_ENABLE_LOGGING + for (int d = 0; d < ndims; d++) + LOG((3, "gcoord_map[%d][%d] = %lld", k, d, gcoord_map[k][d])); +#endif /* PIO_ENABLE_LOGGING */ + } + + /* For each IO task send starts/counts to all compute tasks. */ + for (int i = 0; i < ios->num_iotasks; i++) + { + /* The ipmaplen contains the llen (number of data elements) + * for this IO task. */ + LOG((2, "iomaplen[%d] = %d", i, iomaplen[i])); + + /* If there is data for this IO task, send start/count to all + * compute tasks. */ + if (iomaplen[i] > 0) + { + PIO_Offset start_count_send[ndims * 2]; + PIO_Offset start_count_recv[ndims * 2]; + + /* start/count array to be sent: 1st half for start, 2nd half for count */ + for (int j = 0; j < ndims; j++) + { + start_count_send[j] = iodesc->firstregion->start[j]; + start_count_send[ndims + j] = iodesc->firstregion->count[j]; + } + + /* Set up send/recv parameters for all to all gather of + * counts and starts. */ + for (int j = 0; j < ios->num_uniontasks; j++) + { + sendcounts[j] = 0; + sdispls[j] = 0; + rdispls[j] = 0; + recvcounts[j] = 0; + if (ios->union_rank == ios->ioranks[i]) + sendcounts[j] = ndims * 2; + } + recvcounts[ios->ioranks[i]] = ndims * 2; + + /* The start/count array from iotask i is sent to all compute tasks. */ + LOG((3, "about to call pio_swapm with start/count from iotask %d ndims = %d", + i, ndims)); + if ((ret = pio_swapm(start_count_send, sendcounts, sdispls, dtypes, start_count_recv, + recvcounts, rdispls, dtypes, ios->union_comm, + &iodesc->rearr_opts.io2comp))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + + /* start/count array received: 1st half for start, 2nd half for count */ + PIO_Offset *start = start_count_recv; + PIO_Offset *count = start_count_recv + ndims; + +#if PIO_ENABLE_LOGGING + for (int d = 0; d < ndims; d++) + LOG((3, "start[%d] = %lld count[%d] = %lld", d, start[d], d, count[d])); +#endif /* PIO_ENABLE_LOGGING */ + + /* For each element of the data array on the compute task, + * find the IO task to send the data element to, and its + * offset into the global data array. */ + for (int k = 0; k < maplen; k++) + { + /* An IO task has already been found for this element */ + if (dest_ioproc[k] >= 0) + continue; + + PIO_Offset lcoord[ndims]; + bool found = true; + + /* Find a destination for each entry in the compmap. */ + for (int j = 0; j < ndims; j++) + { + if (gcoord_map[k][j] >= start[j] && gcoord_map[k][j] < start[j] + count[j]) + { + lcoord[j] = gcoord_map[k][j] - start[j]; + } + else + { + found = false; + break; + } + } + + /* Did we find a destination IO task for this element + * of the computation task data array? If so, remember + * the destination IO task, and determine the index + * for that element in the IO task data. */ + if (found) + { + dest_ioindex[k] = coord_to_lindex(ndims, lcoord, count); + dest_ioproc[k] = i; + LOG((3, "found dest_ioindex[%d] = %d dest_ioproc[%d] = %d", k, dest_ioindex[k], + k, dest_ioproc[k])); + } + } + } + } + + for (int i = 0; i < maplen; i++) + free(gcoord_map[i]); + free(gcoord_map); + gcoord_map = NULL; + + /* Check that a destination is found for each compmap entry. */ + for (int k = 0; k < maplen; k++) + if (dest_ioproc[k] < 0 && compmap[k] > 0) + return pio_err(ios, NULL, PIO_EINVAL, __FILE__, __LINE__); + + /* Completes the mapping for the box rearranger. */ + LOG((2, "calling compute_counts maplen = %d", maplen)); + if ((ret = compute_counts(ios, iodesc, dest_ioproc, dest_ioindex))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + + free(dest_ioproc); + free(dest_ioindex); + dest_ioproc = NULL; + dest_ioindex = NULL; + + /* Compute the max io buffer size needed for an iodesc. */ + if (ios->ioproc) + { + if ((ret = compute_maxIObuffersize(ios->io_comm, iodesc))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + LOG((3, "iodesc->maxiobuflen = %d", iodesc->maxiobuflen)); + } + + /* Using maxiobuflen compute the maximum number of bytes that the + * io task buffer can handle. */ + if ((ret = compute_maxaggregate_bytes(ios, iodesc))) + return pio_err(ios, NULL, ret, __FILE__, __LINE__); + LOG((3, "iodesc->maxbytes = %d", iodesc->maxbytes)); + +#ifdef TIMING + GPTLstop("PIO:box_rearrange_create_with_holes"); +#endif + return PIO_NOERR; +} + /** * Compare offsets is used by the sort in the subset rearranger. This * function is passed to qsort.