Diffstat (limited to 'src/H5C.c')
-rw-r--r-- | src/H5C.c | 609
1 file changed, 569 insertions, 40 deletions
@@ -88,7 +88,6 @@
 #include "H5Cpkg.h"             /* Cache                                */
 #include "H5Eprivate.h"         /* Error handling                       */
 #include "H5Fpkg.h"             /* Files                                */
-#include "H5FDprivate.h"        /* File drivers                         */
 #include "H5FLprivate.h"        /* Free Lists                           */
 #include "H5Iprivate.h"         /* IDs                                  */
 #include "H5MFprivate.h"        /* File memory management               */
@@ -156,6 +155,9 @@ static herr_t H5C_flush_ring(H5F_t *f, hid_t dxpl_id, H5C_ring_t ring,
 static void * H5C_load_entry(H5F_t *             f,
                              hid_t               dxpl_id,
+#ifdef H5_HAVE_PARALLEL
+                             hbool_t             coll_access,
+#endif /* H5_HAVE_PARALLEL */
                              const H5C_class_t * type,
                              haddr_t             addr,
                              void *              udata);
@@ -180,6 +182,9 @@ static herr_t H5C_mark_tagged_entries(H5C_t * cache_ptr,
 
 static herr_t H5C_flush_marked_entries(H5F_t * f, hid_t dxpl_id);
 
+static herr_t H5C__generate_image(H5F_t *f, H5C_t * cache_ptr, H5C_cache_entry_t *entry_ptr,
+    hid_t dxpl_id, int64_t *entry_size_change_ptr);
+
 #if H5C_DO_TAGGING_SANITY_CHECKS
 static herr_t H5C_verify_tag(int id, haddr_t tag);
 #endif
@@ -551,6 +556,13 @@ H5C_create(size_t max_cache_size,
     cache_ptr->LRU_head_ptr             = NULL;
     cache_ptr->LRU_tail_ptr             = NULL;
 
+#ifdef H5_HAVE_PARALLEL
+    cache_ptr->coll_list_len            = 0;
+    cache_ptr->coll_list_size           = (size_t)0;
+    cache_ptr->coll_head_ptr            = NULL;
+    cache_ptr->coll_tail_ptr            = NULL;
+#endif /* H5_HAVE_PARALLEL */
+
     cache_ptr->cLRU_list_len            = 0;
     cache_ptr->cLRU_list_size           = (size_t)0;
     cache_ptr->cLRU_head_ptr            = NULL;
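The four coll_* fields initialized above back a new doubly linked list of entries that were accessed collectively, maintained alongside the LRU by macros such as H5C__INSERT_IN_COLL_LIST and H5C__REMOVE_FROM_COLL_LIST, which are defined outside this file (presumably in H5Cpkg.h, alongside the other cache list macros). A minimal sketch of the bookkeeping such a list implies; the type and function names here are illustrative, not the actual macros:

#include <assert.h>
#include <stddef.h>

/* Illustrative stand-ins for the cache/entry types; the real ones
 * live in H5Cpkg.h and carry many more fields. */
typedef struct coll_entry_t {
    size_t               size;       /* on-disk size of the entry      */
    struct coll_entry_t *coll_next;  /* next entry in collective list  */
    struct coll_entry_t *coll_prev;  /* previous entry                 */
} coll_entry_t;

typedef struct coll_list_t {
    int           coll_list_len;   /* number of entries in the list   */
    size_t        coll_list_size;  /* summed size of listed entries   */
    coll_entry_t *coll_head_ptr;
    coll_entry_t *coll_tail_ptr;
} coll_list_t;

/* Prepend an entry, keeping the len/size totals in step with the
 * links (what H5C__INSERT_IN_COLL_LIST must do, modulo sanity checks). */
static void coll_list_insert(coll_list_t *lst, coll_entry_t *ent)
{
    ent->coll_prev = NULL;
    ent->coll_next = lst->coll_head_ptr;
    if(lst->coll_head_ptr)
        lst->coll_head_ptr->coll_prev = ent;
    else
        lst->coll_tail_ptr = ent;        /* list was empty */
    lst->coll_head_ptr = ent;
    lst->coll_list_len++;
    lst->coll_list_size += ent->size;
}

/* Unlink an entry and settle the totals. */
static void coll_list_remove(coll_list_t *lst, coll_entry_t *ent)
{
    assert(lst->coll_list_len > 0);
    if(ent->coll_prev)
        ent->coll_prev->coll_next = ent->coll_next;
    else
        lst->coll_head_ptr = ent->coll_next;
    if(ent->coll_next)
        ent->coll_next->coll_prev = ent->coll_prev;
    else
        lst->coll_tail_ptr = ent->coll_prev;
    ent->coll_next = ent->coll_prev = NULL;
    lst->coll_list_len--;
    lst->coll_list_size -= ent->size;
}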
@@ -972,6 +984,12 @@ H5C_expunge_entry(H5F_t *f, hid_t dxpl_id, const H5C_class_t *type,
         HGOTO_ERROR(H5E_CACHE, H5E_CANTEXPUNGE, FAIL, "Target entry is protected.")
     if(entry_ptr->is_pinned)
         HGOTO_ERROR(H5E_CACHE, H5E_CANTEXPUNGE, FAIL, "Target entry is pinned.")
+#ifdef H5_HAVE_PARALLEL
+    if(entry_ptr->coll_access) {
+        entry_ptr->coll_access = FALSE;
+        H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL)
+    }
+#endif /* H5_HAVE_PARALLEL */
 
     /* If we get this far, call H5C__flush_single_entry() with the
      * H5C__FLUSH_INVALIDATE_FLAG and the H5C__FLUSH_CLEAR_ONLY_FLAG.
@@ -989,8 +1007,8 @@ H5C_expunge_entry(H5F_t *f, hid_t dxpl_id, const H5C_class_t *type,
     /* Delete the entry from the skip list on destroy */
     flush_flags |= H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG;
 
-    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flush_flags, NULL) < 0)
-        HGOTO_ERROR(H5E_CACHE, H5E_CANTEXPUNGE, FAIL, "can't flush entry")
+    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flush_flags, NULL, NULL) < 0)
+        HGOTO_ERROR(H5E_CACHE, H5E_CANTEXPUNGE, FAIL, "H5C_flush_single_entry() failed.")
 
 #if H5C_DO_SANITY_CHECKS
     if ( entry_was_dirty )
@@ -1752,19 +1770,20 @@ H5C_insert_entry(H5F_t * f,
                  void *              thing,
                  unsigned int        flags)
 {
-    H5C_t *             cache_ptr;
+    H5C_t              *cache_ptr;
     H5P_genplist_t     *dxpl;
     H5AC_ring_t         ring = H5C_RING_UNDEFINED;
     hbool_t             insert_pinned;
    hbool_t             flush_last;
 #ifdef H5_HAVE_PARALLEL
+    hbool_t             coll_access = FALSE; /* whether access to the cache entry is done collectively */
     hbool_t             flush_collectively;
 #endif /* H5_HAVE_PARALLEL */
     hbool_t             set_flush_marker;
     hbool_t             write_permitted = TRUE;
     size_t              empty_space;
-    H5C_cache_entry_t * entry_ptr;
-    H5C_cache_entry_t * test_entry_ptr;
+    H5C_cache_entry_t  *entry_ptr;
+    H5C_cache_entry_t  *test_entry_ptr;
     unsigned            u;                      /* Local index variable */
     herr_t              ret_value = SUCCEED;    /* Return value */
@@ -1903,6 +1922,9 @@ H5C_insert_entry(H5F_t * f,
     entry_ptr->aux_next = NULL;
     entry_ptr->aux_prev = NULL;
 
+    entry_ptr->coll_next = NULL;
+    entry_ptr->coll_prev = NULL;
+
     H5C__RESET_CACHE_ENTRY_STATS(entry_ptr)
 
     if ( ( cache_ptr->flash_size_increase_possible ) &&
@@ -2007,6 +2029,49 @@ H5C_insert_entry(H5F_t * f,
 
     H5C__UPDATE_STATS_FOR_INSERTION(cache_ptr, entry_ptr)
 
+#ifdef H5_HAVE_PARALLEL
+    /* Get the dataset transfer property list */
+    if(NULL == (dxpl = (H5P_genplist_t *)H5I_object(dxpl_id)))
+        HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a property list");
+
+    if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
+        coll_access = (H5P_USER_TRUE == f->coll_md_read ? TRUE : FALSE);
+
+        if(!coll_access && H5P_FORCE_FALSE != f->coll_md_read) {
+            H5P_coll_md_read_flag_t prop_value;
+
+            /* get the property value */
+            if(H5P_get(dxpl, H5_COLL_MD_READ_FLAG_NAME, &prop_value) < 0)
+                HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "Can't get collective metadata access flag")
+
+            coll_access = (H5P_USER_TRUE == prop_value ? TRUE : FALSE);
+        }
+        //fprintf(stderr, "COLLACCESS = %d.. FILE property = %d\n", coll_access, f->coll_md_read);
+    }
+
+    entry_ptr->coll_access = coll_access;
+    if(coll_access) {
+        //fprintf(stderr, "NEW (%llu, %s)\n", addr, entry_ptr->type->name);
+        H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, FAIL)
+
+        /* Make sure the size of the collective entries in the cache remains in check */
+        if(H5P_USER_TRUE == f->coll_md_read) {
+            if(cache_ptr->max_cache_size*80 < cache_ptr->coll_list_size*100) {
+                if(H5C_clear_coll_entries(cache_ptr, 1) < 0)
+                    HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, FAIL, "H5C_clear_coll_entries() failed.")
+            }
+        }
+        else {
+            if(cache_ptr->max_cache_size*40 < cache_ptr->coll_list_size*100) {
+                if(H5C_clear_coll_entries(cache_ptr, 1) < 0)
+                    HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, FAIL, "H5C_clear_coll_entries() failed.")
+            }
+        }
+
+    }
+    entry_ptr->ind_access_while_coll = FALSE;
+#endif
+
 done:
 
 #if H5C_DO_EXTREME_SANITY_CHECKS
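The hunk above resolves whether an access is collective from two inputs: the file-level coll_md_read setting and, when that does not force an answer, a per-transfer property fetched from the dxpl. A condensed sketch of that decision logic, using stand-in types for the property machinery (the real code goes through H5P_get() on the transfer property list):

#include <stdbool.h>

/* Stand-in for H5P_coll_md_read_flag_t: a three-state flag where the
 * file-level value can force FALSE, force TRUE, or defer to the dxpl. */
typedef enum { FLAG_FORCE_FALSE = -1, FLAG_UNSET = 0, FLAG_USER_TRUE = 1 } tri_flag_t;

/* Mirror of the decision above: the file property wins when it is
 * USER_TRUE or FORCE_FALSE; otherwise the per-access (dxpl) property
 * decides. Only meaningful when the file driver has MPI. */
static bool resolve_coll_access(tri_flag_t file_flag, tri_flag_t dxpl_flag,
                                bool file_has_mpi)
{
    bool coll_access;

    if(!file_has_mpi)
        return false;

    coll_access = (FLAG_USER_TRUE == file_flag);
    if(!coll_access && FLAG_FORCE_FALSE != file_flag)
        coll_access = (FLAG_USER_TRUE == dxpl_flag);

    return coll_access;
}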
@@ -2368,6 +2433,14 @@ H5C_resize_entry(void *thing, size_t new_size)
                                             (entry_ptr->size), (new_size))
         } /* end if */
 
+#ifdef H5_HAVE_PARALLEL
+        if(entry_ptr->coll_access) {
+            H5C__DLL_UPDATE_FOR_SIZE_CHANGE((cache_ptr->coll_list_len), \
+                                            (cache_ptr->coll_list_size), \
+                                            (entry_ptr->size), (new_size))
+        } /* end if */
+#endif /* H5_HAVE_PARALLEL */
+
         /* update the hash table */
         H5C__UPDATE_INDEX_FOR_SIZE_CHANGE((cache_ptr), (entry_ptr->size),\
                                           (new_size), (entry_ptr), (was_clean));
@@ -2589,7 +2662,6 @@ H5C_protect(H5F_t * f,
             unsigned            flags)
 {
     H5C_t *             cache_ptr;
-    H5P_genplist_t     *dxpl;
     H5AC_ring_t         ring = H5C_RING_UNDEFINED;
     hbool_t             hit;
     hbool_t             have_write_permitted = FALSE;
@@ -2597,11 +2669,13 @@ H5C_protect(H5F_t * f,
     hbool_t             flush_last;
 #ifdef H5_HAVE_PARALLEL
     hbool_t             flush_collectively;
+    hbool_t             coll_access = FALSE; /* whether access to the cache entry is done collectively */
 #endif /* H5_HAVE_PARALLEL */
     hbool_t             write_permitted;
     size_t              empty_space;
     void *              thing;
     H5C_cache_entry_t * entry_ptr;
+    H5P_genplist_t *    dxpl;               /* dataset transfer property list */
     void *              ret_value = NULL;   /* Return value */
 
     FUNC_ENTER_NOAPI(NULL)
@@ -2641,6 +2715,23 @@ H5C_protect(H5F_t * f,
     if((H5P_get(dxpl, H5AC_RING_NAME, &ring)) < 0)
         HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "unable to query ring value")
 
+#ifdef H5_HAVE_PARALLEL
+    if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
+        coll_access = (H5P_USER_TRUE == f->coll_md_read ? TRUE : FALSE);
+
+        if(!coll_access && H5P_FORCE_FALSE != f->coll_md_read) {
+            H5P_coll_md_read_flag_t prop_value;
+
+            /* get the property value */
+            if(H5P_get(dxpl, H5_COLL_MD_READ_FLAG_NAME, &prop_value) < 0)
+                HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "Can't get collective metadata access flag")
+
+            coll_access = (H5P_USER_TRUE == prop_value ? TRUE : FALSE);
+        }
+        //fprintf(stderr, "COLLACCESS = %d.. FILE property = %d\n", coll_access, f->coll_md_read);
+    }
+#endif /* H5_HAVE_PARALLEL */
+
     /* first check to see if the target is in cache */
     H5C__SEARCH_INDEX(cache_ptr, addr, entry_ptr, NULL)
 
@@ -2652,6 +2743,71 @@ H5C_protect(H5F_t * f,
         if(entry_ptr->type != type)
             HGOTO_ERROR(H5E_CACHE, H5E_BADTYPE, NULL, "incorrect cache entry type")
 
+        /* if this is a collective metadata read, the entry is not
+           marked as collective, and is clean, it is possible that
+           other processes will not have it in their caches and will
+           expect a bcast of the entry from process 0. So process 0
+           will bcast the entry to all other ranks. Ranks that do have
+           the entry in their cache still have to participate in the
+           bcast. */
+#ifdef H5_HAVE_PARALLEL
+        if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI) && coll_access &&
+           !(entry_ptr->is_dirty) && !(entry_ptr->coll_access)) {
+            MPI_Comm comm;     /* File MPI Communicator */
+            int mpi_code;      /* MPI error code */
+            int buf_size;
+
+            if((comm = H5F_mpi_get_comm(f)) == MPI_COMM_NULL)
+                HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "get_comm request failed")
+
+            if(entry_ptr->image_ptr == NULL) {
+                int mpi_rank;
+                size_t image_size;
+
+                if((mpi_rank = H5F_mpi_get_rank(f)) < 0)
+                    HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "Can't get MPI rank")
+
+                if ( entry_ptr->compressed )
+                    image_size = entry_ptr->compressed_size;
+                else
+                    image_size = entry_ptr->size;
+
+                HDassert(image_size > 0);
+
+                entry_ptr->image_ptr = H5MM_malloc(image_size + H5C_IMAGE_EXTRA_SPACE);
+                if (NULL == entry_ptr->image_ptr) {
+                    HGOTO_ERROR(H5E_CACHE, H5E_CANTALLOC, NULL, \
+                                "memory allocation failed for on disk image buffer")
+                }
+#if H5C_DO_MEMORY_SANITY_CHECKS
+                HDmemcpy(((uint8_t *)entry_ptr->image_ptr) + image_size,
+                         H5C_IMAGE_SANITY_VALUE, H5C_IMAGE_EXTRA_SPACE);
+#endif /* H5C_DO_MEMORY_SANITY_CHECKS */
+
+                if(0 == mpi_rank) {
+                    if(H5C__generate_image(f, cache_ptr, entry_ptr, dxpl_id, NULL) < 0)
+                        HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "Can't get Image")
+                }
+            }
+
+            HDassert(entry_ptr->image_ptr);
+
+            H5_CHECKED_ASSIGN(buf_size, int, entry_ptr->size, size_t);
+            if(MPI_SUCCESS != (mpi_code = MPI_Bcast(entry_ptr->image_ptr, buf_size,
+                                                    MPI_BYTE, 0, comm)))
+                HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code)
+
+            entry_ptr->coll_access = TRUE;
+
+            //fprintf(stderr, "ONLY INSERT (%llu, %s)\n", addr, entry_ptr->type->name);
+            H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, NULL)
+        }
+        else if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI) && coll_access && entry_ptr->coll_access) {
+            //fprintf(stderr, "UPDATE (%llu, %s)\n", addr, entry_ptr->type->name);
+            H5C__MOVE_TO_TOP_IN_COLL_LIST(cache_ptr, entry_ptr, NULL)
+        }
+#endif /* H5_HAVE_PARALLEL */
+
 #if H5C_DO_TAGGING_SANITY_CHECKS
 {
     haddr_t tag = HADDR_UNDEF;
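The hit path above and the miss path in H5C_load_entry() (later in this diff) share one pattern: rank 0 produces the on-disk image, either by serializing its cached copy via H5C__generate_image() or by reading from the file, and every rank then joins a matching MPI_Bcast. A stripped-down sketch of the pattern in plain MPI; read_from_file() is a hypothetical helper standing in for H5F_block_read():

#include <mpi.h>
#include <stddef.h>

/* Hypothetical helper; in H5C.c the real work is H5F_block_read()
 * on the miss path or H5C__generate_image() on the hit path. */
int read_from_file(void *buf, size_t len);

/* Every rank that reaches this point must call it, whether or not the
 * entry is already in its cache -- otherwise the broadcast deadlocks. */
static int fetch_image_collectively(void *image, size_t len, int have_image,
                                    MPI_Comm comm)
{
    int mpi_rank;

    if(MPI_SUCCESS != MPI_Comm_rank(comm, &mpi_rank))
        return -1;

    /* Only rank 0 touches the file; other ranks wait at the bcast. */
    if(0 == mpi_rank && !have_image)
        if(read_from_file(image, len) < 0)
            return -1;

    /* All ranks participate with identical (count, root, comm). */
    if(MPI_SUCCESS != MPI_Bcast(image, (int)len, MPI_BYTE, 0, comm))
        return -1;

    return 0;
}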
@@ -2684,7 +2840,11 @@ H5C_protect(H5F_t * f,
 
         hit = FALSE;
 
-        thing = H5C_load_entry(f, dxpl_id, type, addr, udata);
+        thing = H5C_load_entry(f, dxpl_id,
+#ifdef H5_HAVE_PARALLEL
+                               coll_access,
+#endif /* H5_HAVE_PARALLEL */
+                               type, addr, udata);
 
         if ( thing == NULL ) {
 
@@ -2692,8 +2852,16 @@ H5C_protect(H5F_t * f,
         }
 
         entry_ptr = (H5C_cache_entry_t *)thing;
-
         entry_ptr->ring = ring;
+#ifdef H5_HAVE_PARALLEL
+        if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
+            if(entry_ptr->coll_access) {
+                //fprintf(stderr, "LOAD and INSERT (%llu, %s)\n", addr, entry_ptr->type->name);
+                H5C__INSERT_IN_COLL_LIST(cache_ptr, entry_ptr, NULL)
+            }
+        }
+        //fprintf(stderr, "LOADED (%llu, %s) - %d\n", addr, entry_ptr->type->name, coll_access);
+#endif /* H5_HAVE_PARALLEL */
 
         /* Apply tag to newly protected entry */
         if(H5C_tag_entry(cache_ptr, entry_ptr, dxpl_id) < 0)
@@ -2931,6 +3099,29 @@ H5C_protect(H5F_t * f,
         }
     }
 
+#ifdef H5_HAVE_PARALLEL
+    if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
+        /* Make sure the size of the collective entries in the cache remains in check */
+        if(TRUE == coll_access) {
+            if(H5P_USER_TRUE == f->coll_md_read) {
+                //fprintf(stderr, "COLL entries size = %zu, MAX = %zu\n", cache_ptr->coll_list_size, cache_ptr->max_cache_size);
+                if(cache_ptr->max_cache_size*80 < cache_ptr->coll_list_size*100) {
+                    //fprintf(stderr, "COLL entries at 80.. CLEARING\n");
+                    if(H5C_clear_coll_entries(cache_ptr, 1) < 0)
+                        HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "H5C_clear_coll_entries() failed.")
+                }
+            }
+            else {
+                if(cache_ptr->max_cache_size*40 < cache_ptr->coll_list_size*100) {
+                    //fprintf(stderr, "COLL entries at 40.. CLEARING\n");
+                    if(H5C_clear_coll_entries(cache_ptr, 1) < 0)
+                        HGOTO_ERROR(H5E_CACHE, H5E_CANTGET, NULL, "H5C_clear_coll_entries() failed.")
+                }
+            }
+        }
+    }
+#endif /* H5_HAVE_PARALLEL */
+
 done:
 
 #if H5C_DO_EXTREME_SANITY_CHECKS
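Both here and in H5C_insert_entry(), the cap on collectively accessed entries is enforced with an integer cross-multiplication: max_cache_size*80 < coll_list_size*100 tests "coll_list_size exceeds 80% of the cache" without any floating-point division. A sketch of the check, assuming cache sizes far below SIZE_MAX/100 so the multiplication cannot overflow (true for realistic metadata caches):

#include <stdbool.h>
#include <stddef.h>

/* True when used exceeds pct% of capacity, using only integer math.
 * Equivalent to (double)used / capacity > pct / 100.0, but exact and
 * free of float conversion; assumes capacity * 100 does not overflow. */
static bool over_budget(size_t used, size_t capacity, unsigned pct)
{
    return capacity * pct < used * 100;
}

/* Usage mirroring the diff: the 80% limit applies when the file-level
 * flag forces collective metadata reads, the stricter 40% limit when
 * only the per-access property enabled them. */
static bool coll_list_needs_clearing(size_t coll_list_size,
                                     size_t max_cache_size,
                                     bool file_level_coll)
{
    return over_budget(coll_list_size, max_cache_size,
                       file_level_coll ? 80u : 40u);
}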
@@ -2938,7 +3129,7 @@ done:
          ( H5C_validate_pinned_entry_list(cache_ptr) < 0 ) ||
          ( H5C_validate_lru_list(cache_ptr) < 0 ) ) {
 
-        HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, \
+        HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, NULL, \
                     "an extreme sanity check failed on exit.\n");
     }
 #endif /* H5C_DO_EXTREME_SANITY_CHECKS */
@@ -4591,7 +4782,7 @@ H5C_unprotect(H5F_t * f,
             /* Delete the entry from the skip list on destroy */
             flush_flags |= H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG;
 
-            if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flush_flags, NULL) < 0)
+            if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flush_flags, NULL, NULL) < 0)
                 HGOTO_ERROR(H5E_CACHE, H5E_CANTUNPROTECT, FAIL, "Can't flush entry")
 
 #if H5C_DO_SANITY_CHECKS
@@ -4617,7 +4808,7 @@ H5C_unprotect(H5F_t * f,
             else if(test_entry_ptr != entry_ptr)
                 HGOTO_ERROR(H5E_CACHE, H5E_CANTUNPROTECT, FAIL, "hash table contains multiple entries for addr?!?.")
 
-            if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_CLEAR_ONLY_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL) < 0)
+            if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_CLEAR_ONLY_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL, NULL) < 0)
                 HGOTO_ERROR(H5E_CACHE, H5E_CANTUNPROTECT, FAIL, "Can't clear entry")
         }
 #endif /* H5_HAVE_PARALLEL */
@@ -5949,7 +6140,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f,
                 cache_ptr->entries_removed_counter = 0;
                 cache_ptr->last_entry_removed_ptr  = NULL;
 
-                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, NULL) < 0)
+                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, NULL, NULL) < 0)
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush entry")
 
                 if ( ( cache_ptr->entries_removed_counter > 1 ) ||
@@ -5961,7 +6152,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f,
 
                 bytes_evicted += entry_ptr->size;
 
-                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL) < 0 )
+                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL, NULL) < 0 )
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush entry")
             }
 
@@ -6041,8 +6232,7 @@ H5C__autoadjust__ageout__evict_aged_out_entries(H5F_t * f,
                 prev_ptr = entry_ptr->prev;
 
                 if ( ! (entry_ptr->is_dirty) ) {
-
-                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL) < 0)
+                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL, NULL) < 0)
                         HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush clean entry")
                 }
                 /* just skip the entry if it is dirty, as we can't do
@@ -6890,7 +7080,7 @@ H5C_flush_invalidate_ring(const H5F_t * f, hid_t dxpl_id, H5C_ring_t ring,
                     entry_size_change = 0;
 #endif /* H5C_DO_SANITY_CHECKS */
 
-                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, entry_size_change_ptr) < 0)
+                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, entry_size_change_ptr, NULL) < 0)
                         HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "dirty pinned entry flush failed.")
 
 #if H5C_DO_SANITY_CHECKS
                     /* entry size may have changed during the flush.
@@ -6935,9 +7125,9 @@ H5C_flush_invalidate_ring(const H5F_t * f, hid_t dxpl_id, H5C_ring_t ring,
                     entry_size_change = 0;
 #endif /* H5C_DO_SANITY_CHECKS */
 
                     if(H5C__flush_single_entry(f, dxpl_id, entry_ptr,
                             (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG),
-                            entry_size_change_ptr) < 0)
+                            entry_size_change_ptr, NULL) < 0)
                         HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "dirty entry flush destroy failed.")
 
 #if H5C_DO_SANITY_CHECKS
                     /* entry size may have changed during the flush.
@@ -7058,8 +7248,8 @@ H5C_flush_invalidate_ring(const H5F_t * f, hid_t dxpl_id, H5C_ring_t ring,
                 entry_was_dirty = entry_ptr->is_dirty;
 
                 if(H5C__flush_single_entry(f, dxpl_id, entry_ptr,
                         (cooked_flags | H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG),
-                        NULL) < 0)
+                        NULL, NULL) < 0)
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "Entry flush destroy failed.")
 
                 if(entry_was_dirty) {
@@ -7429,7 +7619,7 @@ H5C_flush_ring(H5F_t *f, hid_t dxpl_id, H5C_ring_t ring, unsigned flags)
                     entry_size_change = 0;
 #endif /* H5C_DO_SANITY_CHECKS */
 
-                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flags, entry_size_change_ptr) < 0)
+                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flags, entry_size_change_ptr, NULL) < 0)
                         HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "dirty pinned entry flush failed.")
 
 #if H5C_DO_SANITY_CHECKS
@@ -7474,7 +7664,7 @@ H5C_flush_ring(H5F_t *f, hid_t dxpl_id, H5C_ring_t ring, unsigned flags)
                     flushed_entries_size += (int64_t)entry_ptr->size;
                     entry_size_change = 0;
 #endif /* H5C_DO_SANITY_CHECKS */
 
-                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flags, entry_size_change_ptr) < 0)
+                    if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, flags, entry_size_change_ptr, NULL) < 0)
                         HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "Can't flush entry.")
 
 #if H5C_DO_SANITY_CHECKS
@@ -7623,7 +7813,7 @@ done:
  */
 herr_t
 H5C__flush_single_entry(const H5F_t *f, hid_t dxpl_id, H5C_cache_entry_t *entry_ptr,
-    unsigned flags, int64_t *entry_size_change_ptr)
+    unsigned flags, int64_t *entry_size_change_ptr, H5SL_t *collective_write_list)
 {
     H5C_t *     cache_ptr;              /* Cache for file */
     hbool_t     destroy;                /* external flag */
@@ -7668,6 +7858,10 @@ H5C__flush_single_entry(const H5F_t *f, hid_t dxpl_id, H5C_cache_entry_t *entry_
     else
         destroy_entry = destroy;
 
+#ifdef H5_HAVE_PARALLEL
+    HDassert(FALSE == entry_ptr->coll_access);
+#endif
+
     /* we will write the entry to disk if it exists, is dirty, and if the
      * clear only flag is not set.
      */
@@ -8025,6 +8219,25 @@ H5C__flush_single_entry(const H5F_t *f, hid_t dxpl_id, H5C_cache_entry_t *entry_
         else
             image_size = entry_ptr->size;
 
+#ifdef H5_HAVE_PARALLEL
+        if(collective_write_list) {
+            H5C_collective_write_t *item = NULL;
+
+            if(NULL == (item = (H5C_collective_write_t *)H5MM_malloc(sizeof(H5C_collective_write_t))))
+                HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate skip list item")
+
+            item->length = image_size;
+            item->free_buf = FALSE;
+            item->buf = entry_ptr->image_ptr;
+            item->offset = entry_ptr->addr;
+
+            if(H5SL_insert(collective_write_list, item, &item->offset) < 0) {
+                H5MM_free(item);
+                HGOTO_ERROR(H5E_HEAP, H5E_CANTINSERT, FAIL, "unable to insert skip list item")
+            } /* end if */
+        } /* end if */
+        else
+#endif /* H5_HAVE_PARALLEL */
         if(H5F_block_write(f, entry_ptr->type->mem_type, entry_ptr->addr,
                            image_size, dxpl_id, entry_ptr->image_ptr) < 0)
             HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "Can't write image to file.")
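When a collective_write_list is supplied, the flush above does not write at all: it queues (offset, length, buffer) records in a skip list keyed by file offset, so the caller can later issue one collective write for the whole batch. A self-contained sketch of the same defer-then-drain idea, using a sorted array in place of H5SL_t; the record fields mirror H5C_collective_write_t as used in the hunk above, and write_fn is a hypothetical callback:

#include <stdlib.h>

/* Mirrors H5C_collective_write_t from the hunk above. */
typedef struct {
    size_t             length;   /* size of the queued image          */
    int                free_buf; /* does the queue own buf?           */
    void              *buf;      /* serialized entry image            */
    unsigned long long offset;   /* file address, also the sort key   */
} write_item_t;

typedef int (*write_fn)(unsigned long long offset, size_t len, const void *buf);

static int cmp_offset(const void *a, const void *b)
{
    const write_item_t *x = a, *y = b;
    return (x->offset > y->offset) - (x->offset < y->offset);
}

/* Drain the queue in ascending file order -- the ordering the skip
 * list gives H5C for free -- handing each record to the writer, which
 * in H5C's case would be a single collective MPI-IO operation. */
static int drain_write_queue(write_item_t *items, size_t n, write_fn writer)
{
    size_t u;

    qsort(items, n, sizeof(*items), cmp_offset);
    for(u = 0; u < n; u++) {
        if(writer(items[u].offset, items[u].length, items[u].buf) < 0)
            return -1;
        if(items[u].free_buf)
            free(items[u].buf);
    }
    return 0;
}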
@@ -8277,6 +8490,9 @@ done:
 static void *
 H5C_load_entry(H5F_t *             f,
                hid_t               dxpl_id,
+#ifdef H5_HAVE_PARALLEL
+               hbool_t             coll_access,
+#endif /* H5_HAVE_PARALLEL */
                const H5C_class_t * type,
                haddr_t             addr,
                void *              udata)
@@ -8295,6 +8511,11 @@ H5C_load_entry(H5F_t * f,
     H5C_cache_entry_t * entry;          /* Alias for thing loaded, as cache entry */
     size_t              len;            /* Size of image in file */
     unsigned            u;              /* Local index variable */
+#ifdef H5_HAVE_PARALLEL
+    int                 mpi_rank;       /* MPI process rank */
+    MPI_Comm            comm;           /* File MPI Communicator */
+    int                 mpi_code;       /* MPI error code */
+#endif /* H5_HAVE_PARALLEL */
     void *              ret_value = NULL;       /* Return value */
 
     FUNC_ENTER_NOAPI_NOINIT
@@ -8441,11 +8662,39 @@ H5C_load_entry(H5F_t * f,
         HDmemcpy(((uint8_t *)image) + len, H5C_IMAGE_SANITY_VALUE, H5C_IMAGE_EXTRA_SPACE);
 #endif /* H5C_DO_MEMORY_SANITY_CHECKS */
 
+#ifdef H5_HAVE_PARALLEL
+    if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
+        if((mpi_rank = H5F_mpi_get_rank(f)) < 0)
+            HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "Can't get MPI rank")
+        if((comm = H5F_mpi_get_comm(f)) == MPI_COMM_NULL)
+            HGOTO_ERROR(H5E_FILE, H5E_CANTGET, NULL, "get_comm request failed")
+    }
+#endif /* H5_HAVE_PARALLEL */
+
     /* Get the on-disk entry image */
-    if ( 0 == (type->flags & H5C__CLASS_SKIP_READS) )
+    if ( 0 == (type->flags & H5C__CLASS_SKIP_READS) ) {
+#ifdef H5_HAVE_PARALLEL
+        if(!coll_access || 0 == mpi_rank) {
+#endif /* H5_HAVE_PARALLEL */
+
         if(H5F_block_read(f, type->mem_type, addr, len, dxpl_id, image) < 0)
             HGOTO_ERROR(H5E_CACHE, H5E_READERROR, NULL, "Can't read image")
 
+#ifdef H5_HAVE_PARALLEL
+        }
+        /* if the collective metadata read optimization is turned on,
+           bcast the metadata read from process 0 to all ranks in the file
+           communicator */
+        if(coll_access) {
+            int buf_size;
+
+            H5_CHECKED_ASSIGN(buf_size, int, len, size_t);
+            if(MPI_SUCCESS != (mpi_code = MPI_Bcast(image, buf_size, MPI_BYTE, 0, comm)))
+                HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code)
+        }
+#endif /* H5_HAVE_PARALLEL */
+    }
+
     /* Deserialize the on-disk image into the native memory form */
     if(NULL == (thing = type->deserialize(image, len, udata, &dirty)))
         HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't deserialize image")
@@ -8553,21 +8802,32 @@ H5C_load_entry(H5F_t * f,
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, NULL, \
                                 "free_icr callback failed")
 
+#ifdef H5_HAVE_PARALLEL
+                if(!coll_access || 0 == mpi_rank) {
+#endif /* H5_HAVE_PARALLEL */
+
                 /* Go get the on-disk image again */
                 if(H5F_block_read(f, type->mem_type, addr, new_len, dxpl_id, image) < 0)
-                    HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, \
-                                "Can't read image")
+                    HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't read image")
 
+#ifdef H5_HAVE_PARALLEL
+                }
+                /* if the collective metadata read optimization is turned on,
+                   bcast the metadata read from process 0 to all ranks in the file
+                   communicator */
+                if(coll_access) {
+                    int buf_size;
+
+                    H5_CHECKED_ASSIGN(buf_size, int, new_len, size_t);
+                    if(MPI_SUCCESS != (mpi_code = MPI_Bcast(image, buf_size, MPI_BYTE, 0, comm)))
+                        HMPI_GOTO_ERROR(NULL, "MPI_Bcast failed", mpi_code)
+                }
+#endif /* H5_HAVE_PARALLEL */
 
-                /* Deserialize on-disk image into native memory
-                 * form again
-                 */
+                /* Deserialize on-disk image into native memory form again */
                 if(NULL == (thing = type->deserialize(image, new_len, udata, &dirty)))
-
-                    HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, \
-                                "Can't deserialize image")
+                    HGOTO_ERROR(H5E_CACHE, H5E_CANTLOAD, NULL, "Can't deserialize image")
 
 #ifndef NDEBUG
                 {
@@ -8652,6 +8912,8 @@ H5C_load_entry(H5F_t * f,
 #ifdef H5_HAVE_PARALLEL
     entry->clear_on_unprotect = FALSE;
     entry->flush_immediately = FALSE;
+    entry->coll_access = coll_access;
+    entry->ind_access_while_coll = FALSE;
 #endif /* H5_HAVE_PARALLEL */
     entry->flush_in_progress = FALSE;
     entry->destroy_in_progress = FALSE;
@@ -8672,6 +8934,9 @@ H5C_load_entry(H5F_t * f,
     entry->aux_next = NULL;
     entry->aux_prev = NULL;
 
+    entry->coll_next = NULL;
+    entry->coll_prev = NULL;
+
     H5C__RESET_CACHE_ENTRY_STATS(entry);
 
     ret_value = thing;
@@ -8864,7 +9129,14 @@ H5C_make_space_in_cache(H5F_t * f,
                 cache_ptr->entries_removed_counter = 0;
                 cache_ptr->last_entry_removed_ptr  = NULL;
 
+#ifdef H5_HAVE_PARALLEL
+                if(TRUE == entry_ptr->coll_access) {
+                    entry_ptr->coll_access = FALSE;
+                    H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL)
+                }
+#endif
+
-                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, NULL) < 0)
+                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__NO_FLAGS_SET, NULL, NULL) < 0)
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush entry")
 
                 if ( ( cache_ptr->entries_removed_counter > 1 ) ||
@@ -8872,16 +9144,24 @@ H5C_make_space_in_cache(H5F_t * f,
 
                     restart_scan = TRUE;
 
-            } else if ( (cache_ptr->index_size + space_needed)
-                          >
-                          cache_ptr->max_cache_size ) {
+            } else if ( (cache_ptr->index_size + space_needed) > cache_ptr->max_cache_size
+#ifdef H5_HAVE_PARALLEL
+                        && !(entry_ptr->coll_access)
+#endif /* H5_HAVE_PARALLEL */
+                      ) {
 
 #if H5C_COLLECT_CACHE_STATS
                 cache_ptr->entries_scanned_to_make_space++;
 #endif /* H5C_COLLECT_CACHE_STATS */
 
+#ifdef H5_HAVE_PARALLEL
+                if(TRUE == entry_ptr->coll_access) {
+                    entry_ptr->coll_access = FALSE;
+                    H5C__REMOVE_FROM_COLL_LIST(cache_ptr, entry_ptr, FAIL)
+                }
+#endif
-                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL) < 0)
+                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL, NULL) < 0)
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush entry")
 
             } else {
 
                 /* We have enough space so don't flush clean entry.
                  */
 #if H5C_COLLECT_CACHE_STATS
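Note the eviction policy change in H5C_make_space_in_cache() above: an entry still marked coll_access is never flush-destroyed as-is; it is either skipped or first unmarked and pulled off the collective list, since one rank evicting independently while the others still hold the entry would break the lockstep the broadcast protocol relies on. A sketch of an LRU scan with such an exemption:

#include <stdbool.h>
#include <stddef.h>

typedef struct cache_entry_t {
    bool                  is_dirty;
    bool                  coll_access;  /* still on the collective list? */
    size_t                size;
    struct cache_entry_t *prev;         /* LRU link toward the head      */
} cache_entry_t;

int evict_entry(cache_entry_t *ent);    /* hypothetical flush/destroy */

/* Walk the LRU from least- to most-recently used, freeing space while
 * leaving collectively held entries alone -- the same reason the diff
 * gates eviction on !(entry_ptr->coll_access). */
static size_t make_space(cache_entry_t *lru_tail, size_t needed)
{
    cache_entry_t *ent = lru_tail;
    size_t freed = 0;

    while(ent && freed < needed) {
        cache_entry_t *prev = ent->prev;   /* grab before evicting */

        if(!ent->is_dirty && !ent->coll_access) {
            freed += ent->size;
            (void)evict_entry(ent);
        }
        ent = prev;
    }
    return freed;
}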
@@ -9016,8 +9296,14 @@ H5C_make_space_in_cache(H5F_t * f,
 
             prev_ptr = entry_ptr->aux_prev;
 
-            if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL) < 0)
+#ifdef H5_HAVE_PARALLEL
+            if(!(entry_ptr->coll_access)) {
+#endif /* H5_HAVE_PARALLEL */
+                if(H5C__flush_single_entry(f, dxpl_id, entry_ptr, H5C__FLUSH_INVALIDATE_FLAG | H5C__DEL_FROM_SLIST_ON_DESTROY_FLAG, NULL, NULL) < 0)
                     HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, "unable to flush entry")
+#ifdef H5_HAVE_PARALLEL
+            }
+#endif /* H5_HAVE_PARALLEL */
 
             /* we are scanning the clean LRU, so the serialize function
              * will not be called on any entry -- thus there is no
@@ -10069,3 +10355,246 @@ done:
     FUNC_LEAVE_NOAPI(ret_value)
 } /* H5C_get_entry_ring() */
 
+static herr_t
+H5C__generate_image(H5F_t *f, H5C_t *cache_ptr, H5C_cache_entry_t *entry_ptr,
+    hid_t dxpl_id, int64_t *entry_size_change_ptr)
+{
+    haddr_t             new_addr = HADDR_UNDEF;
+    haddr_t             old_addr = HADDR_UNDEF;
+    size_t              new_len = 0;
+    size_t              new_compressed_len = 0;
+    unsigned            serialize_flags = H5C__SERIALIZE_NO_FLAGS_SET;
+    herr_t              ret_value = SUCCEED;
+
+    FUNC_ENTER_NOAPI_NOINIT
+
+    HDassert(!entry_ptr->image_up_to_date);
+
+    /* reset cache_ptr->slist_changed so we can detect slist
+     * modifications in the pre_serialize call.
+     */
+    cache_ptr->slist_changed = FALSE;
+
+    /* make note of the entry's current address */
+    old_addr = entry_ptr->addr;
+
+    /* Call client's pre-serialize callback, if there's one */
+    if ( ( entry_ptr->type->pre_serialize != NULL ) &&
+         ( (entry_ptr->type->pre_serialize)(f, dxpl_id,
+                                            (void *)entry_ptr,
+                                            entry_ptr->addr,
+                                            entry_ptr->size,
+                                            entry_ptr->compressed_size,
+                                            &new_addr, &new_len,
+                                            &new_compressed_len,
+                                            &serialize_flags) < 0 ) ) {
+        HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \
+                    "unable to pre-serialize entry");
+    }
+
+    /* set cache_ptr->slist_change_in_pre_serialize if the
+     * slist was modified.
+     */
+    if ( cache_ptr->slist_changed )
+        cache_ptr->slist_change_in_pre_serialize = TRUE;
+
+    /* Check for any flags set in the pre-serialize callback */
+    if ( serialize_flags != H5C__SERIALIZE_NO_FLAGS_SET ) {
+
+        /* Check for unexpected flags from serialize callback */
+        if ( serialize_flags & ~(H5C__SERIALIZE_RESIZED_FLAG |
+                                 H5C__SERIALIZE_MOVED_FLAG |
+                                 H5C__SERIALIZE_COMPRESSED_FLAG) ) {
+            HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \
+                        "unknown serialize flag(s)");
+        }
+
+#ifdef H5_HAVE_PARALLEL
+        if ( cache_ptr->aux_ptr != NULL )
+            HGOTO_ERROR(H5E_CACHE, H5E_SYSTEM, FAIL, \
+                        "resize/move in serialize occurred in parallel case.");
+#endif
+
+        /* Resize the buffer if required */
+        if ( ( ( ! entry_ptr->compressed ) &&
+               ( serialize_flags & H5C__SERIALIZE_RESIZED_FLAG ) ) ||
+             ( ( entry_ptr->compressed ) &&
+               ( serialize_flags & H5C__SERIALIZE_COMPRESSED_FLAG ) ) ) {
+            size_t new_image_size;
+
+            if ( entry_ptr->compressed )
+                new_image_size = new_compressed_len;
+            else
+                new_image_size = new_len;
+
+            HDassert(new_image_size > 0);
+
+            /* Release the current image */
+            if ( entry_ptr->image_ptr ) {
+                entry_ptr->image_ptr = H5MM_xfree(entry_ptr->image_ptr);
+            }
+
+            /* Allocate a new image buffer */
+            entry_ptr->image_ptr =
+                H5MM_malloc(new_image_size + H5C_IMAGE_EXTRA_SPACE);
+
+            if ( NULL == entry_ptr->image_ptr ) {
+                HGOTO_ERROR(H5E_CACHE, H5E_CANTALLOC, FAIL, \
+                            "memory allocation failed for on disk image buffer");
+            }
+
+#if H5C_DO_MEMORY_SANITY_CHECKS
+            HDmemcpy(((uint8_t *)entry_ptr->image_ptr) + new_image_size,
+                     H5C_IMAGE_SANITY_VALUE,
+                     H5C_IMAGE_EXTRA_SPACE);
+#endif /* H5C_DO_MEMORY_SANITY_CHECKS */
+
+        } /* end if */
+
+        /* If required, update the entry and the cache data structures
+         * for a resize.
+         */
+        if ( serialize_flags & H5C__SERIALIZE_RESIZED_FLAG ) {
+
+            H5C__UPDATE_STATS_FOR_ENTRY_SIZE_CHANGE(cache_ptr, \
+                                                    entry_ptr, new_len);
+
+            /* update the hash table for the size change */
+            H5C__UPDATE_INDEX_FOR_SIZE_CHANGE(cache_ptr, \
+                                              entry_ptr->size, \
+                                              new_len, entry_ptr, \
+                                              !(entry_ptr->is_dirty));
+
+            /* The entry can't be protected since we are
+             * in the process of flushing it. Thus we must
+             * update the replacement policy data
+             * structures for the size change. The macro
+             * deals with the pinned case.
+             */
+            H5C__UPDATE_RP_FOR_SIZE_CHANGE(cache_ptr, entry_ptr, new_len);
+
+            /* as we haven't updated the cache data structures for
+             * the flush or flush destroy yet, the entry should
+             * be in the slist. Thus update it for the size change.
+             */
+            HDassert(entry_ptr->in_slist);
+            H5C__UPDATE_SLIST_FOR_SIZE_CHANGE(cache_ptr, entry_ptr->size, \
+                                              new_len);
+
+            /* if defined, update *entry_size_change_ptr for the
+             * change in entry size.
+             */
+            if ( entry_size_change_ptr != NULL ) {
+                *entry_size_change_ptr = (int64_t)new_len;
+                *entry_size_change_ptr -= (int64_t)(entry_ptr->size);
+            }
+
+            /* finally, update the entry for its new size */
+            entry_ptr->size = new_len;
+        } /* end if */
+
+        /* If required, update the entry and the cache data structures
+         * for a move
+         */
+        if ( serialize_flags & H5C__SERIALIZE_MOVED_FLAG ) {
+#if H5C_DO_SANITY_CHECKS
+            int64_t saved_slist_len_increase;
+            int64_t saved_slist_size_increase;
+#endif /* H5C_DO_SANITY_CHECKS */
+
+            H5C__UPDATE_STATS_FOR_MOVE(cache_ptr, entry_ptr);
+
+            if ( entry_ptr->addr == old_addr ) {
+                /* we must update cache data structures for the
+                 * change in address.
+                 */
+
+                /* delete the entry from the hash table and the slist */
+                H5C__DELETE_FROM_INDEX(cache_ptr, entry_ptr);
+                H5C__REMOVE_ENTRY_FROM_SLIST(cache_ptr, entry_ptr);
+
+                /* update the entry for its new address */
+                entry_ptr->addr = new_addr;
+
+                /* and then reinsert in the index and slist */
+                H5C__INSERT_IN_INDEX(cache_ptr, entry_ptr, FAIL);
+
+#if H5C_DO_SANITY_CHECKS
+                /* save cache_ptr->slist_len_increase and
+                 * cache_ptr->slist_size_increase before the
+                 * reinsertion into the slist, and restore
+                 * them afterwards to avoid skewing our sanity
+                 * checking.
+                 */
+                saved_slist_len_increase = cache_ptr->slist_len_increase;
+                saved_slist_size_increase = cache_ptr->slist_size_increase;
+#endif /* H5C_DO_SANITY_CHECKS */
+
+                H5C__INSERT_ENTRY_IN_SLIST(cache_ptr, entry_ptr, FAIL);
+
+#if H5C_DO_SANITY_CHECKS
+                cache_ptr->slist_len_increase = saved_slist_len_increase;
+                cache_ptr->slist_size_increase = saved_slist_size_increase;
+#endif /* H5C_DO_SANITY_CHECKS */
+            }
+            else {
+                HDassert(entry_ptr->addr == new_addr);
+            }
+        } /* end if */
+
+        if ( serialize_flags & H5C__SERIALIZE_COMPRESSED_FLAG ) {
+            /* just save the new compressed entry size in
+             * entry_ptr->compressed_size. We don't need to
+             * do more, as compressed size is only used for I/O.
+             */
+            HDassert(entry_ptr->compressed);
+            entry_ptr->compressed_size = new_compressed_len;
+        }
+    } /* end if ( serialize_flags != H5C__SERIALIZE_NO_FLAGS_SET ) */
+
+    /* Serialize object into buffer */
+    {
+        size_t image_len;
+
+        if ( entry_ptr->compressed )
+            image_len = entry_ptr->compressed_size;
+        else
+            image_len = entry_ptr->size;
+
+        /* reset cache_ptr->slist_changed so we can detect slist
+         * modifications in the serialize call.
+         */
+        cache_ptr->slist_changed = FALSE;
+
+        if ( entry_ptr->type->serialize(f, entry_ptr->image_ptr,
+                                        image_len,
+                                        (void *)entry_ptr) < 0 ) {
+            HGOTO_ERROR(H5E_CACHE, H5E_CANTFLUSH, FAIL, \
+                        "unable to serialize entry");
+        }
+
+        /* set cache_ptr->slist_change_in_serialize if the
+         * slist was modified.
+         */
+        if ( cache_ptr->slist_changed )
+            cache_ptr->slist_change_in_serialize = TRUE;
+
+#if H5C_DO_MEMORY_SANITY_CHECKS
+        HDassert(0 == HDmemcmp(((uint8_t *)entry_ptr->image_ptr) + image_len,
+                               H5C_IMAGE_SANITY_VALUE,
+                               H5C_IMAGE_EXTRA_SPACE));
+#endif /* H5C_DO_MEMORY_SANITY_CHECKS */
+
+        entry_ptr->image_up_to_date = TRUE;
+    }
+
+done:
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* H5C__generate_image */
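H5C__generate_image factors the image-generation half out of H5C__flush_single_entry so the rank-0 branch of H5C_protect (earlier in this diff) can produce an up-to-date image to broadcast without actually flushing the entry. For context on the callback contract it drives: pre_serialize may report a new address or length through its out-parameters and flag bits, after which serialize must fill the (possibly reallocated) image buffer. A toy client showing the shape of the two callbacks; the parameter lists are simplified relative to the real typedefs in H5Cprivate.h, and the flag constant is a stand-in:

#include <stdint.h>
#include <string.h>

/* Toy cache client; the authoritative callback typedefs live in
 * H5Cprivate.h and take more parameters than shown here. */
typedef struct {
    uint64_t value;      /* the "native" form of the entry */
} toy_entry_t;

#define TOY_SERIALIZE_RESIZED 0x1u  /* stand-in for H5C__SERIALIZE_RESIZED_FLAG */

/* pre_serialize: decide the final size; report changes via the
 * out-parameters and flag bits rather than touching the image. */
static int toy_pre_serialize(void *thing, size_t len,
                             size_t *new_len, unsigned *flags)
{
    toy_entry_t *ent = thing;

    if(sizeof(ent->value) != len) {     /* entry must grow or shrink */
        *new_len = sizeof(ent->value);
        *flags |= TOY_SERIALIZE_RESIZED;
    }
    return 0;
}

/* serialize: fill the image buffer the cache (re)allocated to the
 * length agreed on in pre_serialize. */
static int toy_serialize(void *image, size_t len, void *thing)
{
    toy_entry_t *ent = thing;

    if(len != sizeof(ent->value))
        return -1;                      /* cache and client disagree */
    memcpy(image, &ent->value, len);
    return 0;
}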