diff options
author | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2014-05-02 20:33:36 (GMT) |
---|---|---|
committer | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2014-05-02 20:33:36 (GMT) |
commit | 5e172835bbcedfc9897f3dd653c0a49e52c2cd52 (patch) | |
tree | da1da0444213ccc2511e3de0638f3f42657373d1 /src | |
parent | 646bdfd0fbdd6446a817475803cb15aa67fb53fe (diff) | |
download | hdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.zip hdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.tar.gz hdf5-5e172835bbcedfc9897f3dd653c0a49e52c2cd52.tar.bz2 |
[svn-r25150] Add coresident support:
- set the H5ENV_CORESIDENT environment variable to 1 to enable coresident mode
- raw data is still copied with memcpy for now.
Diffstat (limited to 'src')
-rw-r--r-- | src/H5VLiod.c | 127 | ||||
-rw-r--r-- | src/H5VLiod_client.c | 10 | ||||
-rw-r--r-- | src/H5VLiod_common.h | 2 | ||||
-rw-r--r-- | src/H5VLiod_file.c | 6 | ||||
-rw-r--r-- | src/H5VLiod_server.c | 77 | ||||
-rw-r--r-- | src/H5VLiod_trans.c | 8 |
6 files changed, 180 insertions, 50 deletions
diff --git a/src/H5VLiod.c b/src/H5VLiod.c index 4072517..1db9dff 100644 --- a/src/H5VLiod.c +++ b/src/H5VLiod.c @@ -52,7 +52,7 @@ typedef struct H5VL_iod_axe_list_t { static na_addr_t PEER; static na_class_t *network_class = NULL; - +static int coresident = 0; static AXE_task_t g_axe_id; static H5VL_iod_axe_list_t axe_list; @@ -489,7 +489,9 @@ H5VL__iod_create_and_forward(hg_id_t op_id, H5RQ_type_t op_type, hg_status_t status; /* test the operation status */ + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)request->req), HG_MAX_IDLE_TIME, &status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { fprintf(stderr, "failed to wait on request\n"); request->status = H5ES_STATUS_FAIL; @@ -545,6 +547,7 @@ EFF_init(MPI_Comm comm, MPI_Info UNUSED info) na_addr_t ion_target; double axe_seed; char addr_name[H5VL_IOD_MAX_ADDR_NAME]; + char *coresident_s = NULL; herr_t ret_value = SUCCEED; MPI_Comm_size(comm, &num_procs); @@ -560,65 +563,97 @@ EFF_init(MPI_Comm comm, MPI_Info UNUSED info) axe_list.head = NULL; axe_list.tail = NULL; - /* MSC - This is a temporary solution for connecting to the server - using mercury */ - /* Only rank 0 reads file */ - if (my_rank == 0) { - int count, line=0, num_ions; - FILE *config; - char config_addr_name[H5VL_IOD_MAX_ADDR_NAME]; + coresident_s = getenv ("H5ENV_CORESIDENT"); + if(NULL != coresident_s) + coresident = atoi(coresident_s); + - config = fopen("port.cfg", "r"); + if(1 != coresident) { + /* MSC - This is a temporary solution for connecting to the server + using mercury */ + /* Only rank 0 reads file */ + if (my_rank == 0) { + int count, line=0, num_ions; + FILE *config; + char config_addr_name[H5VL_IOD_MAX_ADDR_NAME]; + char cwd[1024]; + + if (getcwd(cwd, sizeof(cwd)) != NULL) + fprintf(stdout, "Reading port.cfg from: %s\n", cwd); + else { + fprintf(stderr, "etcwd() error\n"); + return FAIL; + } + + config = fopen("port.cfg", "r"); + if (config == NULL) { + fprintf(stderr, "could not open port.cfg file."); + return 
FAIL; + } - fscanf(config, "%d\n", &num_ions); + fscanf(config, "%d\n", &num_ions); #if H5_EFF_DEBUG - printf("Found %d servers\n", num_ions); + printf("Found %d servers\n", num_ions); #endif - /* read a line */ - if(fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) { - strncpy(addr_name, config_addr_name, H5VL_IOD_MAX_ADDR_NAME); - count = 1; - while(num_procs > line + (count*num_ions)) { - MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, - line + (count*num_ions), tag, comm); - count ++; + /* read a line */ + if(fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) { + strncpy(addr_name, config_addr_name, H5VL_IOD_MAX_ADDR_NAME); + count = 1; + while(num_procs > line + (count*num_ions)) { + MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, + line + (count*num_ions), tag, comm); + count ++; + } + line++; } - line++; - } - while (fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) { - count = 0; - while(num_procs > line + (count*num_ions)) { - MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, - line + (count*num_ions), tag, comm); - count ++; + while (fgets(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, config) != NULL) { + count = 0; + while(num_procs > line + (count*num_ions)) { + MPI_Send(config_addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, + line + (count*num_ions), tag, comm); + count ++; + } + line ++; } - line ++; + fclose(config); + } + else { + MPI_Recv(addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, + 0, tag, comm, MPI_STATUS_IGNORE); } - fclose(config); - } - else { - MPI_Recv(addr_name, H5VL_IOD_MAX_ADDR_NAME, MPI_BYTE, - 0, tag, comm, MPI_STATUS_IGNORE); - } #if H5_EFF_DEBUG - printf("CN %d Connecting to ION %s\n", my_rank, addr_name); + printf("CN %d Connecting to ION %s\n", my_rank, addr_name); #endif + } /* initialize Mercury stuff */ - network_class = NA_MPI_Init(NULL, 0); - //network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 0); + //network_class = NA_MPI_Init(NULL, 0); + 
network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 0); if (HG_SUCCESS != HG_Init(network_class)) { fprintf(stderr, "Failed to initialize Mercury\n"); return FAIL; } - if (NA_SUCCESS != NA_Addr_lookup_wait(network_class, addr_name, &ion_target)) { - fprintf(stderr, "Server lookup failed\n"); - return FAIL; + + if(coresident == 1) { + ret_value = EFF_setup_coresident(comm, MPI_INFO_NULL); + if(ret_value != SUCCEED) + return FAIL; + + if (NA_SUCCESS != NA_Addr_self(network_class, &ion_target)) { + fprintf(stderr, "Server lookup failed\n"); + return FAIL; + } + } + else { + if (NA_SUCCESS != NA_Addr_lookup_wait(network_class, addr_name, &ion_target)) { + fprintf(stderr, "Server lookup failed\n"); + return FAIL; + } } PEER = ion_target; @@ -685,12 +720,20 @@ EFF_finalize(void) if(HG_Forward(PEER, H5VL_EFF_FINALIZE_ID, &ret_value, &ret_value, &hg_req) < 0) return FAIL; + FUNC_LEAVE_API_THREADSAFE; HG_Wait(hg_req, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE); + FUNC_ENTER_API_THREADSAFE; /* Free Mercury request */ if(HG_Request_free(hg_req) != HG_SUCCESS) return FAIL; + if(coresident == 1) { + ret_value = EFF_terminate_coresident(); + if(ret_value != SUCCEED) + return FAIL; + } + /* Free addr id */ if (NA_SUCCESS != NA_Addr_free(network_class, PEER)) return FAIL; @@ -9712,7 +9755,9 @@ H5VL_iod_cancel(void **req, H5ES_status_t *status) HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "failed to ship attribute write"); /* Wait on the cancel request to return */ + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(hg_req, HG_MAX_IDLE_TIME, &hg_status); + FUNC_ENTER_API_THREADSAFE; /* If the actual wait Fails, then the status of the cancel operation is unknown */ @@ -9787,7 +9832,9 @@ H5VL_iod_test(void **req, H5ES_status_t *status) /* Test completion of the request if is still pending */ if(H5VL_IOD_PENDING == request->state) { + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)request->req), 0, &hg_status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { fprintf(stderr, "failed to wait 
on request\n"); request->status = H5ES_STATUS_FAIL; diff --git a/src/H5VLiod_client.c b/src/H5VLiod_client.c index 0f2d07e..be8ae9a 100644 --- a/src/H5VLiod_client.c +++ b/src/H5VLiod_client.c @@ -222,7 +222,9 @@ H5VL_iod_request_wait(H5VL_iod_file_t *file, H5VL_iod_request_t *request) while(1) { HDassert(request->state == H5VL_IOD_PENDING); /* test the operation status */ + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)request->req), 0, &status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n"); request->status = H5ES_STATUS_FAIL; @@ -249,7 +251,9 @@ H5VL_iod_request_wait(H5VL_iod_file_t *file, H5VL_iod_request_t *request) tmp_req = cur_req->file_next; HDassert(cur_req->state == H5VL_IOD_PENDING); + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)cur_req->req), 0, &tmp_status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n"); cur_req->status = H5ES_STATUS_FAIL; @@ -309,7 +313,9 @@ H5VL_iod_request_wait_all(H5VL_iod_file_t *file) tmp_req = cur_req->file_next; HDassert(cur_req->state == H5VL_IOD_PENDING); + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)cur_req->req), HG_MAX_IDLE_TIME, &status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n"); cur_req->status = H5ES_STATUS_FAIL; @@ -369,8 +375,10 @@ H5VL_iod_request_wait_some(H5VL_iod_file_t *file, const void *object) /* If the request is pending on the object we want, complete it */ if(cur_req->obj == object && cur_req->state == H5VL_IOD_PENDING) { + FUNC_LEAVE_API_THREADSAFE; ret = HG_Wait(*((hg_request_t *)cur_req->req), HG_MAX_IDLE_TIME, &status); + FUNC_ENTER_API_THREADSAFE; if(HG_FAIL == ret) { HERROR(H5E_FUNC, H5E_CANTINIT, "failed to wait on request\n"); cur_req->status = H5ES_STATUS_FAIL; @@ -852,11 +860,13 @@ H5VL_iod_request_complete(H5VL_iod_file_t *file, H5VL_iod_request_t *req) 
if(HG_Forward(info->ion_target, info->read_id, &input, info->status, &hg_req) < 0) HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "failed to ship operation"); + FUNC_LEAVE_API_THREADSAFE; if(HG_FAIL == HG_Wait(hg_req, HG_MAX_IDLE_TIME, &hg_status)) { fprintf(stderr, "failed to wait on request\n"); req->status = H5ES_STATUS_FAIL; req->state = H5VL_IOD_COMPLETED; } + FUNC_ENTER_API_THREADSAFE; if(addrs) { free(addrs); diff --git a/src/H5VLiod_common.h b/src/H5VLiod_common.h index a0c4897..5a421ac 100644 --- a/src/H5VLiod_common.h +++ b/src/H5VLiod_common.h @@ -206,6 +206,8 @@ H5_DLL void H5VL_iod_free_list_free(void **free_list, size_t free_list_len); #ifdef H5_HAVE_EFF +H5_DLL herr_t EFF_setup_coresident(MPI_Comm comm, MPI_Info info); +H5_DLL herr_t EFF_terminate_coresident(void); H5_DLL void EFF__mercury_register_callbacks(void); #ifdef H5_HAVE_INDEXING diff --git a/src/H5VLiod_file.c b/src/H5VLiod_file.c index 5c6f2d7..2349e36 100644 --- a/src/H5VLiod_file.c +++ b/src/H5VLiod_file.c @@ -67,9 +67,9 @@ H5VL_iod_server_file_create_cb(AXE_engine_t UNUSED axe_engine, #if H5_EFF_DEBUG fprintf(stderr, "Start file create %s ", input->name); - fprintf(stderr, "with MDKV %"PRIx64" ", mdkv_id), - fprintf(stderr, "with attrKV %"PRIx64"\n", attrkv_id), - fprintf(stderr, "with OIDKV %"PRIx64"\n", oidkv_id), + fprintf(stderr, "with MDKV %"PRIx64" ", mdkv_id); + fprintf(stderr, "with attrKV %"PRIx64"\n", attrkv_id); + fprintf(stderr, "with OIDKV %"PRIx64"\n", oidkv_id); #endif /* convert HDF5 flags to IOD flags */ diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c index d2c8b78..2dd16c4 100644 --- a/src/H5VLiod_server.c +++ b/src/H5VLiod_server.c @@ -101,7 +101,8 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info) /******************* Initialize mercury ********************/ /* initialize the netwrok class */ - network_class = NA_MPI_Init(NULL, MPI_INIT_SERVER); + //network_class = NA_MPI_Init(NULL, MPI_INIT_SERVER); + network_class = NA_Initialize("tcp@mpi://0.0.0.0:0", 1); 
/* Allocate table addrs */ na_addr_table = (char**) malloc((size_t)num_ions_g * sizeof(char*)); @@ -121,6 +122,15 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info) /* Only rank 0 writes file */ if (my_rank_g == 0) { + char cwd[1024]; + + if (getcwd(cwd, sizeof(cwd)) != NULL) + fprintf(stdout, "Writing port.cfg to: %s\n", cwd); + else { + fprintf(stderr, "etcwd() error\n"); + return FAIL; + } + config = fopen("port.cfg", "w+"); if (config != NULL) { fprintf(config, "%d\n", num_ions_g); @@ -130,8 +140,8 @@ EFF_start_server(MPI_Comm comm, MPI_Info UNUSED info) fclose(config); } else { - fprintf(stderr, "could not open port.cfg file.\n"); - return FAIL; + fprintf(stderr, "could not open port.cfg file.\n"); + return FAIL; } } @@ -345,6 +355,67 @@ done: return ret_value; } /* end H5VL_iod_server_eff_finalize() */ +herr_t +EFF_setup_coresident(MPI_Comm comm, MPI_Info UNUSED info) +{ + AXE_engine_attr_t engine_attr; + herr_t ret_value = SUCCEED; + + MPI_Comm_size(comm, &num_ions_g); + MPI_Comm_rank(comm, &my_rank_g); + + iod_comm = comm; + + /* register server specific callbacks */ + H5VL_EFF_OPEN_CONTAINER = MERCURY_REGISTER("container_open", hg_const_string_t, iod_handle_t, + H5VL_iod_server_container_open); + H5VL_EFF_CLOSE_CONTAINER = MERCURY_REGISTER("container_close", iod_handle_t, ret_t, + H5VL_iod_server_container_close); + H5VL_EFF_ANALYSIS_FARM = MERCURY_REGISTER("analysis_farm", analysis_farm_in_t, + analysis_farm_out_t, H5VL_iod_server_analysis_farm); + H5VL_EFF_ANALYSIS_FARM_TRANSFER = MERCURY_REGISTER("analysis_transfer", analysis_transfer_in_t, + analysis_transfer_out_t, + H5VL_iod_server_analysis_transfer); + + /* Initialize engine attribute */ + if(AXEengine_attr_init(&engine_attr) != AXE_SUCCEED) + return FAIL; + + /* Set number of threads in AXE engine */ + if(AXEset_num_threads(&engine_attr, 16) != AXE_SUCCEED) + return FAIL; + + /* Create AXE engine */ + if(AXEcreate_engine(&engine, &engine_attr) != AXE_SUCCEED) + return FAIL; + + 
if(AXEengine_attr_destroy(&engine_attr) != AXE_SUCCEED) + return FAIL; + + /* Initialize Python runtime */ +#ifdef H5_HAVE_PYTHON + Py_Initialize(); +#endif + + return ret_value; +} + +herr_t +EFF_terminate_coresident(void) +{ + herr_t ret_value = SUCCEED; + + /* Finalize Python runtime */ +#ifdef H5_HAVE_PYTHON + Py_Finalize(); +#endif + + if(AXE_SUCCEED != AXEterminate_engine(engine, TRUE)) + return FAIL; + + return ret_value; +} + H5VL_AXE_TASK_CB(H5VL_iod_server_analysis_execute, analysis_execute_in_t) H5VL_AXE_TASK_CB(H5VL_iod_server_file_create, file_create_in_t) H5VL_AXE_TASK_CB(H5VL_iod_server_file_open, file_open_in_t) diff --git a/src/H5VLiod_trans.c b/src/H5VLiod_trans.c index 083d753..c79c60f 100644 --- a/src/H5VLiod_trans.c +++ b/src/H5VLiod_trans.c @@ -917,7 +917,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num) printf("CORRUPTING CS at step %d, ARRAY ID %"PRIx64"\n", step, oid); - ret = corrupt_data("eff_vpic", oid, trans_num, 20, cor_data); + ret = corrupt_data("mohamad.chaarawi.eff_vpic", oid, trans_num, 20, cor_data); if(ret < 0) { fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret)); } @@ -945,7 +945,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num) printf("CORRUPTING CS at step %d, BLOB ID %"PRIx64"\n", step, oid); - ret = corrupt_data("eff_vpic", oid, trans_num, 5, cor_data); + ret = corrupt_data("mohamad.chaarawi.eff_vpic", oid, trans_num, 5, cor_data); if(ret < 0) { fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret)); } @@ -973,7 +973,7 @@ static void check_ion_corruptions(iod_trans_id_t trans_num) printf("CORRUPTING CS at step %d, KV ID %"PRIx64"\n", step, oid); - ret = corrupt_kv("eff_vpic", oid, trans_num, step*2+1, cor_data); + ret = corrupt_kv("mohamad.chaarawi.eff_vpic", oid, trans_num, step*2+1, cor_data); if(ret < 0) { fprintf(stderr, "cant't corrupt data. 
%d (%s).\n", ret, strerror(-ret)); } @@ -1119,7 +1119,7 @@ static void check_daos_corruptions(iod_hint_list_t *chint, iod_trans_id_t trans_ step, oid); } - ret = corrupt_kv("eff_vpic", oid, trans_num, step*2+1, cor_data); + ret = corrupt_kv("mohamad.chaarawi.eff_vpic", oid, trans_num, step*2+1, cor_data); if(ret < 0) { fprintf(stderr, "cant't corrupt data. %d (%s).\n", ret, strerror(-ret)); } |