Skip to content

Commit

Permalink
concatenate filetypes of multiple variables for vard API
Browse files Browse the repository at this point in the history
  • Loading branch information
wkliao committed Jun 27, 2018
1 parent 7080803 commit cad03a9
Showing 1 changed file with 126 additions and 45 deletions.
171 changes: 126 additions & 45 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ int get_gdim0(file_desc_t *file,io_desc_t *iodesc, int varid, int fndims, MPI_Of
return ierr;
}

static
int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdimoffset,
int rrcnt, int ndims, int fndims,
int frame, PIO_Offset **startlist, PIO_Offset **countlist,
Expand All @@ -116,6 +117,8 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
/* preserve the value of unlimdimoffset, as it may be changed */
PIO_Offset _unlimdimoffset = unlimdimoffset;

*filetype = MPI_DATATYPE_NULL;

if(rrcnt == 0)
return PIO_NOERR;

Expand All @@ -125,11 +128,6 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
blocklengths[rc] = 1;
subarray[rc] = MPI_DATATYPE_NULL;
}
if(*filetype != MPI_DATATYPE_NULL)
{
if ((mpierr = MPI_Type_free(filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if(fndims > ndims)
{
if ( gdim0 > 0)
Expand All @@ -156,8 +154,8 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
gdims[i] = iodesc->dimlen[i];
}

int true_rrcnt=-1;
MPI_Aint prev_end=-1;
int true_rrcnt=-1; /* true number of contiguous requests */
MPI_Aint prev_end=-1; /* end offset of rc-1 request */
for( int rc=0; rc<rrcnt; rc++)
{
int sacount[fndims];
Expand All @@ -184,16 +182,18 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
for (int i=sa_ndims-1; i>=0; i--)
{
if (isContig) {
/* blocklen is the amount of this request */
/* blocklen is the amount of this request, rc */
blocklen *= sacount[i];
/* disp is the flattened starting index */
/* disp is the flattened starting array index */
disp += sastart[i] * shape;
/* shape is the dimension product from sa_ndims-1 to i */
shape *= gdims[i];

if (warnContig == 0) {
if (sacount[i] < gdims[i])
/* remaining sacount[ii] must == 1 */
/* first i detected to access partial dimension. If this
* one is contiguous, the remaining sacount[i-1 ... 0]
* must all == 1 */
warnContig = 1; /* possible non-contiguos */
}
else if (sacount[i] != 1) {
Expand All @@ -202,32 +202,32 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
}
}
}
/* if this is a record variable, add the gap of record size */
disp += _unlimdimoffset * max(0, frame);

#if PIO_ENABLE_LOGGING
for (int i=0; i< sa_ndims; i++)
LOG((3, "vard: sastart[%d]=%d sacount[%d]=%d gdims[%d]=%d %ld %ld displacement = %ld un %d",
i,sastart[i], i,sacount[i], i, gdims[i], startlist[rc][i], countlist[rc][i], displacements[rc], unlimdimoffset));
#endif
if (isContig) { /* no need to create a new MPI datatype */
if (isContig) { /* this request rc is contiguous, no need to create a new MPI datatype */
if (prev_end == disp) {
/* this request is actually contiguous from the previous one
* and can be coalesced into the previous displacements and
* blocklengths.
/* this request rc can be coalesced into the previous
* displacements and blocklengths.
*/
blocklengths[true_rrcnt] += blocklen;
prev_end += blocklen;
}
else {
/* this request is not contiguous from the previous one */
/* this request cannot be coalesced with the previous one */
true_rrcnt++;
subarray[true_rrcnt] = iodesc->mpitype;
displacements[true_rrcnt] = disp;
blocklengths[true_rrcnt] = blocklen;
prev_end = disp + blocklen;
}
}
else {
else { /* request rc is not contiguous, must create a new MPI datatype */
true_rrcnt++;
if((mpierr = MPI_Type_create_subarray(sa_ndims, gdims,
sacount, sastart,MPI_ORDER_C
Expand All @@ -245,6 +245,7 @@ int get_vard_mpidatatype(io_desc_t *iodesc, MPI_Offset gdim0, PIO_Offset unlimdi
}
true_rrcnt++;

/* concatenate all MPI datatypes into filetype */
if((mpierr = MPI_Type_create_struct(true_rrcnt, blocklengths, displacements, subarray, filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

Expand Down Expand Up @@ -481,29 +482,58 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
if (regioncnt == num_regions - 1)
{
#ifdef USE_VARD_WRITE
MPI_Datatype filetype = MPI_DATATYPE_NULL;
MPI_Aint var0_offset, var_offsets[nvars];
MPI_Offset vari_offset, vard_llen=0;
MPI_Datatype vartypes[nvars];
MPI_Datatype filetype = MPI_DATATYPE_NULL;
int blocklens[nvars];
int fvartype, var0_id;
int numReqs=0;
void *vard_bufptr;
int doFlush[nvars]; /* whether to flush or not */

/* construct doFlush[], so later when looping through nvars,
* it tells whether to flush or not.
*/
for (int nv = 0; nv < nvars; nv++) {
/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
if (nv == 0) { /* first variable */
fvartype = vdesc->pio_type; /* first var's var type */
continue;
}
if (fvartype != vdesc->pio_type) {
/* nv's external datatype is different from nv-1 */
doFlush[nv-1] = 1;
fvartype = vdesc->pio_type;
}
else /* same as nv-1, no flush */
doFlush[nv-1] = 0;
}
doFlush[nvars-1] = 1; /* flush when reach the last variable */
#endif
int fvartype;
/* For each variable to be written. */
for (int nv = 0; nv < nvars; nv++)
{

/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

#if USE_VARD_WRITE
/* In PnetCDF 1.10.0 and later, supports type
* conversion in vard APIs. However, it requires all
* variables accessed by the filetype are of the same
* NC data type.
* */
if ((ierr = ncmpi_inq_vartype(file->fh, varids[nv], &fvartype)))
/* PnetCDF 1.10.0 and later support type conversion in
* vard APIs. However, it requires all variables
* accessed by the filetype are of the same NC data
* type.
*/

/* obtain file offset of variable nv */
if ((ierr = ncmpi_inq_varoffset(file->fh, varids[nv], &vari_offset)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);
if (fvartype != vdesc->pio_type){
LOG((0, "ERROR: pnetcdf vard does not support type conversion varid %d filetype %d piotype %d"
,varids[nv],fvartype, vdesc->pio_type));
}

if (numReqs == 0) { /* 1st variable of same datatype */
var0_offset = vari_offset;
var0_id = varids[nv];
}
/* calculate the offset relative to the first var */
var_offsets[numReqs] = vari_offset - var0_offset;
blocklens[nv] = 1; /* 1 for each vartypes[nv] */

/* If this is the first variable or the frame has changed between variables (this should be rare) */
if(nv==0 || (nv > 0 && frame != NULL && frame[nv] != frame[nv-1])){
Expand All @@ -520,11 +550,19 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
thisframe = frame[nv];
else
thisframe = 0;

ierr = get_vard_mpidatatype(iodesc, gdim0, unlimdimoffset,
rrcnt, ndims, fndims,
thisframe, startlist, countlist, &filetype);
thisframe, startlist, countlist,
&vartypes[numReqs]);
}
else /* reuse the previous variable's datatype */
vartypes[numReqs] = vartypes[numReqs-1];
#else
/* Get the var info. */
if ((ierr = get_var_desc(varids[nv], &file->varlist, &vdesc)))
return pio_err(NULL, file, ierr, __FILE__, __LINE__);

if (vdesc->record >= 0 && ndims < fndims)
for (int rc = 0; rc < rrcnt; rc++)
startlist[rc][0] = frame[nv];
Expand All @@ -533,15 +571,60 @@ int write_darray_multi_par(file_desc_t *file, int nvars, int fndims, const int *
bufptr = (void *)((char *)iobuf + nv * iodesc->mpitype_size * llen);

#if USE_VARD_WRITE
LOG((3, "vard: call ncmpi_put_vard llen = %d %d", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, varids[nv], filetype, bufptr, llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
if(nv==nvars-1 && filetype != MPI_DATATYPE_NULL)
{
int mpierr;
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if (numReqs == 0) { /* first var of the same type */
vard_bufptr = bufptr; /* preserve variable ID */
vard_llen = llen; /* reset I/O request size */
}
numReqs++;

if (doFlush[nv]) { /* flush the data now */
int mpierr;
/* concatenate vartypes[0...numReqs-1] */
if (numReqs > 1) {
/* check and remove NULL vartype */
int i, j=0;
for (i=0; i<numReqs; i++) {
if (vartypes[i] != MPI_DATATYPE_NULL) {
if (j < i) vartypes[j] = vartypes[i];
j++;
}
}
if (j > 0) { /* at least one vartypes[] is not NULL */
/* concatenate non-NULL vartypes */
if((mpierr = MPI_Type_create_struct(j, blocklens, var_offsets, vartypes, &filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

if((mpierr = MPI_Type_commit(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);

/* free vartypes */
for (i=j-1; i>0; i--) {
if (vartypes[i] == vartypes[i-1]) continue;
if((mpierr = MPI_Type_free(&vartypes[i])))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
if((mpierr = MPI_Type_free(&vartypes[0])))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
else /* all vartypes[] are NULL */
filetype = MPI_DATATYPE_NULL;
}
else /* there is only one variable to flush */
filetype = vartypes[0];

LOG((3, "vard: call ncmpi_put_vard llen = %d %d", llen, iodesc->mpitype_size ));
ierr = ncmpi_put_vard_all(file->fh, var0_id, filetype, vard_bufptr, vard_llen, iodesc->mpitype);
LOG((3, "vard: return ncmpi_put_vard ierr = %d", ierr));
if(filetype != MPI_DATATYPE_NULL)
{
if((mpierr = MPI_Type_free(&filetype)))
return check_mpi(NULL, mpierr, __FILE__, __LINE__);
}
vard_llen = 0; /* reset request size to 0 */
numReqs = 0;
}
else /* don't flush yet, accumulate the request size */
vard_llen += llen;
#else
if (vdesc->nreqs % PIO_REQUEST_ALLOC_CHUNK == 0)
{
Expand Down Expand Up @@ -1219,8 +1302,6 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobu
else
unlimdimoffset = gdim0;

filetype = MPI_DATATYPE_NULL;

ierr = get_vard_mpidatatype(iodesc, gdim0, unlimdimoffset,
rrlen, ndims, fndims,
vdesc->record, startlist, countlist, &filetype);
Expand Down

0 comments on commit cad03a9

Please sign in to comment.