Skip to content

Commit

Permalink
Merge pull request ESMCI#1579 from NCAR/ejh_async_nc_create
Browse files Browse the repository at this point in the history
starting to get async working with netcdf integration
  • Loading branch information
edwardhartnett authored Aug 7, 2019
2 parents 20457f9 + 4bea2db commit 04d8dc5
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 48 deletions.
5 changes: 5 additions & 0 deletions src/clib/pio.h
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,11 @@ extern "C" {
int nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base, int rearr,
int *iosysidp);

int nc_def_async(MPI_Comm world, int num_io_procs, int *io_proc_list,
int component_count, int *num_procs_per_comp, int **proc_list,
MPI_Comm *io_comm, MPI_Comm *comp_comm, int rearranger,
int *iosysidp);

/* Set the default IOsystem ID. */
int nc_set_iosystem(int iosysid);

Expand Down
66 changes: 62 additions & 4 deletions src/ncint/ncint_pio.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
/** This is te default io system id. */
extern int diosysid;

/** Have we initialized the netCDF integration layer? This is where we
* register our dispatch layer with netcdf-c. */
extern int ncint_initialized;

/**
* Same as PIOc_Init_Intracomm().
*
Expand All @@ -28,6 +24,10 @@ nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base,
{
int ret;

/* Make sure PIO was initialized. */
if ((ret = PIO_NCINT_initialize()))
return ret;

/* Call the PIOc_ function to initialize the intracomm. */
if ((ret = PIOc_Init_Intracomm(comp_comm, num_iotasks, stride, base, rearr,
iosysidp)))
Expand All @@ -39,6 +39,64 @@ nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base,
return PIO_NOERR;
}

/**
* Same as PIOc_init_async().
*
* @param world the communicator containing all the available tasks.
* @param num_io_procs the number of processes for the IO component.
* @param io_proc_list an array of lenth num_io_procs with the
* processor number for each IO processor. If NULL then the IO
* processes are assigned starting at processes 0.
* @param component_count number of computational components
* @param num_procs_per_comp an array of int, of length
* component_count, with the number of processors in each computation
* component.
* @param proc_list an array of arrays containing the processor
* numbers for each computation component. If NULL then the
* computation components are assigned processors sequentially
* starting with processor num_io_procs.
* @param user_io_comm pointer to an MPI_Comm. If not NULL, it will
* get an MPI duplicate of the IO communicator. (It is a full
* duplicate and later must be freed with MPI_Free() by the caller.)
* @param user_comp_comm pointer to an array of pointers to MPI_Comm;
* the array is of length component_count. If not NULL, it will get an
* MPI duplicate of each computation communicator. (These are full
* duplicates and each must later be freed with MPI_Free() by the
* caller.)
* @param rearranger the default rearranger to use for decompositions
* in this IO system. Only PIO_REARR_BOX is supported for
* async. Support for PIO_REARR_SUBSET will be provided in a future
* version.
* @param iosysidp pointer to array of length component_count that
* gets the iosysid for each component.
*
* @return PIO_NOERR on success, error code otherwise.
* @author Ed Hartnett
*/
int
nc_def_async(MPI_Comm world, int num_io_procs, int *io_proc_list,
int component_count, int *num_procs_per_comp, int **proc_list,
MPI_Comm *io_comm, MPI_Comm *comp_comm, int rearranger,
int *iosysidp)
{
int ret;

/* Make sure PIO was initialized. */
if ((ret = PIO_NCINT_initialize()))
return ret;

/* Call the PIOc_ function to initialize the intracomm. */
if ((ret = PIOc_init_async(world, num_io_procs, io_proc_list,
component_count, num_procs_per_comp, proc_list,
io_comm, comp_comm, rearranger, iosysidp)))
return ret;

/* Remember the io system id. */
diosysid = *iosysidp;

return PIO_NOERR;
}

