Skip to content

Commit

Permalink
beginning to implement read_darray for async
Browse files Browse the repository at this point in the history
  • Loading branch information
edhartnett committed Sep 5, 2019
1 parent ee94d47 commit 166b9af
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
41 changes: 37 additions & 4 deletions src/clib/pio_darray.c
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ PIOc_write_darray(int ncid, int varid, int ioid, PIO_Offset arraylen, void *arra
return pio_err(ios, file, PIO_EINVAL, __FILE__, __LINE__);

/* Move to end of list or the entry that matches this ioid. */
hashid = ioid*10 + vdesc->rec_var;
hashid = ioid * 10 + vdesc->rec_var;
HASH_FIND_INT( file->buffer, &hashid, wmb);
if (wmb)
PLOG((3, "wmb->ioid = %d wmb->recordvar = %d", wmb->ioid, wmb->recordvar));
Expand Down Expand Up @@ -869,10 +869,11 @@ PIOc_write_darray(int ncid, int varid, int ioid, PIO_Offset arraylen, void *arra
}

/**
* Read a field from a file to the IO library.
* Read a field from a file to the IO library using distributed
* arrays.
*
* @param ncid identifies the netCDF file
* @param varid the variable ID to be read
* @param ncid identifies the netCDF file.
* @param varid the variable ID to be read.
* @param ioid the I/O description ID as passed back by
* PIOc_InitDecomp().
* @param arraylen this parameter is ignored. Nominally it is the
Expand Down Expand Up @@ -911,6 +912,38 @@ PIOc_read_darray(int ncid, int varid, int ioid, PIO_Offset arraylen,
return pio_err(NULL, NULL, PIO_EBADID, __FILE__, __LINE__);
ios = file->iosystem;

/* If async is in use, and this is not an IO task, bcast the
* parameters. */
if (ios->async)
{
if (!ios->ioproc)
{
int msg = PIO_MSG_READDARRAY;

if (ios->compmaster == MPI_ROOT)
mpierr = MPI_Send(&msg, 1, MPI_INT, ios->ioroot, 1, ios->union_comm);

/* Send the function parameters and associated informaiton
* to the msg handler. */
if (!mpierr)
mpierr = MPI_Bcast(&ncid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&varid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&ioid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&arraylen, 1, MPI_OFFSET, ios->compmaster, ios->intercomm);
PLOG((2, "PIOc_read_darray ncid %d varid %d ioid %d arraylen %d"
ncid, varid, ioid, arraylen));
}

/* Handle MPI errors. */
if ((mpierr2 = MPI_Bcast(&mpierr, 1, MPI_INT, ios->comproot, ios->my_comm)))
return check_mpi(NULL, file, mpierr2, __FILE__, __LINE__);
if (mpierr)
return check_mpi(NULL, file, mpierr, __FILE__, __LINE__);
}

/* Get the iodesc. */
if (!(iodesc = pio_get_iodesc_from_id(ioid)))
return pio_err(ios, file, PIO_EBADID, __FILE__, __LINE__);
Expand Down
14 changes: 7 additions & 7 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -1137,19 +1137,19 @@ write_darray_multi_serial(file_desc_t *file, int nvars, int fndims, const int *v
}

/**
* Read an array of data from a file to the (parallel) IO library.
* Read an array of data from a file using distributed arrays.
*
* @param file a pointer to the open file descriptor for the file
* that will be written to
* @param iodesc a pointer to the defined iodescriptor for the buffer
* @param vid the variable id to be read
* that will be read from.
* @param iodesc a pointer to the defined iodescriptor for the buffer.
* @param vid the variable id to be read.
* @param iobuf the buffer to be read into from this mpi task. May be
* null. for example we have 8 ionodes and a distributed array with
* null. (For example we have 8 ionodes and a distributed array with
* global size 4, then at least 4 nodes will have a null iobuf. In
* practice the box rearranger trys to have at least blocksize bytes
* practice the box rearranger tries to have at least blocksize bytes
* on each io task and so if the total number of bytes to write is
* less than blocksize*numiotasks then some iotasks will have a NULL
* iobuf.
* iobuf.)
* @return 0 on success, error code otherwise.
* @ingroup PIO_read_darray_c
* @author Jim Edwards, Ed Hartnett
Expand Down
31 changes: 27 additions & 4 deletions src/clib/pio_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -2259,8 +2259,7 @@ int write_darray_multi_handler(iosystem_desc_t *ios)
}

/**
* This function is run on the IO tasks to...
* NOTE: not yet implemented
* This function is run on the IO tasks to read distributed arrays.
*
* @param ios pointer to the iosystem_desc_t data.
*
Expand All @@ -2269,9 +2268,33 @@ int write_darray_multi_handler(iosystem_desc_t *ios)
* @internal
* @author Ed Hartnett
*/
int readdarray_handler(iosystem_desc_t *ios)
int read_darray_handler(iosystem_desc_t *ios)
{
int ncid;
int varid;
int ioid;
int arraylen;
void *data;

PLOG((1, "read_darray_handler called"));
assert(ios);

/* Get the parameters for this function that the the comp master
* task is broadcasting. */
if ((mpierr = MPI_Bcast(&ncid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&varid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&ioid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&arraylen, 1, MPI_OFFSET, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
PLOG((2, "ncid %d varid %d ioid %d arraylen %d" ncid, varid,
ioid, arraylen));

PIOc_read_darray(ncid, varid, ioid, arraylen, data);

PLOG((1, "read_darray_handler succeeded!"));
return PIO_NOERR;
}

Expand Down Expand Up @@ -2736,7 +2759,7 @@ int pio_msg_handler2(int io_rank, int component_count, iosystem_desc_t **iosys,
ret = advanceframe_handler(my_iosys);
break;
case PIO_MSG_READDARRAY:
ret = readdarray_handler(my_iosys);
ret = read_darray_handler(my_iosys);
break;
case PIO_MSG_SETERRORHANDLING:
ret = seterrorhandling_handler(my_iosys);
Expand Down
2 changes: 1 addition & 1 deletion src/clib/pio_rearrange.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ define_iodesc_datatypes(iosystem_desc_t *ios, io_desc_t *iodesc)
int ret; /* Return value. */

pioassert(ios && iodesc, "invalid input", __FILE__, __LINE__);
PLOG((1, "define_iodesc_datatypes ios->ioproc = %d iodesc->rtype is %sNULL, "
PLOG((3, "define_iodesc_datatypes ios->ioproc = %d iodesc->rtype is %sNULL, "
"iodesc->nrecvs %d", ios->ioproc, iodesc->rtype ? "not " : "",
iodesc->nrecvs));

Expand Down
4 changes: 2 additions & 2 deletions tests/cunit/test_async_1d.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ int main(int argc, char **argv)
/* Read the data. */
if ((ret = PIOc_setframe(ncid, 0, 0)))
ERR(ret);
/* if ((ret = PIOc_read_darray(ncid, 0, ioid, MAPLEN, &data_in))) */
/* ERR(ret); */
if ((ret = PIOc_read_darray(ncid, 0, ioid, MAPLEN, &data_in)))
ERR(ret);

/* Close the file. */
if ((ret = PIOc_closefile(ncid)))
Expand Down

0 comments on commit 166b9af

Please sign in to comment.