Skip to content

Commit

Permalink
Merge pull request #1 from PARALLELIO/master
Browse files Browse the repository at this point in the history
Pulling changes from the original master.
  • Loading branch information
Kevin Paul committed Jun 4, 2015
2 parents 42a86f0 + e21308e commit 76b6651
Show file tree
Hide file tree
Showing 7 changed files with 587 additions and 196 deletions.
8 changes: 3 additions & 5 deletions src/pio.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#define PIO_OFFSET MPI_OFFSET
#define PIO_Offset MPI_Offset
#define PIO_MAX_VARS NC_MAX_VARS
#define PIO_MAX_REQUESTS 100*PIO_MAX_VARS
#define PIO_MAX_REQUESTS 100


/**
Expand All @@ -50,11 +50,9 @@ typedef struct var_desc_t
{
int record;
int ndims;
int type;
bool distributed;
int request; // used for pnetcdf iput calls
int fillrequest; //used for fill in pnetcdf iput for subset rearranger

int request[PIO_MAX_REQUESTS]; // used for pnetcdf iput calls
int nreqs;
void *fillbuf;
void *iobuf;

Expand Down
10 changes: 8 additions & 2 deletions src/pio_c_put_template.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@ int PIO_function()
#ifdef _PNETCDF
case PIO_IOTYPE_PNETCDF:
vdesc = file->varlist + varid;
int reqn;
reqn = vdesc->nreqs;
request = vdesc->request;
while(*request != NC_REQ_NULL){
reqn++;
request = vdesc->request+reqn;
}
if(ios->io_rank==0){
request = &(vdesc->request);
ierr = ncmpi_function();
}else{
vdesc->request = PIO_REQ_NULL;
*request = PIO_REQ_NULL;
}
flush_output_buffer(file, false, 0);
break;
Expand Down
83 changes: 37 additions & 46 deletions src/pio_darray.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,18 @@ void compute_buffer_init(iosystem_desc_t ios)
// printf("%s %d %d %ld %ld\n",__FILE__,__LINE__,ios->io_rank,iodesc->llen, tdsize);
// ierr = ncmpi_put_varn_all(ncid, vid, iodesc->maxregions, startlist, countlist,
// IOBUF, iodesc->llen, iodesc->basetype);

if(vdesc->request != NC_REQ_NULL){
printf("%s %d %d\n",__FILE__,__LINE__,vid);
int reqn=0;

while(vdesc->request[reqn] != NC_REQ_NULL){
reqn++;
}
vdesc->distributed = true;

ierr = ncmpi_bput_varn(ncid, vid, rrcnt, startlist, countlist,
IOBUF, iodesc->llen, iodesc->basetype, &(vdesc->request));
if(vdesc->request == NC_REQ_NULL){
vdesc->request = PIO_REQ_NULL; //keeps wait calls in sync
IOBUF, iodesc->llen, iodesc->basetype, vdesc->request+reqn);
if(vdesc->request[reqn] == NC_REQ_NULL){
vdesc->request[reqn] = PIO_REQ_NULL; //keeps wait calls in sync
}
vdesc->nreqs = reqn;

// printf("%s %d %X %d\n",__FILE__,__LINE__,IOBUF,request);
for(i=0;i<rrcnt;i++){
Expand Down Expand Up @@ -546,22 +547,22 @@ int pio_write_darray_multi_nc(file_desc_t *file, const int nvars, const int vid[
}
}
bufptr = (void *)((char *) IOBUF + nv*tsize*llen);
if(vdesc->request != NC_REQ_NULL){
printf("%s %d %d\n",__FILE__,__LINE__,vid[nv]);
}
vdesc->distributed = true;


int reqn=0;
while(vdesc->request[reqn] != NC_REQ_NULL){
reqn++;
}
ierr = ncmpi_iput_varn(ncid, vid[nv], rrcnt, startlist, countlist,
bufptr, llen, basetype, &(vdesc->request));
bufptr, llen, basetype, vdesc->request+reqn);
/*
ierr = ncmpi_bput_varn(ncid, vid[nv], rrcnt, startlist, countlist,
bufptr, llen, basetype, &(vdesc->request));
*/
if(vdesc->request == NC_REQ_NULL){
vdesc->request = PIO_REQ_NULL; //keeps wait calls in sync
if(vdesc->request[reqn] == NC_REQ_NULL){
vdesc->request[reqn] = PIO_REQ_NULL; //keeps wait calls in sync
}

vdesc->nreqs = reqn;
printf("%s %d %d %d\n",__FILE__,__LINE__,vdesc->nreqs,vdesc->request[reqn-1]);
}
for(i=0;i<rrcnt;i++){
//printf("%d %ld %ld %ld %ld\n",i,startlist[i][0],startlist[i][1],countlist[i][0],countlist[i][1]);
Expand Down Expand Up @@ -700,12 +701,6 @@ int PIOc_write_darray_multi(const int ncid, const int vid[], const int ioid, con
iodesc->maxfillregions, iodesc->fillregion, iodesc->holegridsize,
iodesc->holegridsize, iodesc->num_aiotasks,
vdesc0->fillbuf, frame);
for(int nv=0;nv<nvars;nv++){
var_desc_t *vdesc = file->varlist+vid[nv];
vdesc->fillrequest = vdesc->request;
vdesc->request = NC_REQ_NULL;
}

}