/**
* Set the default iosystemID.
*
Expand Down
15 changes: 9 additions & 6 deletions src/ncint/ncintdispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,17 @@ PIO_NCINT_initialize(void)
{
int ret;

NCINT_dispatch_table = &NCINT_dispatcher;
if (!ncint_initialized)
{
NCINT_dispatch_table = &NCINT_dispatcher;

PLOG((1, "Adding user-defined format for netCDF PIO integration"));
PLOG((1, "Adding user-defined format for netCDF PIO integration"));

/* Add our user defined format. */
if ((ret = nc_def_user_format(NC_UDF0, &NCINT_dispatcher, NULL)))
return ret;
ncint_initialized++;
/* Add our user defined format. */
if ((ret = nc_def_user_format(NC_UDF0, &NCINT_dispatcher, NULL)))
return ret;
ncint_initialized++;
}

return NC_NOERR;
}
Expand Down
5 changes: 4 additions & 1 deletion tests/ncint/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/clib
LDADD = ${top_builddir}/src/clib/libpioc.la

# Build the test for make check.
check_PROGRAMS = tst_pio_udf
check_PROGRAMS = tst_pio_udf tst_pio_async

tst_pio_udf_SOURCES = tst_pio_udf.c pio_err_macros.h
tst_pio_async_SOURCES = tst_pio_async.c pio_err_macros.h

if RUN_TESTS
# Tests will run from a bash script.
Expand Down
64 changes: 64 additions & 0 deletions tests/ncint/pio_err_macros.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* This is part of the netCDF package.
Copyright 2018 University Corporation for Atmospheric Research/Unidata
See COPYRIGHT file for conditions of use.
Common includes, defines, etc., for test code in the libsrc4 and
nc_test4 directories.
Ed Hartnett, Russ Rew, Dennis Heimbigner
*/

#ifndef _PIO_ERR_MACROS_H
#define _PIO_ERR_MACROS_H

#include "config.h"
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

/* Err is used to keep track of errors within each set of tests,
* total_err is the number of errors in the entire test program, which
* generally cosists of several sets of tests. */
static int total_err = 0, err = 0;

/* This macro prints an error message with line number and name of
* test program. */
#define PERR do { \
fflush(stdout); /* Make sure our stdout is synced with stderr. */ \
err++; \
fprintf(stderr, "Sorry! Unexpected result, %s, line: %d\n", \
__FILE__, __LINE__); \
fflush(stderr); \
return 2; \
} while (0)

/* After a set of tests, report the number of errors, and increment
* total_err. */
#define PSUMMARIZE_ERR do { \
if (err) \
{ \
printf("%d failures\n", err); \
total_err += err; \
err = 0; \
} \
else \
if (!my_rank) \
printf("ok.\n"); \
} while (0)

/* This macro prints out our total number of errors, if any, and exits
* with a 0 if there are not, or a 2 if there were errors. Make will
* stop if a non-zero value is returned from a test program. */
#define PFINAL_RESULTS do { \
if (total_err) \
{ \
printf("%d errors detected! Sorry!\n", total_err); \
return 2; \
} \
if (!my_rank) \
printf("*** Tests successful!\n\n"); \
return 0; \
} while (0)

#endif /* _PIO_ERR_MACROS_H */
124 changes: 124 additions & 0 deletions tests/ncint/tst_pio_async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/* Test netcdf integration layer.
Ed Hartnett
*/

#include "config.h"
#include <pio.h>
#include "pio_err_macros.h"

#define FILE_NAME "tst_pio_async.nc"
#define VAR_NAME "data_var"
#define DIM_NAME_UNLIMITED "dim_unlimited"
#define DIM_NAME_X "dim_x"
#define DIM_NAME_Y "dim_y"
#define DIM_LEN_X 4
#define DIM_LEN_Y 4
#define NDIM2 2
#define NDIM3 3

extern NC_Dispatch NCINT_dispatcher;

/* Number of computational components to create. */
#define COMPONENT_COUNT 1

