summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMohamad Chaarawi <chaarawi@hdfgroup.org>2014-05-02 20:33:36 (GMT)
committerMohamad Chaarawi <chaarawi@hdfgroup.org>2014-05-02 20:33:36 (GMT)
commit5e172835bbcedfc9897f3dd653c0a49e52c2cd52 (patch)
treeda1da0444213ccc2511e3de0638f3f42657373d1 /src
parent646bdfd0fbdd6446a817475803cb15aa67fb53fe (diff)
downloadhdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.zip
hdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.tar.gz
hdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.tar.bz2
[svn-r25150] Add coresident support:
- need to set H5ENV_CORESIDENT to 1 to use that mode - still doing memcpy for raw data now.
Diffstat (limited to 'src')
-rw-r--r--src/H5VLiod.c127
-rw-r--r--src/H5VLiod_client.c10
-rw-r--r--src/H5VLiod_common.h2
-rw-r--r--src/H5VLiod_file.c6
-rw-r--r--src/H5VLiod_server.c77
-rw-r--r--src/H5VLiod_trans.c8
6 files changed, 180 insertions, 50 deletions
diff --git a/src/H5VLiod.c b/src/H5VLiod.c
index 4072517..1db9dff 100644
--- a/src/H5VLiod.c
+++ b/src/H5VLiod.c
@@ -52,7 +52,7 @@ typedef struct H5VL_iod_axe_list_t {
static na_addr_t PEER;
static na_class_t *network_class = NULL;
-
+static int coresident = 0;
static AXE_task_t g_axe_id;
static H5VL_iod_axe_list_t axe_list;
@@ -489,7 +489,9 @@ H5VL__iod_create_and_forward(hg_id_t op_id, H5RQ_type_t op_type,
hg_status_t status;
/* test the operation status */
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)request->req), HG_MAX_IDLE_TIME, &status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
fprintf(stderr, "failed to wait on request\n");
request->status = H5ES_STATUS_FAIL;
@@ -545,6 +547,7 @@ EFF_init(MPI_Comm comm, MPI_Info UNUSED info)
na_addr_t ion_target;
double axe_seed;
char addr_name[H5VL_IOD_MAX_ADDR_NAME];
+ char *coresident_s = NULL;
herr_t ret_value = SUCCEED;
MPI_Comm_size(comm, &num_procs);
@@ -560,65 +563,97 @@ EFF_init(MPI_Comm comm, MPI_Info UNUSED info)
axe_list.head = NULL;
axe_list.tail = NULL;
- /* MSC - This is a temporary solution for connecting to the server
- using mercury */
- /* Only rank 0 reads file */
- if (my_rank == 0) {
- int count, line=0, num_ions;
- FILE *config;
- char config_addr_name[H5VL_IOD_MAX_ADDR_NAME];
+ coresident_s = getenv ("H5ENV_CORESIDENT");
+ if(NULL != coresident_s)
+ coresident = atoi(coresident_s);
+
- config = fopen("port.cfg", "r");
+ if(1 != coresident) {
+ /* MSC - This is a temporary solution for connecting to the server
+ using mercury */
+ /* Only rank 0 reads file */
+ if (my_rank == 0) {
+ int count, line=0, num_ions;
+ FILE *config;
+ char config_addr_name[H5VL_IOD_MAX_ADDR_NAME];
+ char cwd[1024];
+
+ if (getcwd(cwd, sizeof(cwd)) != NULL)
+ fprintf(stdout, "Reading port.cfg from: %s\n", cwd);
+ else {
+ fprintf(stderr, "etcwd() error\n");
+ return FAIL;
+ }
+
+ config = fopen("port.cfg", "r");
+ if (config == NULL) {
+ fprintf(stderr, "could not open port.cfg file.");
+ return FAIL;
+ }
- fscanf(config, "%d\n", &num_ions);
+ fscanf(config, "%d\n", &num_ions);
#if H5_EFF_DEBUG
- printf("Found %d servers\n", num_ions);
+ printf("Found %d servers\n", num_ions);
#endif
- /* read a line */
- if(fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) {
- strncpy(addr_name, config_addr_name, H5VL_IOD_MAX_ADDR_NAME);
- count = 1;
- while(num_procs > line + (count*num_ions)) {
- MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
- line + (count*num_ions), tag, comm);
- count ++;
+ /* read a line */
+ if(fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) {
+ strncpy(addr_name, config_addr_name, H5VL_IOD_MAX_ADDR_NAME);
+ count = 1;
+ while(num_procs > line + (count*num_ions)) {
+ MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
+ line + (count*num_ions), tag, comm);
+ count ++;
+ }
+ line++;
}
- line++;
- }
- while (fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) {
- count = 0;
- while(num_procs > line + (count*num_ions)) {
- MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
- line + (count*num_ions), tag, comm);
- count ++;
+ while (fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) {
+ count = 0;
+ while(num_procs > line + (count*num_ions)) {
+ MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
+ line + (count*num_ions), tag, comm);
+ count ++;
+ }
+ line ++;
}
- line ++;
+ fclose(config);
+ }
+ else {
+ MPI_Recv(addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
+ 0, tag, comm, MPI_STATUS_IGNORE);
}
- fclose(config);
- }
- else {
- MPI_Recv(addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE,
- 0, tag, comm, MPI_STATUS_IGNORE);
- }
#if H5_EFF_DEBUG
- printf("CN %d Connecting to ION %s\n", my_rank, addr_name);
+ printf("CN %d Connecting to ION %s\n", my_rank, addr_name);
#endif
+ }
/* initialize Mercury stuff */
- network_class = NA_MPI_Init(NULL, 0);
- //network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 0);
+ //network_class = NA_MPI_Init(NULL, 0);
+ network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 0);
if (HG_SUCCESS != HG_Init(network_class)) {
fprintf(stderr, "Failed to initialize Mercury\n");
return FAIL;
}
- if (NA_SUCCESS != NA_Addr_lookup_wait(network_class, addr_name, &ion_target)) {
- fprintf(stderr, "Server lookup failed\n");
- return FAIL;
+
+ if(coresident == 1) {
+ ret_value = EFF_setup_coresident(comm, MPI_INFO_NULL);
+ if(ret_value != SUCCEED)
+ return FAIL;
+
+ if (NA_SUCCESS != NA_Addr_self(network_class, &ion_target)) {
+ fprintf(stderr, "Server lookup failed\n");
+ return FAIL;
+ }
+ }
+ else {
+ if (NA_SUCCESS != NA_Addr_lookup_wait(network_class, addr_name, &ion_target)) {
+ fprintf(stderr, "Server lookup failed\n");
+ return FAIL;
+ }
}
PEER = ion_target;
@@ -685,12 +720,20 @@ EFF_finalize(void)
if(HG_Forward(PEER, H5VL_EFF_FINALIZE_ID, &ret_value, &ret_value, &hg_req) < 0)
return FAIL;
+ FUNC_LEAVE_API_THREADSAFE;
HG_Wait(hg_req, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE);
+ FUNC_ENTER_API_THREADSAFE;
/* Free Mercury request */
if(HG_Request_free(hg_req) != HG_SUCCESS)
return FAIL;
+ if(coresident == 1) {
+ ret_value = EFF_terminate_coresident();
+ if(ret_value != SUCCEED)
+ return FAIL;
+ }
+
/* Free addr id */
if (NA_SUCCESS != NA_Addr_free(network_class, PEER))
return FAIL;
@@ -9712,7 +9755,9 @@ H5VL_iod_cancel(void **req, H5ES_status_t *status)
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "failed to ship attribute write");
/* Wait on the cancel request to return */
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(hg_req, HG_MAX_IDLE_TIME, &hg_status);
+ FUNC_ENTER_API_THREADSAFE;
/* If the actual wait Fails, then the status of the cancel
operation is unknown */
@@ -9787,7 +9832,9 @@ H5VL_iod_test(void **req, H5ES_status_t *status)
/* Test completion of the request if is still pending */
if(H5VL_IOD_PENDING == request->state) {
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)request->req), 0, &hg_status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
fprintf(stderr, "failed to wait on request\n");
request->status = H5ES_STATUS_FAIL;
diff --git a/src/H5VLiod_client.c b/src/H5VLiod_client.c
index 0f2d07e..be8ae9a 100644
--- a/src/H5VLiod_client.c
+++ b/src/H5VLiod_client.c
@@ -222,7 +222,9 @@ H5VL_iod_request_wait(H5VL_iod_file_t *file, H5VL_iod_request_t *request)
while(1) {
HDassert(request->state == H5VL_IOD_PENDING);
/* test the operation status */
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)request->req), 0, &status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n");
request->status = H5ES_STATUS_FAIL;
@@ -249,7 +251,9 @@ H5VL_iod_request_wait(H5VL_iod_file_t *file, H5VL_iod_request_t *request)
tmp_req = cur_req->file_next;
HDassert(cur_req->state == H5VL_IOD_PENDING);
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)cur_req->req), 0, &tmp_status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n");
cur_req->status = H5ES_STATUS_FAIL;
@@ -309,7 +313,9 @@ H5VL_iod_request_wait_all(H5VL_iod_file_t *file)
tmp_req = cur_req->file_next;
HDassert(cur_req->state == H5VL_IOD_PENDING);
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)cur_req->req), HG_MAX_IDLE_TIME, &status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n");
cur_req->status = H5ES_STATUS_FAIL;
@@ -369,8 +375,10 @@ H5VL_iod_request_wait_some(H5VL_iod_file_t *file, const void *object)
/* If the request is pending on the object we want, complete it */
if(cur_req->obj == object &&
cur_req->state == H5VL_IOD_PENDING) {
+ FUNC_LEAVE_API_THREADSAFE;
ret = HG_Wait(*((hg_request_t *)cur_req->req), HG_MAX_IDLE_TIME,
&status);
+ FUNC_ENTER_API_THREADSAFE;
if(HG_FAIL == ret) {
HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n");
cur_req->status = H5ES_STATUS_FAIL;
@@ -852,11 +860,13 @@ H5VL_iod_request_complete(H5VL_iod_file_t *file, H5VL_iod_request_t *req)
if(HG_Forward(info->ion_target, info->read_id, &input, info->status, &hg_req) < 0)
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "failed to ship operation");
+ FUNC_LEAVE_API_THREADSAFE;
if(HG_FAIL == HG_Wait(hg_req, HG_MAX_IDLE_TIME, &hg_status)) {
fprintf(stderr, "failed to wait on request\n");
req->status = H5ES_STATUS_FAIL;
req->state = H5VL_IOD_COMPLETED;
}
+ FUNC_ENTER_API_THREADSAFE;
if(addrs) {
free(addrs);
diff --git a/src/H5VLiod_common.h b/src/H5VLiod_common.h
index a0c4897..5a421ac 100644
--- a/src/H5VLiod_common.h
+++ b/src/H5VLiod_common.h
@@ -206,6 +206,8 @@ H5_DLL void H5VL_iod_free_list_free(void **free_list, size_t free_list_len);
#ifdef H5_HAVE_EFF
+H5_DLL herr_t EFF_setup_coresident(MPI_Comm comm, MPI_Info info);
+H5_DLL herr_t EFF_terminate_coresident(void);
H5_DLL void EFF__mercury_register_callbacks(void);
#ifdef H5_HAVE_INDEXING
diff --git a/src/H5VLiod_file.c b/src/H5VLiod_file.c
index 5c6f2d7..2349e36 100644
--- a/src/H5VLiod_file.c
+++ b/src/H5VLiod_file.c
@@ -67,9 +67,9 @@ H5VL_iod_server_file_create_cb(AXE_engine_t UNUSED axe_engine,
#if H5_EFF_DEBUG
fprintf(stderr, "Start file create %s ", input->name);
- fprintf(stderr, "with MDKV %"PRIx64" ", mdkv_id),
- fprintf(stderr, "with attrKV %"PRIx64"\n", attrkv_id),
- fprintf(stderr, "with OIDKV %"PRIx64"\n", oidkv_id),
+ fprintf(stderr, "with MDKV %"PRIx64" ", mdkv_id);
+ fprintf(stderr, "with attrKV %"PRIx64"\n", attrkv_id);
+ fprintf(stderr, "with OIDKV %"PRIx64"\n", oidkv_id);
#endif
/* convert HDF5 flags to IOD flags */
diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c
index d2c8b78..2dd16c4 100644
--- a/src/H5VLiod_server.c
+++ b/src/H5VLiod_server.c
@@ -101,7 +101,8 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info)
/******************* Initialize mercury ********************/
/* initialize the netwrok class */
- network_class = NA_MPI_Init(NULL, MPI_INIT_SERVER);
+ //network_class = NA_MPI_Init(NULL, MPI_INIT_SERVER);
+ network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 1);
/* Allocate table addrs */
na_addr_table = (char**) malloc((size_t)num_ions_g * sizeof(char*));
@@ -121,6 +122,15 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info)
/* Only rank 0 writes file */
if (my_rank_g == 0) {
+ char cwd[1024];
+
+ if (getcwd(cwd, sizeof(cwd)) != NULL)
+ fprintf(stdout, "Writing port.cfg to: %s\n", cwd);
+ else {
+ fprintf(stderr, "etcwd() error\n");
+ return FAIL;
+ }
+
config = fopen("port.cfg", "w+");
if (config != NULL) {
fprintf(config, "%d\n", num_ions_g);
@@ -130,8 +140,8 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info)
fclose(config);
}
else {
- fprintf(stderr, "could not open port.cfg file.\n");
- return FAIL;
+ fprintf(stderr, "could not open port.cfg file.\n");
+ return FAIL;
}
}
@@ -345,6 +355,67 @@ done:
return ret_value;
} /* end H5VL_iod_server_eff_finalize() */
+herr_t
+EFF_setup_coresident(MPI_Comm comm, MPI_Info UNUSED info)
+{
+ AXE_engine_attr_t engine_attr;
+ herr_t ret_value = SUCCEED;
+
+ MPI_Comm_size(comm, &num_ions_g);
+ MPI_Comm_rank(comm, &my_rank_g);
+
+ iod_comm = comm;
+
+ /* register server specific callbacks */
+ H5VL_EFF_OPEN_CONTAINER = MERCURY_REGISTER("container_open", hg_const_string_t, iod_handle_t,
+ H5VL_iod_server_container_open);
+ H5VL_EFF_CLOSE_CONTAINER = MERCURY_REGISTER("container_close", iod_handle_t, ret_t,
+ H5VL_iod_server_container_close);
+ H5VL_EFF_ANALYSIS_FARM = MERCURY_REGISTER("analysis_farm", analysis_farm_in_t,
+ analysis_farm_out_t, H5VL_iod_server_analysis_farm);
+ H5VL_EFF_ANALYSIS_FARM_TRANSFER = MERCURY_REGISTER("analysis_transfer", analysis_transfer_in_t,
+ analysis_transfer_out_t,
+ H5VL_iod_server_analysis_transfer);
+
+ /* Initialize engine attribute */
+ if(AXEengine_attr_init(&engine_attr) != AXE_SUCCEED)
+ return FAIL;
+
+ /* Set number of threads in AXE engine */
+ if(AXEset_num_threads(&engine_attr, 16) != AXE_SUCCEED)
+ return FAIL;
+
+ /* Create AXE engine */
+ if(AXEcreate_engine(&engine, &engine_attr) != AXE_SUCCEED)
+ return FAIL;
+
+ if(AXEengine_attr_destroy(&engine_attr) != AXE_SUCCEED)
+ return FAIL;
+
+ /* Initialize Python runtime */
+#ifdef H5_HAVE_PYTHON
+ Py_Initialize();
+#endif
+
+ return ret_value;
+}
+
+herr_t
+EFF_terminate_coresident(void)
+{
+ herr_t ret_value = SUCCEED;
+
+ /* Finalize Python runtime */
+#ifdef H5_HAVE_PYTHON
+ Py_Finalize();
+#endif
+
+ if(AXE_SUCCEED != AXEterminate_engine(engine, TRUE))
+ return FAIL;
+
+ return ret_value;
+}
+
H5VL_AXE_TASK_CB(H5VL_iod_server_analysis_execute, analysis_execute_in_t)
H5VL_AXE_TASK_CB(H5VL_iod_server_file_create, file_create_in_t)
H5VL_AXE_TASK_CB(H5VL_iod_server_file_open, file_open_in_t)
diff --git a/src/H5VLiod_trans.c b/src/H5VLiod_trans.c
index 083d753..c79c60f 100644
--- a/src/H5VLiod_trans.c
+++ b/src/H5VLiod_trans.c
@@ -917,7 +917,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num)
printf("CORRUPTING CS at step %d, ARRAY ID %"PRIx64"\n",
step, oid);
- ret = corrupt_data("eff_vpic", oid, trans_num, 20, cor_data);
+ ret = corrupt_data("mohamad.chaarawi.eff_vpic", oid, trans_num, 20, cor_data);
if(ret < 0) {
fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret));
}
@@ -945,7 +945,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num)
printf("CORRUPTING CS at step %d, BLOB ID %"PRIx64"\n",
step, oid);
- ret = corrupt_data("eff_vpic", oid, trans_num, 5, cor_data);
+ ret = corrupt_data("mohamad.chaarawi.eff_vpic", oid, trans_num, 5, cor_data);
if(ret < 0) {
fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret));
}
@@ -973,7 +973,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num)
printf("CORRUPTING CS at step %d, KV ID %"PRIx64"\n",
step, oid);
- ret = corrupt_kv("eff_vpic", oid, trans_num, step*2+1, cor_data);
+ ret = corrupt_kv("mohamad.chaarawi.eff_vpic", oid, trans_num, step*2+1, cor_data);
if(ret < 0) {
fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret));
}
@@ -1119,7 +1119,7 @@ static void check_daos_corruptions(iod_hint_list_t *chint, iod_trans_id_t trans_
step, oid);
}
- ret = corrupt_kv("eff_vpic", oid, trans_num, step*2+1, cor_data);
+ ret = corrupt_kv("mohamad.chaarawi.eff_vpic", oid, trans_num, step*2+1, cor_data);
if(ret < 0) {
fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret));
}