Skip to content

Commit

Permalink
Merge pull request ESMCI#1306 from wkliao/vard_multi_var
Browse files Browse the repository at this point in the history
aggregate filetypes of multiple variables for vard API
  • Loading branch information
jedwards4b authored Jun 27, 2018
2 parents cfd66c6 + cad03a9 commit 2f0fb4a
Showing 1 changed file with 194 additions and 55 deletions.
249 changes: 194 additions & 55 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ int get_gdim0(file_desc_t *file,io_desc_t *iodesc, int varid, int fndims, MPI_Of
return ierr;
}

static
int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdimoffset,
int rrcnt, int ndims, int fndims,
int frame, PIO_Offset **startlist, PIO_Offset **countlist,
Expand All @@ -113,6 +114,11 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
int blocklengths[rrcnt];
MPI_Datatype subarray[rrcnt];

/* preserve the value of unlimdimoffset, as it may be changed */
PIO_Offset _unlimdimoffset = unlimdimoffset;

*filetype = MPI_DATATYPE_NULL;

if(rrcnt == 0)
return PIO_NOERR;

Expand All @@ -122,11 +128,6 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
blocklengths[rc] = 1;
subarray[rc] = MPI_DATATYPE_NULL;
}
if(*filetype != MPI_DATATYPE_NULL)
{
if ((mpierr = MPI_Type_free(filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if(fndims > ndims)
{
if ( gdim0 > 0)
Expand All @@ -153,54 +154,108 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
gdims[i] = iodesc->dimlen[i];
}

int true_rrcnt=-1; /* true number of contiguous requests */
MPI_Aint prev_end=-1; /* end offset of rc-1 request */
for( int rc=0; rc<rrcnt; rc++)
{
int sacount[fndims];
int sastart[fndims];
for (int i=dim_offset; i< fndims; i++)
{
sacount[i-dim_offset] = (int) countlist[rc][i];
sastart[i-dim_offset] = (int) startlist[rc][i];
}
if(gdim0 > 0)
{
unlimdimoffset = gdim0;
sastart[0] = max(0, frame);
displacements[rc]=0;
for (int i=dim_offset; i< fndims; i++)
{
sacount[i-dim_offset] = (int) countlist[rc][i];
sastart[i-dim_offset] = (int) startlist[rc][i];
}
if(gdim0 > 0)
{
unlimdimoffset = gdim0;
sastart[0] = max(0, frame);
displacements[rc]=0;
}
else
displacements[rc] = unlimdimoffset * max(0, frame);

/* Check whether this request is actually contiguous. If contiguous,
* we do not need to create an MPI derived datatype.
*/
int blocklen=1, isContig=1, warnContig=0;
MPI_Aint disp=0, shape=iodesc->mpitype_size;
for (int i=sa_ndims-1; i>=0; i--)
{
if (isContig) {
/* blocklen is the amount of this request, rc */
blocklen *= sacount[i];
/* disp is the flattened starting array index */
disp += sastart[i] * shape;
/* shape is the dimension product from sa_ndims-1 to i */
shape *= gdims[i];

if (warnContig == 0) {
if (sacount[i] < gdims[i])
/* first i detected to access partial dimension. If this
* one is contiguous, the remaining sacount[i-1 ... 0]
* must all == 1 */
warnContig = 1; /* possible non-contiguos */
}
else if (sacount[i] != 1) {
isContig = 0;
break; /* loop i */
}
}
}
else
displacements[rc] = unlimdimoffset * max(0, frame);
/* if this is a record variable, add the gap of record size */
disp += _unlimdimoffset * max(0, frame);

#if PIO_ENABLE_LOGGING
for (int i=0; i< sa_ndims; i++)
LOG((3, "vard: sastart[%d]=%d sacount[%d]=%d gdims[%d]=%d %ld %ld displacement = %ld un %d",
i,sastart[i], i,sacount[i], i, gdims[i], startlist[rc][i], countlist[rc][i], displacements[rc], unlimdimoffset));
#endif
if((mpierr = MPI_Type_create_subarray(sa_ndims, gdims,
sacount, sastart,MPI_ORDER_C
,iodesc->mpitype, subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
if (isContig) { /* this request rc is contiguous, no need to create a new MPI datatype */
if (prev_end == disp) {
/* this request rc can be coalesced into the previous
* displacements and blocklengths.
*/
blocklengths[true_rrcnt] += blocklen;
prev_end += blocklen;
}
else {
/* this request cannot be coalesced with the previous one */
true_rrcnt++;
subarray[true_rrcnt] = iodesc->mpitype;
displacements[true_rrcnt] = disp;
blocklengths[true_rrcnt] = blocklen;
prev_end = disp + blocklen;
}
}
else { /* request rc is not contiguous, must create a new MPI datatype */
true_rrcnt++;
if((mpierr = MPI_Type_create_subarray(sa_ndims, gdims,
sacount, sastart,MPI_ORDER_C
,iodesc->mpitype, subarray + true_rrcnt)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(subarray + true_rrcnt)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}

#if PIO_ENABLE_LOGGING
LOG((3,"vard: blocklengths[%d]=%d displacement[%d]=%ld unlimdimoffset=%ld",rc,blocklengths[rc], rc, displacements[rc], unlimdimoffset));
#endif


}
true_rrcnt++;

if((mpierr = MPI_Type_create_struct(rrcnt, blocklengths, displacements, subarray, filetype)))
/* concatenate all MPI datatypes into filetype */
if((mpierr = MPI_Type_create_struct(true_rrcnt, blocklengths, displacements, subarray, filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

for( int rc=0; rc<rrcnt; rc++)
if (subarray[rc] != MPI_DATATYPE_NULL && (mpierr = MPI_Type_free(subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

for( int rc=0; rc<true_rrcnt; rc++)
if (subarray[rc] != MPI_DATATYPE_NULL && subarray[rc] != iodesc->mpitype &&
(mpierr = MPI_Type_free(subarray + rc)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

return PIO_NOERR;
}
Expand Down Expand Up @@ -427,25 +482,58 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
if (regioncnt == num_regions - 1)
{
#ifdef USE_VARD_WRITE
MPI_Datatype filetype = MPI_DATATYPE_NULL;
MPI_Aint var0_offset, var_offsets[nvars];
MPI_Offset vari_offset, vard_llen=0;
MPI_Datatype vartypes[nvars];
MPI_Datatype filetype = MPI_DATATYPE_NULL;
int blocklens[nvars];
int fvartype, var0_id;
int numReqs=0;
void *vard_bufptr;
int doFlush[nvars]; /* whether to flush or not */

/* construct doFlush[], so later when looping through nvars,
* it tells whether to flush or not.
*/
for (int nv = 0; nv < nvars; nv++) {
/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
if (nv == 0) { /* first variable */
fvartype = vdesc->pio_type; /* first var's var type */
continue;
}
if (fvartype != vdesc->pio_type) {
/* nv's external datatype is different from nv-1 */
doFlush[nv-1] = 1;
fvartype = vdesc->pio_type;
}
else /* same as nv-1, no flush */
doFlush[nv-1] = 0;
}
doFlush[nvars-1] = 1; /* flush when reach the last variable */
#endif
int fvartype;
/* For each variable to be written. */
for (int nv = 0; nv < nvars; nv++)
{

/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

#if USE_VARD_WRITE
/* vard does not support type conversion fail over to varn if var is not the same type as defined in file */
if ((ierr = ncmpi_inq_vartype(file->fh, varids[nv], &fvartype)))
/* PnetCDF 1.10.0 and later support type conversion in
* vard APIs. However, it requires all variables
* accessed by the filetype are of the same NC data
* type.
*/

/* obtain file offset of variable nv */
if ((ierr = ncmpi_inq_varoffset(file->fh, varids[nv], &vari_offset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
if (fvartype != vdesc->pio_type){
LOG((0, "ERROR: pnetcdf vard does not support type conversion varid %d filetype %d piotype %d"
,varids[nv],fvartype, vdesc->pio_type));
}

if (numReqs == 0) { /* 1st variable of same datatype */
var0_offset = vari_offset;
var0_id = varids[nv];
}
/* calculate the offset relative to the first var */
var_offsets[numReqs] = vari_offset - var0_offset;
blocklens[nv] = 1; /* 1 for each vartypes[nv] */

/* If this is the first variable or the frame has changed between variables (this should be rare) */
if(nv==0 || (nv > 0 && frame != NULL && frame[nv] != frame[nv-1])){
Expand All @@ -462,11 +550,19 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
thisframe = frame[nv];
else
thisframe = 0;

ierr = get_vard_mpidatatype(iodesc, gdim0, unlimdimoffset,
rrcnt, ndims, fndims,
thisframe, startlist, countlist, &filetype);
thisframe, startlist, countlist,
&vartypes[numReqs]);
}
else /* reuse the previous variable's datatype */
vartypes[numReqs] = vartypes[numReqs-1];
#else
/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

if (vdesc->record >= 0 && ndims < fndims)
for (int rc = 0; rc < rrcnt; rc++)
startlist[rc][0] = frame[nv];
Expand All @@ -475,15 +571,60 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
bufptr = (void *)((char *)iobuf + nv * iodesc->mpitype_size * llen);

#if USE_VARD_WRITE
LOG((3, "vard: call ncmpi_put_vard llen = %d %d", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, varids[nv], filetype, bufptr, llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
if(nv==nvars-1 && filetype != MPI_DATATYPE_NULL)
{
int mpierr;
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if (numReqs == 0) { /* first var of the same type */
vard_bufptr = bufptr; /* preserve variable ID */
vard_llen = llen; /* reset I/O request size */
}
numReqs++;

if (doFlush[nv]) { /* flush the data now */
int mpierr;
/* concatenate vartypes[0...numReqs-1] */
if (numReqs > 1) {
/* check and remove NULL vartype */
int i, j=0;
for (i=0; i<numReqs; i++) {
if (vartypes[i] != MPI_DATATYPE_NULL) {
if (j < i) vartypes[j] = vartypes[i];
j++;
}
}
if (j > 0) { /* at least one vartypes[] is not NULL */
/* concatenate non-NULL vartypes */
if((mpierr = MPI_Type_create_struct(j, blocklens, var_offsets, vartypes, &filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

/* free vartypes */
for (i=j-1; i>0; i--) {
if (vartypes[i] == vartypes[i-1]) continue;
if((mpierr = MPI_Type_free(&vartypes[i])))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if((mpierr = MPI_Type_free(&vartypes[0])))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
else /* all vartypes[] are NULL */
filetype = MPI_DATATYPE_NULL;
}
else /* there is only one variable to flush */
filetype = vartypes[0];

LOG((3, "vard: call ncmpi_put_vard llen = %d %d", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, var0_id, filetype, vard_bufptr, vard_llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
if(filetype != MPI_DATATYPE_NULL)
{
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
vard_llen = 0; /* reset request size to 0 */
numReqs = 0;
}
else /* don't flush yet, accumulate the request size */
vard_llen += llen;
#else
if (vdesc->nreqs % PIO_REQUEST_ALLOC_CHUNK == 0)
{
Expand Down Expand Up @@ -1161,8 +1302,6 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
else
unlimdimoffset = gdim0;

filetype = MPI_DATATYPE_NULL;

ierr = get_vard_mpidatatype(iodesc, gdim0, unlimdimoffset,
rrlen, ndims, fndims,
vdesc->record, startlist, countlist, &filetype);
Expand Down

0 comments on commit 2f0fb4a

Please sign in to comment.