ierr = pio_write_darray_multi_nc(file, nvars, vid,
Expand Down Expand Up @@ -794,7 +789,7 @@ int PIOc_write_darray_multi(const int ncid, const int vid[], const int ioid, con
wmb = wmb->next;
}
/* flush the previous record before starting a new one. this is collective */
if((vdesc->request != NC_REQ_NULL) ||
if((vdesc->request[0] != NC_REQ_NULL) ||
(wmb->frame != NULL && vdesc->record != wmb->frame[0])){
needsflush = 2; // flush to disk
}
Expand Down Expand Up @@ -1191,8 +1186,8 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, const int vid, void
}

if(tmp_bufsize>0){
startlist[rrlen] = (PIO_Offset *) malloc(fndims * sizeof(PIO_Offset));
countlist[rrlen] = (PIO_Offset *) malloc(fndims * sizeof(PIO_Offset));
startlist[rrlen] = (PIO_Offset *) bget(fndims * sizeof(PIO_Offset));
countlist[rrlen] = (PIO_Offset *) bget(fndims * sizeof(PIO_Offset));

for(int j=0;j<fndims; j++){
startlist[rrlen][j] = start[j];
Expand All @@ -1205,8 +1200,8 @@ int pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, const int vid, void
ierr = ncmpi_get_varn_all(file->fh, vid, rrlen, startlist,
countlist, IOBUF, iodesc->llen, iodesc->basetype);
for(i=0;i<rrlen;i++){
free(startlist[i]);
free(countlist[i]);
brel(startlist[i]);
brel(countlist[i]);
}
}
}
Expand Down Expand Up @@ -1321,38 +1316,34 @@ int flush_output_buffer(file_desc_t *file, bool force, PIO_Offset addsize)
#ifdef MPIO_ONESIDED
/*onesided optimization requires that all of the requests in a wait_all call represent
a contiguous block of data in the file */
if(rcnt>0 && (prev_type != vdesc->type ||
prev_dist != vdesc->distributed ||
prev_record != vdesc->record ||
(vdesc->request == NC_REQ_NULL &&
vdesc->fillrequest == NC_REQ_NULL))){
if(rcnt>0 && (prev_record != vdesc->record ||
vdesc->nreqs==0)){
if(file->iosystem->io_rank==0) printf("%s %d %d\n",__FILE__,__LINE__,rcnt);
ierr = ncmpi_wait_all(file->fh, rcnt, request,status);
rcnt=0;
}
prev_record = vdesc->record;
prev_dist = vdesc->distributed;
prev_type = vdesc->type;
#endif
// printf("%s %d %d %d %d %d \n",__FILE__,__LINE__,i,rcnt,vdesc->request,vdesc->fillrequest);

if(vdesc->request != NC_REQ_NULL){
// if(file->iosystem->io_rank==0) printf("%s %d %d %d %d %d %d\n",__FILE__,__LINE__,i,vdesc->request,vdesc->distributed, vdesc->record, vdesc->type);
request[rcnt++] = max(vdesc->request,NC_REQ_NULL);
vdesc->request = NC_REQ_NULL;
}
if(vdesc->fillrequest != NC_REQ_NULL){
//if(file->iosystem->io_rank==0) printf("%s %d %d %d %d\n",__FILE__,__LINE__,i,vdesc->fillrequest, vdesc->distributed);
request[rcnt++]=max(vdesc->fillrequest,NC_REQ_NULL);
vdesc->fillrequest = NC_REQ_NULL;
// printf("%s %d %d %d %d %d \n",__FILE__,__LINE__,i,rcnt,vdesc->request,vdesc->fillrequest);
int reqcnt=0;
while(vdesc->request[reqcnt] != NC_REQ_NULL) {
// if(file->iosystem->io_rank==0) printf("%s %d %d %d %d %d %d\n",__FILE__,__LINE__,i,vdesc->request,vdesc->distributed, vdesc->record, vdesc->type);
printf("%s %d %d %d\n",__FILE__,__LINE__,i,vdesc->request[0]);
request[rcnt++] = max(vdesc->request[reqcnt],NC_REQ_NULL);
vdesc->request[reqcnt] = NC_REQ_NULL;
reqcnt++;
}
vdesc->nreqs=0;
// if(file->iosystem->io_rank < 2) printf("%s %d varid=%d\n",__FILE__,__LINE__,i);
#ifdef FLUSH_EVERY_VAR
ierr = ncmpi_wait_all(file->fh, rcnt, request,status);
rcnt=0;
#endif

}
if(file->iosystem->io_rank==0){
printf("%s %d %d\n",__FILE__,__LINE__,rcnt);
}
if(rcnt>0){
if(file->iosystem->io_rank==0){
printf("%s %d %d\n",__FILE__,__LINE__,rcnt);
Expand Down
16 changes: 8 additions & 8 deletions src/pio_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ int PIOc_openfile(const int iosysid, int *ncidp, int *iotype,
file->varlist[i].record = -1;
file->varlist[i].ndims = -1;
#ifdef _PNETCDF
file->varlist[i].request = NC_REQ_NULL;
file->varlist[i].fillrequest = NC_REQ_NULL;
file->varlist[i].distributed = false;
file->varlist[i].type = 0;
for(int req=0;req<PIO_MAX_REQUESTS;req++){
file->varlist[i].request[req] = NC_REQ_NULL;
}
file->varlist[i].nreqs=0;
#endif
file->varlist[i].fillbuf = NULL;
file->varlist[i].iobuf = NULL;
Expand Down Expand Up @@ -164,10 +164,10 @@ int PIOc_createfile(const int iosysid, int *ncidp, int *iotype,
file->varlist[i].record = -1;
file->varlist[i].ndims = -1;
#ifdef _PNETCDF
file->varlist[i].request = NC_REQ_NULL;
file->varlist[i].fillrequest = NC_REQ_NULL;
file->varlist[i].distributed = false;
file->varlist[i].type = 0;
for(int req=0;req<PIO_MAX_REQUESTS;req++){
file->varlist[i].request[req] = NC_REQ_NULL;
}
file->varlist[i].nreqs=0;
#endif
file->varlist[i].fillbuf = NULL;
file->varlist[i].iobuf = NULL;
Expand Down
3 changes: 0 additions & 3 deletions src/pio_nc.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,8 @@ int PIOc_def_var (int ncid, const char *name, nc_type xtype, int ndims, const in
ierr = iotype_error(file->iotype,__FILE__,__LINE__);
}
}
if(ierr != PIO_NOERR)
printf("%s %d %s\n",__FILE__,__LINE__,name);
ierr = check_netcdf(file, ierr, errstr,__LINE__);
mpierr = MPI_Bcast(varidp , 1, MPI_INT, ios->ioroot, ios->my_comm);
file->varlist[*varidp].type = xtype;
return ierr;
}

Expand Down
Loading

0 comments on commit 76b6651

Please sign in to comment.