int
main(int argc, char **argv)
{
int my_rank;
int ntasks;

/* Initialize MPI. */
if (MPI_Init(&argc, &argv)) PERR;

/* Learn my rank and the total number of processors. */
if (MPI_Comm_rank(MPI_COMM_WORLD, &my_rank)) PERR;
if (MPI_Comm_size(MPI_COMM_WORLD, &ntasks)) PERR;

if (!my_rank)
printf("\n*** Testing netCDF integration layer.\n");
if (!my_rank)
printf("*** testing simple async use of netCDF integration layer...");
{
int ncid, ioid;
/* int dimid[NDIM3], varid; */
int dimlen[NDIM3] = {NC_UNLIMITED, DIM_LEN_X, DIM_LEN_Y};
int iosysid;
size_t elements_per_pe;
size_t *compdof; /* The decomposition mapping. */
/* int *my_data; */
/* int *data_in; */
int num_procs2[COMPONENT_COUNT] = {3};
int num_io_procs = 1;
int i;

/* Turn on logging for PIO library. */
PIOc_set_log_level(3);

/* Initialize the intracomm. */
if (nc_def_async(MPI_COMM_WORLD, num_io_procs, NULL, COMPONENT_COUNT,
num_procs2, NULL, NULL, NULL, PIO_REARR_BOX, &iosysid))
PERR;

if (my_rank)
{
/* Create a file with a 3D record var. */
if (nc_create(FILE_NAME, NC_UDF0, &ncid)) PERR;
/* if (nc_def_dim(ncid, DIM_NAME_UNLIMITED, dimlen[0], &dimid[0])) PERR; */
/* if (nc_def_dim(ncid, DIM_NAME_X, dimlen[1], &dimid[1])) PERR; */
/* if (nc_def_dim(ncid, DIM_NAME_Y, dimlen[2], &dimid[2])) PERR; */
/* if (nc_def_var(ncid, VAR_NAME, NC_INT, NDIM3, dimid, &varid)) PERR; */

/* Calculate a decomposition for distributed arrays. */
elements_per_pe = DIM_LEN_X * DIM_LEN_Y / ntasks;
if (!(compdof = malloc(elements_per_pe * sizeof(size_t))))
PERR;
for (i = 0; i < elements_per_pe; i++)
compdof[i] = my_rank * elements_per_pe + i;

/* Create the PIO decomposition for this test. */
if (nc_def_decomp(iosysid, PIO_INT, NDIM2, &dimlen[1], elements_per_pe,
compdof, &ioid, 1, NULL, NULL)) PERR;
free(compdof);

/* /\* Create some data on this processor. *\/ */
/* if (!(my_data = malloc(elements_per_pe * sizeof(int)))) PERR; */
/* for (i = 0; i < elements_per_pe; i++) */
/* my_data[i] = my_rank * 10 + i; */

/* /\* Write some data with distributed arrays. *\/ */
/* if (nc_put_vard_int(ncid, varid, ioid, 0, my_data)) PERR; */
/* if (nc_close(ncid)) PERR; */

/* /\* Check that our user-defined format has been added. *\/ */
/* if (nc_inq_user_format(NC_UDF0, &disp_in, NULL)) PERR; */
/* if (disp_in != &NCINT_dispatcher) PERR; */

/* /\* Open the file. *\/ */
/* if (nc_open(FILE_NAME, NC_UDF0, &ncid)) PERR; */

/* /\* Read distributed arrays. *\/ */
/* if (!(data_in = malloc(elements_per_pe * sizeof(int)))) PERR; */
/* if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) PERR; */

/* /\* Check results. *\/ */
/* for (i = 0; i < elements_per_pe; i++) */
/* if (data_in[i] != my_data[i]) PERR; */

/* Close file. */
if (nc_close(ncid)) PERR;

/* /\* Free resources. *\/ */
/* free(data_in); */
/* free(my_data); */
if (nc_free_decomp(ioid)) PERR;
if (nc_free_iosystem(iosysid)) PERR;
}
}
if (!my_rank)
PSUMMARIZE_ERR;

/* Finalize MPI. */
MPI_Finalize();
PFINAL_RESULTS;
}
Loading

0 comments on commit 04d8dc5

Please sign in to comment.