diff options
author | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2015-06-21 21:45:01 (GMT) |
---|---|---|
committer | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2015-06-21 21:45:01 (GMT) |
commit | cf07aa6803a70f52f6a178219097e9f9e6ee1a51 (patch) | |
tree | 38bbce437bb596ee74745a550d37a61260610cf4 /src/H5C.c | |
parent | 48e1bbb0f87c6712189f82a7c5d8835a87d21ee2 (diff) | |
download | hdf5-cf07aa6803a70f52f6a178219097e9f9e6ee1a51.zip hdf5-cf07aa6803a70f52f6a178219097e9f9e6ee1a51.tar.gz hdf5-cf07aa6803a70f52f6a178219097e9f9e6ee1a51.tar.bz2 |
[svn-r27261] apply metadata enhancement patch that was based on the mdc 3 branch.
Diffstat (limited to 'src/H5C.c')
-rw-r--r-- | src/H5C.c | 975 |
1 files changed, 897 insertions, 78 deletions
@@ -109,6 +109,19 @@ H5FL_DEFINE_STATIC(H5C_t); +#ifdef H5_HAVE_PARALLEL +typedef struct H5C_collective_write_t { + size_t length; + hbool_t free_buf; + void *buf; + haddr_t offset; +} H5C_collective_write_t; +static herr_t H5C_collective_write(H5F_t *f, + hid_t dxpl_id, + H5SL_t *collective_write_list); +static herr_t H5C_collective_write_free(void *_item, void *key, void *op_data); +#endif /* H5_HAVE_PARALLEL */ + /* * Private file-scope function declarations: */ @@ -144,8 +157,9 @@ static herr_t H5C_flush_single_entry(const H5F_t * f, hid_t dxpl_id, haddr_t addr, unsigned flags, - hbool_t del_entry_from_slist_on_destroy, - int64_t *entry_size_change_ptr); + hbool_t del_entry_from_slist_on_destroy, + int64_t *entry_size_change_ptr, + H5SL_t * collective_write_list); static herr_t H5C_flush_invalidate_cache(const H5F_t * f, hid_t dxpl_id, @@ -153,6 +167,9 @@ static herr_t H5C_flush_invalidate_cache(const H5F_t * f, static void * H5C_load_entry(H5F_t * f, hid_t dxpl_id, +#ifdef H5_HAVE_PARALLEL + hbool_t coll_access, +#endif /* H5_HAVE_PARALLEL */ const H5C_class_t * type, haddr_t addr, void * udata); @@ -178,6 +195,9 @@ static herr_t H5C_flush_marked_entries(H5F_t * f, hid_t dxpl_id, H5C_t * cache_ptr); +static herr_t H5C__generate_image(H5F_t *f, H5C_t * cache_ptr, H5C_cache_entry_t *entry_ptr, + hid_t dxpl_id, int64_t *entry_size_change_ptr); + #if H5C_DO_TAGGING_SANITY_CHECKS static herr_t H5C_verify_tag(int id, haddr_t tag); #endif @@ -529,6 +549,7 @@ H5C_apply_candidate_list(H5F_t * f, H5C_cache_entry_t * entry_ptr = NULL; H5C_cache_entry_t * flush_ptr = NULL; H5C_cache_entry_t * delayed_ptr = NULL; + H5SL_t * collective_write_list = NULL; #if H5C_DO_SANITY_CHECKS haddr_t last_addr; #endif /* H5C_DO_SANITY_CHECKS */ @@ -561,6 +582,10 @@ H5C_apply_candidate_list(H5F_t * f, HDfprintf(stdout, "%s", tbl_buf); #endif /* H5C_APPLY_CANDIDATE_LIST__DEBUG */ + /* Create skip list of entries for collective write */ + if(NULL == (collective_write_list = H5SL_create(H5SL_TYPE_HADDR, NULL))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCREATE, FAIL, "can't create skip list for entries") + n = num_candidates / mpi_size; m = num_candidates % mpi_size; HDassert(n >= 0); @@ -665,6 +690,17 @@ H5C_apply_candidate_list(H5F_t * f, entries_to_clear++; entry_ptr->clear_on_unprotect = TRUE; } /* end else */ + + /* Entries marked as collectively accessed and are in the + candidate list to clear from the cache have to be + removed from the coll list. This is OK since the + candidate list is collective and uniform across all + ranks. */ + if(TRUE == entry_ptr->coll_access) { + entry_ptr->coll_access = FALSE; + cache_ptr->num_coll_entries --; + H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL) + } } /* end else */ } /* end for */ @@ -746,6 +782,7 @@ H5C_apply_candidate_list(H5F_t * f, clear_ptr->addr, H5C__FLUSH_CLEAR_ONLY_FLAG, TRUE, + NULL, NULL) < 0) HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't clear entry.") } /* end if */ @@ -796,7 +833,8 @@ H5C_apply_candidate_list(H5F_t * f, flush_ptr->addr, H5C__NO_FLAGS_SET, TRUE, - NULL) < 0) + NULL, + collective_write_list) < 0) HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't flush entry.") if ( ( cache_ptr->entries_removed_counter > 1 ) || @@ -956,11 +994,12 @@ H5C_apply_candidate_list(H5F_t * f, #endif /* H5C_APPLY_CANDIDATE_LIST__DEBUG */ if(H5C_flush_single_entry(f, - dxpl_id, - clear_ptr->addr, - H5C__FLUSH_CLEAR_ONLY_FLAG, - TRUE, - NULL) < 0) + dxpl_id, + clear_ptr->addr, + H5C__FLUSH_CLEAR_ONLY_FLAG, + TRUE, + NULL, + NULL) < 0) HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't clear entry.") } /* end else-if */ @@ -977,11 +1016,12 @@ H5C_apply_candidate_list(H5F_t * f, #endif /* H5C_APPLY_CANDIDATE_LIST__DEBUG */ if(H5C_flush_single_entry(f, - dxpl_id, - flush_ptr->addr, - H5C__NO_FLAGS_SET, - TRUE, - NULL) < 0) + dxpl_id, + flush_ptr->addr, + H5C__NO_FLAGS_SET, + TRUE, + NULL, + collective_write_list) < 0) HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't clear entry.") } /* end else-if */ } /* end if */ @@ -1015,26 +1055,41 @@ H5C_apply_candidate_list(H5F_t * f, if (delayed_ptr) { if (delayed_ptr->clear_on_unprotect) { + if(H5C_flush_single_entry(f, + dxpl_id, + delayed_ptr->addr, + H5C__FLUSH_CLEAR_ONLY_FLAG, + TRUE, + NULL, + NULL) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, + "Can't flush entry collectively.") entry_ptr->clear_on_unprotect = FALSE; entries_cleared++; } else if (delayed_ptr->flush_immediately) { + if(H5C_flush_single_entry(f, + dxpl_id, + delayed_ptr->addr, + H5C__NO_FLAGS_SET, + TRUE, + NULL, + collective_write_list) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, + "Can't flush entry collectively.") entry_ptr->flush_immediately = FALSE; entries_flushed++; } /* end if */ - if(H5C_flush_single_entry(f, - dxpl_id, - delayed_ptr->addr, - H5C__NO_FLAGS_SET, - TRUE, - NULL) < 0) - HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, - "Can't flush entry collectively.") - entries_flushed_collectively++; entries_flushed_or_cleared_last++; } /* end if */ + /* Write collective list */ + if(H5C_collective_write(f, + dxpl_id, + collective_write_list) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't write metadata collectively") + /* ====================================================================== * * Finished flushing everything. * * ====================================================================== */ @@ -1054,6 +1109,10 @@ done: if(candidate_assignment_table != NULL) candidate_assignment_table = (int *)H5MM_xfree((void *)candidate_assignment_table); + if(collective_write_list) + if(H5SL_destroy(collective_write_list, H5C_collective_write_free, NULL) < 0) + HDONE_ERROR(H5E_CACHE, H5E_CANTFREE, FAIL, "failed to destroy skip list") + FUNC_LEAVE_NOAPI(ret_value) } /* H5C_apply_candidate_list() */ #endif /* H5_HAVE_PARALLEL */ @@ -1396,6 +1455,14 @@ H5C_create(size_t max_cache_size, cache_ptr->LRU_head_ptr = NULL; cache_ptr->LRU_tail_ptr = NULL; +#ifdef H5_HAVE_PARALLEL + cache_ptr->coll_list_len = 0; + cache_ptr->coll_list_size = (size_t)0; + cache_ptr->coll_head_ptr = NULL; + cache_ptr->coll_tail_ptr = NULL; + cache_ptr->num_coll_entries = 0; +#endif /* H5_HAVE_PARALLEL */ + cache_ptr->cLRU_list_len = 0; cache_ptr->cLRU_list_size = (size_t)0; cache_ptr->cLRU_head_ptr = NULL; @@ -1831,7 +1898,7 @@ H5C_expunge_entry(H5F_t *f, hid_t dxpl_id, const H5C_class_t *type, entry_size = entry_ptr->size; #endif /* H5C_DO_EXTREME_SANITY_CHECKS */ - if(H5C_flush_single_entry(f, dxpl_id, entry_ptr->addr, flush_flags, TRUE, NULL) < 0) + if(H5C_flush_single_entry(f, dxpl_id, entry_ptr->addr, flush_flags, TRUE, NULL, NULL) < 0) HGOTO_ERROR(H5E_CACHE, H5E_CANTEXPUNGE, FAIL, "H5C_flush_single_entry() failed.") #if H5C_DO_SANITY_CHECKS @@ -2157,11 +2224,12 @@ H5C_flush_cache(H5F_t *f, hid_t dxpl_id, unsigned flags) entry_size_change = 0; #endif /* H5C_DO_SANITY_CHECKS */ status = H5C_flush_single_entry(f, - dxpl_id, - entry_ptr->addr, - flags, - FALSE, - entry_size_change_ptr); + dxpl_id, + entry_ptr->addr, + flags, + FALSE, + entry_size_change_ptr, + NULL); if ( status < 0 ) { /* This shouldn't happen -- if it does, @@ -2217,11 +2285,12 @@ H5C_flush_cache(H5F_t *f, hid_t dxpl_id, unsigned flags) entry_size_change = 0; #endif /* H5C_DO_SANITY_CHECKS */ status = H5C_flush_single_entry(f, - dxpl_id, - entry_ptr->addr, - flags, - FALSE, - entry_size_change_ptr); + dxpl_id, + entry_ptr->addr, + flags, + FALSE, + entry_size_change_ptr, + NULL); if ( status < 0 ) { /* This shouldn't happen -- if it does, @@ -2915,7 +2984,9 @@ H5C_insert_entry(H5F_t * f, hbool_t insert_pinned; hbool_t flush_last; #ifdef H5_HAVE_PARALLEL + hbool_t coll_access = FALSE; /* whether access to the cache entry is done collectively */ hbool_t flush_collectively; + H5P_genplist_t * dxpl; /* dataset transfer property list */ #endif /* H5_HAVE_PARALLEL */ hbool_t set_flush_marker; hbool_t write_permitted = TRUE; @@ -3050,6 +3121,9 @@ H5C_insert_entry(H5F_t * f, entry_ptr->aux_next = NULL; entry_ptr->aux_prev = NULL; + entry_ptr->coll_next = NULL; + entry_ptr->coll_prev = NULL; + H5C__RESET_CACHE_ENTRY_STATS(entry_ptr) if ( ( cache_ptr->flash_size_increase_possible ) && @@ -3180,6 +3254,50 @@ H5C_insert_entry(H5F_t * f, H5C__UPDATE_STATS_FOR_INSERTION(cache_ptr, entry_ptr) +#ifdef H5_HAVE_PARALLEL + /* Get the dataset transfer property list */ + if(NULL == (dxpl = (H5P_genplist_t *)H5I_object(dxpl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a property list"); + + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) { + coll_access = (H5P_USER_TRUE == f->coll_md_read ? TRUE : FALSE); + + if(!coll_access && H5P_FORCE_FALSE != f->coll_md_read) { + H5P_coll_md_read_flag_t prop_value; + + /* get the property value */ + if(H5P_get(dxpl, H5_COLL_MD_READ_FLAG_NAME, &prop_value) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "Can't get collective metadata access flag") + + coll_access = (H5P_USER_TRUE == prop_value ? TRUE : FALSE); + } + //fprintf(stderr, "COLLACCESS = %d.. FILE property = %d\n", coll_access, f->coll_md_read); + } + + entry_ptr->coll_access = coll_access; + if(coll_access) { + //fprintf(stderr, "NEW (%llu, %s)\n", addr, entry_ptr->type->name); + cache_ptr->num_coll_entries ++; + H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, FAIL) + + /* Make sure the size of the collective entries in the cache remain in check */ + if(H5P_USER_TRUE == f->coll_md_read) { + if(cache_ptr->max_cache_size*0.8 < cache_ptr->coll_list_size) { + if(H5C_clear_coll_entries(f, dxpl_id, cache_ptr, 1) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, FAIL, "H5C_clear_coll_entries() failed.") + } + } + else { + if(cache_ptr->max_cache_size*0.4 < cache_ptr->coll_list_size) { + if(H5C_clear_coll_entries(f, dxpl_id, cache_ptr, 1) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, FAIL, "H5C_clear_coll_entries() failed.") + } + } + + } + entry_ptr->ind_access_while_coll = FALSE; +#endif + done: #if H5C_DO_EXTREME_SANITY_CHECKS @@ -3359,6 +3477,12 @@ H5C_mark_entries_as_clean(H5F_t * f, * scan the LRU list shortly, and clear all those entries * not currently protected. */ + if(TRUE == entry_ptr->coll_access) { + entry_ptr->coll_access = FALSE; + cache_ptr->num_coll_entries --; + H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL) + } + entry_ptr->clear_on_unprotect = TRUE; #if H5C_DO_SANITY_CHECKS if ( entry_ptr->is_protected ) { @@ -3424,6 +3548,7 @@ H5C_mark_entries_as_clean(H5F_t * f, clear_ptr->addr, H5C__FLUSH_CLEAR_ONLY_FLAG, TRUE, + NULL, NULL) < 0 ) { HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't clear entry.") @@ -3459,6 +3584,7 @@ H5C_mark_entries_as_clean(H5F_t * f, clear_ptr->addr, H5C__FLUSH_CLEAR_ONLY_FLAG, TRUE, + NULL, NULL) < 0 ) { HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, "Can't clear entry.") @@ -3853,6 +3979,14 @@ H5C_resize_entry(void *thing, size_t new_size) (entry_ptr->size), (new_size)) } /* end if */ +#ifdef H5_HAVE_PARALLEL + if(entry_ptr->coll_access) { + H5C__DLL_UPDATE_FOR_SIZE_CHANGE((cache_ptr->coll_list_len), \ + (cache_ptr->coll_list_size), \ + (entry_ptr->size), (new_size)) + } /* end if */ +#endif /* H5_HAVE_PARALLEL */ + /* update the hash table */ H5C__UPDATE_INDEX_FOR_SIZE_CHANGE((cache_ptr), (entry_ptr->size),\ (new_size), (entry_ptr), (was_clean)); @@ -4080,14 +4214,15 @@ H5C_protect(H5F_t * f, hbool_t flush_last; #ifdef H5_HAVE_PARALLEL hbool_t flush_collectively; + hbool_t coll_access = FALSE; /* whether access to the cache entry is done collectively */ #endif /* H5_HAVE_PARALLEL */ hbool_t write_permitted; herr_t result; size_t empty_space; void * thing; H5C_cache_entry_t * entry_ptr; - haddr_t tag = HADDR_UNDEF; - H5P_genplist_t *dxpl; /* dataset transfer property list */ + haddr_t tag = HADDR_UNDEF; + H5P_genplist_t * dxpl; /* dataset transfer property list */ void * ret_value; /* Return value */ FUNC_ENTER_NOAPI(NULL) @@ -4108,7 +4243,7 @@ H5C_protect(H5F_t * f, ( H5C_validate_pinned_entry_list(cache_ptr) < 0 ) || ( H5C_validate_lru_list(cache_ptr) < 0 ) ) { - HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, \ + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, NULL, \ "an extreme sanity check failed on entry.\n"); } #endif /* H5C_DO_EXTREME_SANITY_CHECKS */ @@ -4119,15 +4254,101 @@ H5C_protect(H5F_t * f, flush_collectively = ( (flags & H5C__FLUSH_COLLECTIVELY_FLAG) != 0 ); #endif /* H5_HAVE_PARALLEL */ + /* Get the dataset transfer property list */ + if(NULL == (dxpl = (H5P_genplist_t *)H5I_object(dxpl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a property list"); + +#ifdef H5_HAVE_PARALLEL + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) { + coll_access = (H5P_USER_TRUE == f->coll_md_read ? TRUE : FALSE); + + if(!coll_access && H5P_FORCE_FALSE != f->coll_md_read) { + H5P_coll_md_read_flag_t prop_value; + + /* get the property value */ + if(H5P_get(dxpl, H5_COLL_MD_READ_FLAG_NAME, &prop_value) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "Can't get collective metadata access flag") + + coll_access = (H5P_USER_TRUE == prop_value ? TRUE : FALSE); + } + //fprintf(stderr, "COLLACCESS = %d.. FILE property = %d\n", coll_access, f->coll_md_read); + } +#endif /* H5_HAVE_PARALLEL */ + /* first check to see if the target is in cache */ H5C__SEARCH_INDEX(cache_ptr, addr, entry_ptr, NULL) if ( entry_ptr != NULL ) { - /* Check for trying to load the wrong type of entry from an address */ if(entry_ptr->type != type) HGOTO_ERROR(H5E_CACHE, H5E_BADTYPE, NULL, "incorrect cache entry type") + /* if this is a collective metadata read, the entry is not + marked as collective, and is clean, it is possible that + other processes will not have it in its cache and will + expect a bcast of the entry from process 0. So process 0 + will bcast the entry to all other ranks. Ranks that do have + the entry in their cache still have to participate in the + bcast. */ +#ifdef H5_HAVE_PARALLEL + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI) && coll_access && + !(entry_ptr->is_dirty) && !(entry_ptr->coll_access)) { + MPI_Comm comm; /* File MPI Communicator */ + int mpi_code; /* MPI error code */ + int buf_size; + + if((comm = H5F_mpi_get_comm(f)) == MPI_COMM_NULL) + HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "get_comm request failed") + + if(entry_ptr->image_ptr == NULL) { + int mpi_rank; + size_t image_size; + + if((mpi_rank = H5F_mpi_get_rank(f)) < 0) + HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "Can't get MPI rank") + + if ( entry_ptr->compressed ) + image_size = entry_ptr->compressed_size; + else + image_size = entry_ptr->size; + + HDassert(image_size > 0); + + entry_ptr->image_ptr = H5MM_malloc(image_size + H5C_IMAGE_EXTRA_SPACE); + + if (NULL == entry_ptr->image_ptr) { + HGOTO_ERROR(H5E_CACHE, H5E_CANTALLOC, NULL, \ + "memory allocation failed for on disk image buffer") + } +#if H5C_DO_MEMORY_SANITY_CHECKS + HDmemcpy(((uint8_t *)entry_ptr->image_ptr) + image_size, + H5C_IMAGE_SANITY_VALUE, H5C_IMAGE_EXTRA_SPACE); +#endif /* H5C_DO_MEMORY_SANITY_CHECKS */ + if(0 == mpi_rank) { + if(H5C__generate_image(f, cache_ptr, entry_ptr, dxpl_id, NULL) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "Can't get Image") + } + } + + HDassert(entry_ptr->image_ptr); + + H5_CHECKED_ASSIGN(buf_size, int, entry_ptr->size, size_t); + if(MPI_SUCCESS != (mpi_code = MPI_Bcast(entry_ptr->image_ptr, buf_size, + MPI_BYTE, 0, comm))) + HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code) + + entry_ptr->coll_access = TRUE; + cache_ptr->num_coll_entries ++; + + //fprintf(stderr, "ONLY INSERT (%llu, %s)\n", addr, entry_ptr->type->name); + H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, NULL) + } + else if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI) && coll_access && entry_ptr->coll_access) { + //fprintf(stderr, "UPDATE (%llu, %s)\n", addr, entry_ptr->type->name); + H5C__MOVE_TO_TOP_IN_COLL_LIST(cache_ptr, entry_ptr, NULL) + } +#endif /* H5_HAVE_PARALLEL */ + #if H5C_DO_TAGGING_SANITY_CHECKS /* The entry is already in the cache, but make sure that the tag value being passed in via dxpl is still legal. This will ensure that had @@ -4135,10 +4356,6 @@ H5C_protect(H5F_t * f, and it would have received a legal tag value after getting loaded from disk. */ - /* Get the dataset transfer property list */ - if(NULL == (dxpl = (H5P_genplist_t *)H5I_object_verify(dxpl_id, H5I_GENPROP_LST))) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a property list"); - /* Get the tag from the DXPL */ if( (H5P_get(dxpl, "H5AC_metadata_tag", &tag)) < 0 ) HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "unable to query property value"); @@ -4162,7 +4379,11 @@ H5C_protect(H5F_t * f, hit = FALSE; - thing = H5C_load_entry(f, dxpl_id, type, addr, udata); + thing = H5C_load_entry(f, dxpl_id, +#ifdef H5_HAVE_PARALLEL + coll_access, +#endif /* H5_HAVE_PARALLEL */ + type, addr, udata); if ( thing == NULL ) { @@ -4171,6 +4392,17 @@ H5C_protect(H5F_t * f, entry_ptr = (H5C_cache_entry_t *)thing; +#ifdef H5_HAVE_PARALLEL + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) { + if(entry_ptr->coll_access) { + cache_ptr->num_coll_entries ++; + //fprintf(stderr, "LOAD and INSERT (%llu, %s)\n", addr, entry_ptr->type->name); + H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, NULL) + } + } + //fprintf(stderr, "LOADED (%llu, %s) - %d\n", addr, entry_ptr->type->name, coll_access); +#endif /* H5_HAVE_PARALLEL */ + /* Apply tag to newly protected entry */ if(H5C_tag_entry(cache_ptr, entry_ptr, dxpl_id) < 0) HGOTO_ERROR(H5E_CACHE, H5E_CANTTAG, NULL, "Cannot tag metadata entry") @@ -4453,6 +4685,29 @@ H5C_protect(H5F_t * f, } } +#ifdef H5_HAVE_PARALLEL + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) { + /* Make sure the size of the collective entries in the cache remain in check */ + if( TRUE == coll_access) { + if(H5P_USER_TRUE == f->coll_md_read) { + //fprintf(stderr, "COLL entries size = %zu, MAX = %zu\n", cache_ptr->coll_list_size, cache_ptr->max_cache_size); + if(cache_ptr->max_cache_size*0.8 < cache_ptr->coll_list_size) { + //fprintf(stderr, "COLL entries at 80.. CLEARING\n"); + if(H5C_clear_coll_entries(f, dxpl_id, cache_ptr, 1) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "H5C_clear_coll_entries() failed.") + } + } + else { + if(cache_ptr->max_cache_size*0.4 < cache_ptr->coll_list_size) { + //fprintf(stderr, "COLL entries at 40.. CLEARING\n"); + if(H5C_clear_coll_entries(f, dxpl_id, cache_ptr, 1) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "H5C_clear_coll_entries() failed.") + } + } + } + } +#endif /* H5_HAVE_PARALLEL */ + done: #if H5C_DO_EXTREME_SANITY_CHECKS @@ -4460,7 +4715,7 @@ done: ( H5C_validate_pinned_entry_list(cache_ptr) < 0 ) || ( H5C_validate_lru_list(cache_ptr) < 0 ) ) { - HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, \ + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, NULL, \ "an extreme sanity check failed on exit.\n"); } #endif /* H5C_DO_EXTREME_SANITY_CHECKS */ @@ -6138,6 +6393,7 @@ H5C_unprotect(H5F_t * f, addr, flush_flags, TRUE, + NULL, NULL) < 0 ) { HGOTO_ERROR(H5E_CACHE, H5E_CANTUNPROTECT, FAIL, "Can't flush.") @@ -6179,6 +6435,7 @@ H5C_unprotect(H5F_t * f, addr, H5C__FLUSH_CLEAR_ONLY_FLAG, TRUE, + NULL, NULL) < 0 ) { HGOTO_ERROR(H5E_CACHE, H5E_CANTUNPROTECT, FAIL, "Can't clear.") @@ -7518,6 +7775,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f, entry_ptr->addr, H5C__NO_FLAGS_SET, FALSE, + NULL, NULL); if ( ( cache_ptr->entries_removed_counter > 1 ) || @@ -7534,6 +7792,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f, entry_ptr->addr, H5C__FLUSH_INVALIDATE_FLAG, TRUE, + NULL, NULL); } @@ -7625,6 +7884,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f, entry_ptr->addr, H5C__FLUSH_INVALIDATE_FLAG, TRUE, + NULL, NULL); if ( result < 0 ) { @@ -8404,7 +8664,8 @@ H5C_flush_invalidate_cache(const H5F_t * f, entry_ptr->addr, H5C__NO_FLAGS_SET, FALSE, - entry_size_change_ptr); + entry_size_change_ptr, + NULL); if ( status < 0 ) { /* This shouldn't happen -- if it does, we @@ -8462,11 +8723,12 @@ H5C_flush_invalidate_cache(const H5F_t * f, #endif /* H5C_DO_SANITY_CHECKS */ status = H5C_flush_single_entry(f, - dxpl_id, - entry_ptr->addr, - (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG), - TRUE, - entry_size_change_ptr); + dxpl_id, + entry_ptr->addr, + (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG), + TRUE, + entry_size_change_ptr, + NULL); if ( status < 0 ) { /* This shouldn't happen -- if it does, we @@ -8604,11 +8866,12 @@ H5C_flush_invalidate_cache(const H5F_t * f, entry_was_dirty = entry_ptr->is_dirty; status = H5C_flush_single_entry(f, - dxpl_id, - entry_ptr->addr, - (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG), - TRUE, - NULL); + dxpl_id, + entry_ptr->addr, + (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG), + TRUE, + NULL, + NULL); if ( status < 0 ) { @@ -8820,7 +9083,8 @@ H5C_flush_single_entry(const H5F_t * f, haddr_t addr, unsigned flags, hbool_t del_entry_from_slist_on_destroy, - int64_t * entry_size_change_ptr) + int64_t * entry_size_change_ptr, + H5SL_t * collective_write_list) { H5C_t * cache_ptr = f->shared->cache; hbool_t destroy; /* external flag */ @@ -8866,6 +9130,8 @@ H5C_flush_single_entry(const H5F_t * f, /* attempt to find the target entry in the hash table */ H5C__SEARCH_INDEX(cache_ptr, addr, entry_ptr, FAIL) + HDassert(FALSE == entry_ptr->coll_access); + /* we will write the entry to disk if it exists, is dirty, and if the * clear only flag is not set. */ @@ -8948,7 +9214,7 @@ H5C_flush_single_entry(const H5F_t * f, unsigned coll_meta; /* Collective metadata write flag */ /* Get the dataset transfer property list */ - if(NULL == (dxpl = H5I_object(dxpl_id))) + if(NULL == (dxpl = (H5P_genplist_t *)H5I_object(dxpl_id))) HGOTO_ERROR(H5E_CACHE, H5E_BADTYPE, FAIL, "not a dataset transfer property list") /* Get the collective metadata write property */ @@ -9301,6 +9567,25 @@ H5C_flush_single_entry(const H5F_t * f, else image_size = entry_ptr->size; +#ifdef H5_HAVE_PARALLEL + if(collective_write_list) { + H5C_collective_write_t *item = NULL; + + if(NULL == (item = (H5C_collective_write_t *)H5MM_malloc(sizeof(H5C_collective_write_t)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate skip list item") + + item->length = image_size; + item->free_buf = FALSE; + item->buf = entry_ptr->image_ptr; + item->offset = entry_ptr->addr; + + if(H5SL_insert(collective_write_list, item, &item->offset) < 0) { + H5MM_free(item); + HGOTO_ERROR(H5E_HEAP, H5E_CANTINSERT, FAIL, "unable to insert skip list item") + } /* end if */ + } /* end if */ + else +#endif /* H5_HAVE_PARALLEL */ if ( ( H5F_block_write(f, entry_ptr->type->mem_type, entry_ptr->addr, image_size, dxpl_id, @@ -9644,6 +9929,9 @@ done: static void * H5C_load_entry(H5F_t * f, hid_t dxpl_id, +#ifdef H5_HAVE_PARALLEL + hbool_t coll_access, +#endif /* H5_HAVE_PARALLEL */ const H5C_class_t * type, haddr_t addr, void * udata) @@ -9662,6 +9950,11 @@ H5C_load_entry(H5F_t * f, H5C_cache_entry_t * entry; /* Alias for thing loaded, as cache entry */ size_t len; /* Size of image in file */ unsigned u; /* Local index variable */ +#ifdef H5_HAVE_PARALLEL + int mpi_rank; /* MPI process rank */ + MPI_Comm comm; /* File MPI Communicator */ + int mpi_code; /* MPI error code */ +#endif /* H5_HAVE_PARALLEL */ void * ret_value; /* Return value */ FUNC_ENTER_NOAPI_NOINIT @@ -9808,11 +10101,39 @@ H5C_load_entry(H5F_t * f, HDmemcpy(((uint8_t *)image) + len, H5C_IMAGE_SANITY_VALUE, H5C_IMAGE_EXTRA_SPACE); #endif /* H5C_DO_MEMORY_SANITY_CHECKS */ +#ifdef H5_HAVE_PARALLEL + if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) { + if((mpi_rank = H5F_mpi_get_rank(f)) < 0) + HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "Can't get MPI rank") + if((comm = H5F_mpi_get_comm(f)) == MPI_COMM_NULL) + HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "get_comm request failed") + } +#endif /* H5_HAVE_PARALLEL */ + /* Get the on-disk entry image */ - if ( 0 == (type->flags & H5C__CLASS_SKIP_READS) ) + if ( 0 == (type->flags & H5C__CLASS_SKIP_READS) ) { +#ifdef H5_HAVE_PARALLEL + if(!coll_access || 0 == mpi_rank) { +#endif /* H5_HAVE_PARALLEL */ + if(H5F_block_read(f, type->mem_type, addr, len, dxpl_id, image) < 0) HGOTO_ERROR(H5E_CACHE, H5E_READERROR, NULL, "Can't read image*") +#ifdef H5_HAVE_PARALLEL + } + /* if the collective metadata read optimization is turned on, + bcast the metadata read from process 0 to all ranks in the file + communicator */ + if(coll_access) { + int buf_size; + + H5_CHECKED_ASSIGN(buf_size, int, len, size_t); + if(MPI_SUCCESS != (mpi_code = MPI_Bcast(image, buf_size, MPI_BYTE, 0, comm))) + HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code) + } +#endif /* H5_HAVE_PARALLEL */ + } + /* Deserialize the on-disk image into the native memory form */ if(NULL == (thing = type->deserialize(image, len, udata, &dirty))) HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't deserialize image") @@ -9920,21 +10241,32 @@ H5C_load_entry(H5F_t * f, HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, NULL, \ "free_icr callback failed") +#ifdef H5_HAVE_PARALLEL + if(!coll_access || 0 == mpi_rank) { +#endif /* H5_HAVE_PARALLEL */ /* Go get the on-disk image again */ if(H5F_block_read(f, type->mem_type, addr, new_len, dxpl_id, image) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't read image") - HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, \ - "Can't read image") +#ifdef H5_HAVE_PARALLEL + } + /* if the collective metadata read optimization is turned on, + bcast the metadata read from process 0 to all ranks in the file + communicator */ + if(coll_access) { + int buf_size; + + H5_CHECKED_ASSIGN(buf_size, int, new_len, size_t); + if(MPI_SUCCESS != (mpi_code = MPI_Bcast(image, buf_size, MPI_BYTE, 0, comm))) + HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code) + } +#endif /* H5_HAVE_PARALLEL */ - /* Deserialize on-disk image into native memory - * form again - */ + /* Deserialize on-disk image into native memory form again */ if(NULL == (thing = type->deserialize(image, new_len, udata, &dirty))) - - HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, \ - "Can't deserialize image") + HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't deserialize image") #ifndef NDEBUG { @@ -10019,6 +10351,8 @@ H5C_load_entry(H5F_t * f, #ifdef H5_HAVE_PARALLEL entry->clear_on_unprotect = FALSE; entry->flush_immediately = FALSE; + entry->coll_access = coll_access; + entry->ind_access_while_coll = FALSE; #endif /* H5_HAVE_PARALLEL */ entry->flush_in_progress = FALSE; entry->destroy_in_progress = FALSE; @@ -10037,6 +10371,9 @@ H5C_load_entry(H5F_t * f, entry->aux_next = NULL; entry->aux_prev = NULL; + entry->coll_next = NULL; + entry->coll_prev = NULL; + H5C__RESET_CACHE_ENTRY_STATS(entry); ret_value = thing; @@ -10230,32 +10567,48 @@ H5C_make_space_in_cache(H5F_t * f, cache_ptr->entries_removed_counter = 0; cache_ptr->last_entry_removed_ptr = NULL; + if(TRUE == entry_ptr->coll_access) { + entry_ptr->coll_access = FALSE; + cache_ptr->num_coll_entries --; + H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL) + } + result = H5C_flush_single_entry(f, dxpl_id, entry_ptr->addr, H5C__NO_FLAGS_SET, FALSE, + NULL, NULL); - if ( ( cache_ptr->entries_removed_counter > 1 ) || ( cache_ptr->last_entry_removed_ptr == prev_ptr ) ) restart_scan = TRUE; - } else if ( (cache_ptr->index_size + space_needed) - > - cache_ptr->max_cache_size ) { + } else if ( (cache_ptr->index_size + space_needed) > cache_ptr->max_cache_size +#ifdef H5_HAVE_PARALLEL + && !(entry_ptr->coll_access) +#endif /* H5_HAVE_PARALLEL */ + ) { #if H5C_COLLECT_CACHE_STATS cache_ptr->entries_scanned_to_make_space++; #endif /* H5C_COLLECT_CACHE_STATS */ + if(TRUE == entry_ptr->coll_access) { + entry_ptr->coll_access = FALSE; + cache_ptr->num_coll_entries --; + H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL) + } + result = H5C_flush_single_entry(f, dxpl_id, entry_ptr->addr, H5C__FLUSH_INVALIDATE_FLAG, TRUE, + NULL, NULL); + } else { /* We have enough space so don't flush clean entry. @@ -10404,18 +10757,25 @@ H5C_make_space_in_cache(H5F_t * f, prev_ptr = entry_ptr->aux_prev; - result = H5C_flush_single_entry(f, - dxpl_id, - entry_ptr->addr, - H5C__FLUSH_INVALIDATE_FLAG, - TRUE, - NULL); +#ifdef H5_HAVE_PARALLEL + if(!(entry_ptr->coll_access)) { +#endif /* H5_HAVE_PARALLEL */ + result = H5C_flush_single_entry(f, + dxpl_id, + entry_ptr->addr, + H5C__FLUSH_INVALIDATE_FLAG, + TRUE, + NULL, + NULL); - if ( result < 0 ) { + if ( result < 0 ) { - HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ - "unable to flush entry") + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ + "unable to flush entry") + } +#ifdef H5_HAVE_PARALLEL } +#endif /* H5_HAVE_PARALLEL */ /* we are scanning the clean LRU, so the serialize function * will not be called on any entry -- thus there is no @@ -11305,6 +11665,222 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* H5C_flush_marked_entries */ +#ifdef H5_HAVE_PARALLEL + +/*------------------------------------------------------------------------- + * + * Function: H5C_clear_coll_entries + * + * Purpose: + * + * Return: FAIL if error is detected, SUCCEED otherwise. + * + * Programmer: Mohamad Chaarawi + * April, 2015 + * + *------------------------------------------------------------------------- + */ +herr_t +H5C_clear_coll_entries(H5F_t H5_ATTR_UNUSED *f, hid_t H5_ATTR_UNUSED dxpl_id, H5C_t * cache_ptr, + hbool_t partial) +{ + int32_t list_len, coll_entries_cleared = 0, coll_dirty_entries=0; + H5C_cache_entry_t * entry_ptr = NULL; + H5C_cache_entry_t * prev_ptr; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI_NOINIT + + //fprintf(stderr, "List len = %d, NUM coll entries = %d\n", cache_ptr->coll_list_len, cache_ptr->num_coll_entries); + entry_ptr = cache_ptr->coll_tail_ptr; + list_len = cache_ptr->coll_list_len; + while(entry_ptr && (coll_entries_cleared < (partial ? list_len/2 : list_len))) { + prev_ptr = entry_ptr->coll_prev; + + HDassert(entry_ptr->coll_access); + + if(entry_ptr->is_dirty && partial && 0) { + coll_dirty_entries ++; + //fprintf(stderr, "%d: %llu Coll entry is Dirty\n", mpi_rank, entry_ptr->addr); + } + else { + entry_ptr->coll_access = FALSE; + H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL) + coll_entries_cleared ++; + //fprintf(stderr, "Cleared %llu Coll entries Cleaned = %d\n", entry_ptr->addr, coll_entries_cleared); + } + entry_ptr = prev_ptr; + } + + //fprintf(stderr, "NUM COLL entries = %d, CLEARED %d, Dirty %d\n", + //cache_ptr->num_coll_entries, coll_entries_cleared, coll_dirty_entries); + + cache_ptr->num_coll_entries -= coll_entries_cleared; + //fprintf(stderr, "NUM COLL entries = %d, NUM COLL dirty entries = %d\n", cache_ptr->num_coll_entries, coll_dirty_entries); + HDassert(cache_ptr->coll_list_len == cache_ptr->num_coll_entries); + //HDassert(0 == coll_dirty_entries - cache_ptr->num_coll_entries); + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* H5C_clear_coll_entries */ + +static herr_t +H5C_collective_write(H5F_t *f, + hid_t dxpl_id, + H5SL_t *collective_write_list) +{ + H5P_genplist_t *plist = NULL; + H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_COLLECTIVE; + H5FD_mpio_xfer_t orig_xfer_mode = -1; + H5SL_node_t *node; + H5C_collective_write_t *item; + int count; + void *base_buf; + int *length_array = NULL; + MPI_Aint *buf_array = NULL; + MPI_Aint *offset_array = NULL; + MPI_Datatype btype; + MPI_Datatype ftype; + hbool_t btype_created = FALSE; + hbool_t ftype_created = FALSE; + int mpi_code; + int i; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI_NOINIT + + count = (int)H5SL_count(collective_write_list); + + if(NULL == (plist = (H5P_genplist_t *)H5I_object(dxpl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") + + if(H5P_get(plist, H5D_XFER_IO_XFER_MODE_NAME, &orig_xfer_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property") + + if(count > 0) { + if(H5P_set(plist, H5D_XFER_IO_XFER_MODE_NAME, &xfer_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property") + + /* Allocate arrays */ + if(NULL == (length_array = (int *)H5MM_malloc((size_t)count * sizeof(int)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "memory allocation failed for collective write table length array") + if(NULL == (buf_array = (MPI_Aint *)H5MM_malloc((size_t)count * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "memory allocation failed for collective buf table length array") + if(NULL == (offset_array = (MPI_Aint *)H5MM_malloc((size_t)count * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "memory allocation failed for collective offset table length array") + + /* Fill arrays */ + node = H5SL_first(collective_write_list); + HDassert(node); + if(NULL == (item = (H5C_collective_write_t *)H5SL_item(node))) + HGOTO_ERROR(H5E_CACHE, H5E_NOTFOUND, FAIL, "can't retrieve skip list item") + + length_array[0] = (int)item->length; + base_buf = item->buf; + buf_array[0] = (MPI_Aint)0; + offset_array[0] = (MPI_Aint)item->offset; + + node = H5SL_next(node); + i = 1; + while(node) { + if(NULL == (item = (H5C_collective_write_t *)H5SL_item(node))) + HGOTO_ERROR(H5E_CACHE, H5E_NOTFOUND, FAIL, "can't retrieve skip list item") + + length_array[i] = (int)item->length; + buf_array[i] = (MPI_Aint)item->buf - (MPI_Aint)base_buf; + offset_array[i] = (MPI_Aint)item->offset; + node = H5SL_next(node); + i++; + } /* end while */ + + /* Create memory mpi type */ + if(MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed(count, length_array, buf_array, MPI_BYTE, &btype))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + btype_created = TRUE; + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&btype))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Create file mpi type */ + if(MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed(count, length_array, offset_array, MPI_BYTE, &ftype))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + ftype_created = TRUE; + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&ftype))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* Pass buf type, file type to the file driver. */ + if(H5FD_mpi_setup_collective(dxpl_id, &btype, &ftype) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O properties") + + /* Write data */ + if(H5F_block_write(f, H5FD_MEM_DEFAULT, (haddr_t)0, (size_t)1, dxpl_id, base_buf) < 0) + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to write entries collectively") + } /* end if */ + else { + MPI_Status mpi_stat; + MPI_File mpi_fh_p; + MPI_File mpi_fh; + + if(H5F_get_mpi_handle(f, (MPI_File **)&mpi_fh_p) < 0) + HGOTO_ERROR(H5E_FILE, H5E_CANTGET, FAIL, "can't get mpi file handle") + mpi_fh = *(MPI_File*)mpi_fh_p; + + /* just to match up with the 1st MPI_File_set_view from H5FD_mpio_write() */ + if(MPI_SUCCESS != (mpi_code = MPI_File_set_view(mpi_fh, (MPI_Offset)0, MPI_BYTE, + MPI_BYTE, "native", MPI_INFO_NULL))) + HMPI_GOTO_ERROR(FAIL, "MPI_File_set_view failed", mpi_code) + + /* just to match up with MPI_File_write_at_all from H5FD_mpio_write() */ + HDmemset(&mpi_stat, 0, sizeof(MPI_Status)); + if(MPI_SUCCESS != (mpi_code = MPI_File_write_at_all(mpi_fh, (MPI_Offset)0, NULL, 0, + MPI_BYTE, &mpi_stat))) + HMPI_GOTO_ERROR(FAIL, "MPI_File_write_at_all failed", mpi_code) + + /* just to match up with the 2nd MPI_File_set_view (reset) in H5FD_mpio_write() */ + if(MPI_SUCCESS != (mpi_code = MPI_File_set_view(mpi_fh, (MPI_Offset)0, MPI_BYTE, + MPI_BYTE, "native", MPI_INFO_NULL))) + HMPI_GOTO_ERROR(FAIL, "MPI_File_set_view failed", mpi_code) + } /* end else */ + +done: + /* Free arrays */ + length_array = (int *)H5MM_xfree(length_array); + buf_array = (MPI_Aint *)H5MM_xfree(buf_array); + offset_array = (MPI_Aint *)H5MM_xfree(offset_array); + + /* Free MPI Types */ + if(btype_created && MPI_SUCCESS != (mpi_code = MPI_Type_free(&btype))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if(ftype_created && MPI_SUCCESS != (mpi_code = MPI_Type_free(&ftype))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + + /* Reset dxpl */ + if(orig_xfer_mode != H5FD_MPIO_COLLECTIVE) { + HDassert(plist); + if(H5P_set(plist, H5D_XFER_IO_XFER_MODE_NAME, &orig_xfer_mode) < 0) + HDONE_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property") + } /* end if */ + + FUNC_LEAVE_NOAPI(ret_value); +} /* end H5C_collective_write() */ + +static herr_t +H5C_collective_write_free(void *_item, void H5_ATTR_UNUSED *key, void H5_ATTR_UNUSED *op_data) +{ + H5C_collective_write_t *item = (H5C_collective_write_t *)_item; + + FUNC_ENTER_NOAPI_NOINIT_NOERR + + HDassert(item); + + if(item->free_buf) + item->buf = H5MM_xfree(item->buf); + /*!FIXME change to use free list for items */ + H5MM_free(item); + + FUNC_LEAVE_NOAPI(SUCCEED) +} /* end H5C_double_collective_table() */ +#endif /* H5_HAVE_PARALLEL */ + #if H5C_DO_TAGGING_SANITY_CHECKS /*------------------------------------------------------------------------- @@ -11424,3 +12000,246 @@ H5C_retag_copied_metadata(H5C_t * cache_ptr, haddr_t metadata_tag) FUNC_LEAVE_NOAPI_VOID } /* H5C_retag_copied_metadata */ +static herr_t +H5C__generate_image(H5F_t *f, H5C_t * cache_ptr, H5C_cache_entry_t *entry_ptr, + hid_t dxpl_id, int64_t *entry_size_change_ptr) +{ + haddr_t new_addr = HADDR_UNDEF; + haddr_t old_addr = HADDR_UNDEF; + size_t new_len = 0; + size_t new_compressed_len = 0; + unsigned serialize_flags = H5C__SERIALIZE_NO_FLAGS_SET; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI_NOINIT + + HDassert(!entry_ptr->image_up_to_date); + + /* reset cache_ptr->slist_changed so we can detect slist + * modifications in the pre_serialize call. + */ + cache_ptr->slist_changed = FALSE; + + /* make note of the entry's current address */ + old_addr = entry_ptr->addr; + + /* Call client's pre-serialize callback, if there's one */ + if ( ( entry_ptr->type->pre_serialize != NULL ) && + ( (entry_ptr->type->pre_serialize)(f, dxpl_id, + (void *)entry_ptr, + entry_ptr->addr, + entry_ptr->size, + entry_ptr->compressed_size, + &new_addr, &new_len, + &new_compressed_len, + &serialize_flags) < 0 ) ) { + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ + "unable to pre-serialize entry"); + } + + /* set cache_ptr->slist_change_in_pre_serialize if the + * slist was modified. + */ + if ( cache_ptr->slist_changed ) + cache_ptr->slist_change_in_pre_serialize = TRUE; + + /* Check for any flags set in the pre-serialize callback */ + if ( serialize_flags != H5C__SERIALIZE_NO_FLAGS_SET ) { + /* Check for unexpected flags from serialize callback */ + if ( serialize_flags & ~(H5C__SERIALIZE_RESIZED_FLAG | + H5C__SERIALIZE_MOVED_FLAG | + H5C__SERIALIZE_COMPRESSED_FLAG)) { + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ + "unknown serialize flag(s)"); + } + +#ifdef H5_HAVE_PARALLEL + if ( cache_ptr->aux_ptr != NULL ) + HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, \ + "resize/move in serialize occured in parallel case."); +#endif + + /* Resize the buffer if required */ + if ( ( ( ! entry_ptr->compressed ) && + ( serialize_flags & H5C__SERIALIZE_RESIZED_FLAG ) ) || + ( ( entry_ptr->compressed ) && + ( serialize_flags & H5C__SERIALIZE_COMPRESSED_FLAG ) ) ) { + size_t new_image_size; + + if ( entry_ptr->compressed ) + new_image_size = new_compressed_len; + else + new_image_size = new_len; + + HDassert(new_image_size > 0); + + /* Release the current image */ + if ( entry_ptr->image_ptr ) { + entry_ptr->image_ptr = H5MM_xfree(entry_ptr->image_ptr); + } + + /* Allocate a new image buffer */ + entry_ptr->image_ptr = + H5MM_malloc(new_image_size + H5C_IMAGE_EXTRA_SPACE); + + if ( NULL == entry_ptr->image_ptr ) + { + HGOTO_ERROR(H5E_CACHE, H5E_CANTALLOC, FAIL, \ + "memory allocation failed for on disk image buffer"); + } + +#if H5C_DO_MEMORY_SANITY_CHECKS + + HDmemcpy(((uint8_t *)entry_ptr->image_ptr) + new_image_size, + H5C_IMAGE_SANITY_VALUE, + H5C_IMAGE_EXTRA_SPACE); + +#endif /* H5C_DO_MEMORY_SANITY_CHECKS */ + + } /* end if */ + + /* If required, update the entry and the cache data structures + * for a resize. + */ + if ( serialize_flags & H5C__SERIALIZE_RESIZED_FLAG ) { + + H5C__UPDATE_STATS_FOR_ENTRY_SIZE_CHANGE(cache_ptr, \ + entry_ptr, new_len); + + /* update the hash table for the size change*/ + H5C__UPDATE_INDEX_FOR_SIZE_CHANGE(cache_ptr, \ + entry_ptr->size, \ + new_len, entry_ptr, \ + !(entry_ptr->is_dirty)); + + /* The entry can't be protected since we are + * in the process of flushing it. Thus we must + * update the replacement policy data + * structures for the size change. The macro + * deals with the pinned case. + */ + H5C__UPDATE_RP_FOR_SIZE_CHANGE(cache_ptr, entry_ptr, new_len); + + /* as we haven't updated the cache data structures for + * for the flush or flush destroy yet, the entry should + * be in the slist. Thus update it for the size change. + */ + HDassert(entry_ptr->in_slist); + H5C__UPDATE_SLIST_FOR_SIZE_CHANGE(cache_ptr, entry_ptr->size, \ + new_len); + + /* if defined, update *entry_size_change_ptr for the + * change in entry size. + */ + if ( entry_size_change_ptr != NULL ) + { + *entry_size_change_ptr = (int64_t)new_len; + *entry_size_change_ptr -= (int64_t)(entry_ptr->size); + } + + /* finally, update the entry for its new size */ + entry_ptr->size = new_len; + } /* end if */ + + /* If required, udate the entry and the cache data structures + * for a move + */ + if(serialize_flags & H5C__SERIALIZE_MOVED_FLAG) { +#if H5C_DO_SANITY_CHECKS + int64_t saved_slist_len_increase; + int64_t saved_slist_size_increase; +#endif /* H5C_DO_SANITY_CHECKS */ + + H5C__UPDATE_STATS_FOR_MOVE(cache_ptr, entry_ptr); + + if ( entry_ptr->addr == old_addr ) { + /* we must update cache data structures for the + * change in address. + */ + + /* delete the entry from the hash table and the slist */ + H5C__DELETE_FROM_INDEX(cache_ptr, entry_ptr); + H5C__REMOVE_ENTRY_FROM_SLIST(cache_ptr, entry_ptr); + + /* update the entry for its new address */ + entry_ptr->addr = new_addr; + + /* and then reinsert in the index and slist */ + H5C__INSERT_IN_INDEX(cache_ptr, entry_ptr, FAIL); + +#if H5C_DO_SANITY_CHECKS + /* save cache_ptr->slist_len_increase and + * cache_ptr->slist_size_increase before the + * reinsertion into the slist, and restore + * them afterwards to avoid skewing our sanity + * checking. + */ + saved_slist_len_increase = cache_ptr->slist_len_increase; + saved_slist_size_increase = cache_ptr->slist_size_increase; +#endif /* H5C_DO_SANITY_CHECKS */ + + H5C__INSERT_ENTRY_IN_SLIST(cache_ptr, entry_ptr, FAIL); + +#if H5C_DO_SANITY_CHECKS + cache_ptr->slist_len_increase = saved_slist_len_increase; + cache_ptr->slist_size_increase = saved_slist_size_increase; +#endif /* H5C_DO_SANITY_CHECKS */ + } + else { + HDassert(entry_ptr->addr == new_addr); + } + } /* end if */ + + if ( serialize_flags & H5C__SERIALIZE_COMPRESSED_FLAG ) { + /* just save the new compressed entry size in + * entry_ptr->compressed_size. We don't need to + * do more, as compressed size is only used for I/O. + */ + HDassert(entry_ptr->compressed); + entry_ptr->compressed_size = new_compressed_len; + } + } /* end if ( serialize_flags != H5C__SERIALIZE_NO_FLAGS_SET ) */ + + /* Serialize object into buffer */ + { + size_t image_len; + + if ( entry_ptr->compressed ) + image_len = entry_ptr->compressed_size; + else + image_len = entry_ptr->size; + + /* reset cache_ptr->slist_changed so we can detect slist + * modifications in the serialize call. + */ + cache_ptr->slist_changed = FALSE; + + + if ( entry_ptr->type->serialize(f, entry_ptr->image_ptr, + image_len, + (void *)entry_ptr) < 0) { + HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \ + "unable to serialize entry"); + } + + /* set cache_ptr->slist_change_in_serialize if the + * slist was modified. + */ + if ( cache_ptr->slist_changed ) + cache_ptr->slist_change_in_pre_serialize = TRUE; + +#if H5C_DO_MEMORY_SANITY_CHECKS + + HDassert(0 == HDmemcmp(((uint8_t *)entry_ptr->image_ptr) + + image_len, + H5C_IMAGE_SANITY_VALUE, + H5C_IMAGE_EXTRA_SPACE)); + +#endif /* H5C_DO_MEMORY_SANITY_CHECKS */ + + entry_ptr->image_up_to_date = TRUE; + } + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* H5C__generate_image */ |