summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfiling/H5FDsubfile_int.c
blob: 68deb832adf7179119a8962ab16a39c646738833 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 * Copyright by The HDF Group.                                               *
 * All rights reserved.                                                      *
 *                                                                           *
 * This file is part of HDF5.  The full HDF5 copyright notice, including     *
 * terms governing use, modification, and redistribution, is contained in    *
 * the COPYING file, which can be found at the root of the source code       *
 * distribution tree, or in https://www.hdfgroup.org/licenses.               *
 * If you do not have access to either file, you may request a copy from     *
 * help@hdfgroup.org.                                                        *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

/*
 * Purpose:     This is part of a parallel subfiling I/O driver.
 *
 */

/***********/
/* Headers */
/***********/

#include "H5FDsubfiling_priv.h"

/*-------------------------------------------------------------------------
 * Function:    H5FD__subfiling__truncate_sub_files
 *
 *              Note: This code should be moved -- most likely to the IOC
 *                    code files.
 *
 * Purpose:     Apply a truncate operation to the subfiles.
 *
 *              In the context of the I/O concentrators, the eof must be
 *              translated into the appropriate value for each of the
 *              subfiles, and then applied to same.
 *
 *              Further, we must ensure that all prior I/O requests complete
 *              before the truncate is applied.
 *
 *              We do this as follows:
 *
 *              1) Run a barrier on entry.
 *
 *              2) Determine if this rank is a IOC.  If it is, compute
 *                 the correct EOF for this subfile, and send a truncate
 *                 request to the IOC.
 *
 *              3) On the IOC thread, allow all pending I/O requests
 *                 received prior to the truncate request to complete
 *                 before performing the truncate.
 *
 *              4) Run a barrier on exit.
 *
 *              Observe that the barrier on entry ensures that any prior
 *              I/O requests will have been queue before the truncate
 *              request is sent to the IOC.
 *
 *              Similarly, the barrier on exit ensures that no subsequent
 *              I/O request will reach the IOC before the truncate request
 *              has been queued.
 *
 * Return:      SUCCEED/FAIL
 *
 *-------------------------------------------------------------------------
 */
herr_t
H5FD__subfiling__truncate_sub_files(hid_t context_id, int64_t logical_file_eof, MPI_Comm comm)
{
    subfiling_context_t *sf_context = NULL;
    MPI_Request         *recv_reqs  = NULL;
    int64_t              msg[3]     = {0};
    int64_t             *recv_msgs  = NULL;
    int                  mpi_size;
    int                  mpi_code;
    herr_t               ret_value = SUCCEED;

    if (MPI_SUCCESS != (mpi_code = MPI_Comm_size(comm, &mpi_size)))
        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Comm_size failed", mpi_code);

    /* Barrier on entry */
    if (mpi_size > 1)
        if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);

    if (NULL == (sf_context = (subfiling_context_t *)H5_get_subfiling_object(context_id)))
        H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_BADVALUE, FAIL, "can't get subfile context");

    if (sf_context->topology->rank_is_ioc) {
        int64_t num_full_stripes;
        int64_t num_leftover_stripes;
        int64_t partial_stripe_len;
        int     num_subfiles_owned;

        num_full_stripes     = logical_file_eof / sf_context->sf_blocksize_per_stripe;
        partial_stripe_len   = logical_file_eof % sf_context->sf_blocksize_per_stripe;
        num_leftover_stripes = partial_stripe_len / sf_context->sf_stripe_size;

        num_subfiles_owned = sf_context->sf_num_fids;

        if (NULL == (recv_reqs = malloc((size_t)num_subfiles_owned * sizeof(*recv_reqs))))
            H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
                                    "can't allocate receive requests array");
        if (NULL == (recv_msgs = malloc((size_t)num_subfiles_owned * 3 * sizeof(*recv_msgs))))
            H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate message array");

        /*
         * Post early receives for messages from the IOC main
         * thread that will signal completion of the truncate
         * operation
         */
        for (int i = 0; i < num_subfiles_owned; i++) {
            if (MPI_SUCCESS !=
                (mpi_code = MPI_Irecv(&recv_msgs[3 * i], 1, H5_subfiling_rpc_msg_type,
                                      sf_context->topology->io_concentrators[sf_context->topology->ioc_idx],
                                      TRUNC_COMPLETED, sf_context->sf_eof_comm, &recv_reqs[i])))
                H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
        }

        /* Compute the EOF for each subfile this IOC owns */
        for (int i = 0; i < num_subfiles_owned; i++) {
            int64_t subfile_eof = num_full_stripes * sf_context->sf_stripe_size;
            int64_t global_subfile_idx;

            global_subfile_idx =
                (i * sf_context->topology->n_io_concentrators) + sf_context->topology->ioc_idx;

            if (global_subfile_idx < num_leftover_stripes) {
                subfile_eof += sf_context->sf_stripe_size;
            }
            else if (global_subfile_idx == num_leftover_stripes) {
                subfile_eof += partial_stripe_len % sf_context->sf_stripe_size;
            }

            /* Direct the IOC to truncate this subfile to the correct EOF */
            msg[0] = subfile_eof;
            msg[1] = i;
            msg[2] = -1; /* padding -- not used in this message */

            if (MPI_SUCCESS !=
                (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type,
                                     sf_context->topology->io_concentrators[sf_context->topology->ioc_idx],
                                     TRUNC_OP, sf_context->sf_msg_comm)))
                H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
        }

        /* Wait for truncate operations to complete */
        if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_subfiles_owned, recv_reqs, MPI_STATUSES_IGNORE)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall", mpi_code);

            /* sanity check -- compute the file eof using the same mechanism used to
             * compute the subfile eof.  Assert that the computed value and the
             * actual value match.
             *
             * Do this only for debug builds -- probably delete this before release.
             *
             *                                           JRM -- 12/15/21
             */

#ifndef NDEBUG
        {
            int64_t test_file_eof = 0;

            for (int i = 0; i < sf_context->sf_num_subfiles; i++) {
                test_file_eof += num_full_stripes * sf_context->sf_stripe_size;

                if (i < num_leftover_stripes) {
                    test_file_eof += sf_context->sf_stripe_size;
                }
                else if (i == num_leftover_stripes) {
                    test_file_eof += partial_stripe_len % sf_context->sf_stripe_size;
                }
            }

            assert(test_file_eof == logical_file_eof);
        }
#endif /* NDEBUG */
    }

    /* Barrier on exit */
    if (mpi_size > 1)
        if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);

done:
    free(recv_msgs);
    free(recv_reqs);

    H5_SUBFILING_FUNC_LEAVE;
} /* H5FD__subfiling__truncate_sub_files() */

/*-------------------------------------------------------------------------
 * Function:    H5FD__subfiling__get_real_eof
 *
 *              Note: This code should be moved -- most likely to the IOC
 *                    code files.
 *
 * Purpose:     Query each subfile to get its local EOF, and then use this
 *              data to calculate the actual EOF.
 *
 *              Do this as follows:
 *
 *              1) allocate an array of int64_t of length equal to the
 *                 the number of subfiles, and initialize all fields to -1.
 *
 *              2) Send each subfile's IOC a message requesting that
 *                 subfile's EOF.
 *
 *              3) Await reply from each IOC, storing the reply in
 *                 the appropriate entry in the array allocated in 1.
 *
 *              4) After all IOCs have replied, compute the offset of
 *                 each subfile in the logical file.  Take the maximum
 *                 of these values, and report this value as the overall
 *                 EOF.
 *
 *              Note that this operation is not collective, and can return
 *              invalid data if other ranks perform writes while this
 *              operation is in progress.
 *
 *              SUBFILING NOTE:
 *              The EOF calculation for subfiling is somewhat different
 *              than for the more traditional HDF5 file implementations.
 *              This statement derives from the fact that unlike "normal"
 *              HDF5 files, subfiling introduces a multi-file representation
 *              of a single HDF5 file.  The plurality of subfiles represents
 *              a software RAID-0 based HDF5 file.  As such, each subfile
 *              contains a designated portion of the address space of the
 *              virtual HDF5 storage.  We have no notion of HDF5 datatypes,
 *              datasets, metadata, or other HDF5 structures; only BYTES.
 *
 *              The organization of the bytes within subfiles is consistent
 *              with the RAID-0 striping, i.e. there are IO Concentrators
 *              (IOCs) which correspond to a stripe-count (in Lustre) as
 *              well as a stripe_size.  The combination of these two
 *              variables determines the "address" (a combination of IOC
 *              and a file offset) of any storage operation.
 *
 *              Having a defined storage layout, the virtual file EOF
 *              calculation should be the MAXIMUM value returned by the
 *              collection of IOCs.  Every MPI rank which hosts an IOC
 *              maintains its own EOF by updating that value for each
 *              WRITE operation that completes, i.e. if a new local EOF
 *              is greater than the existing local EOF, the new EOF
 *              will replace the old.  The local EOF calculation is as
 *              follows.
 *              1. At file creation, each IOC is assigned a rank value
 *                 (0 to N-1, where N is the total number of IOCs) and
 *                 a 'sf_base_addr' = 'ioc_idx' * 'sf_stripe_size')
 *                 we also determine the 'sf_blocksize_per_stripe' which
 *                 is simply the 'sf_stripe_size' * 'n_ioc_concentrators'
 *
 *              2. For every write operation, the IOC receives a message
 *                 containing a file_offset and the data_size.
 *
 *              3. The file_offset + data_size are in turn used to
 *                 create a stripe_id:
 *                   IOC-(ioc_rank)       IOC-(ioc_rank+1)
 *                   |<- sf_base_address  |<- sf_base_address  |
 *                ID +--------------------+--------------------+
 *                 0:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                 1:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                   ~                    ~                    ~
 *                 N:|<- sf_stripe_size ->|<- sf_stripe_size ->|
 *                   +--------------------+--------------------+
 *
 *                The new 'stripe_id' is then used to calculate a
 *                potential new EOF:
 *                sf_eof = (stripe_id * sf_blocksize_per_stripe) + sf_base_addr
 *                         + ((file_offset + data_size) % sf_stripe_size)
 *
 *              4. If (sf_eof > current_sf_eof), then current_sf_eof = sf_eof.
 *
 * Return:      SUCCEED/FAIL
 *
 *-------------------------------------------------------------------------
 */
herr_t
H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
{
    subfiling_context_t *sf_context  = NULL;
    MPI_Request         *recv_reqs   = NULL;
    int64_t             *recv_msg    = NULL;
    int64_t             *sf_eofs     = NULL; /* dynamically allocated array for subfile EOFs */
    int64_t              msg[3]      = {0, 0, 0};
    int64_t              logical_eof = 0;
    int64_t              sf_logical_eof;
    int                  n_io_concentrators = 0;
    int                  num_subfiles       = 0;
    int                  mpi_code;            /* MPI return code */
    herr_t               ret_value = SUCCEED; /* Return value */

    assert(logical_eof_ptr);

    if (NULL == (sf_context = (subfiling_context_t *)H5_get_subfiling_object(context_id)))
        H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_BADVALUE, FAIL, "can't get subfile context");

    assert(sf_context->topology);

    n_io_concentrators = sf_context->topology->n_io_concentrators;
    num_subfiles       = sf_context->sf_num_subfiles;

    assert(n_io_concentrators > 0);
    assert(num_subfiles >= n_io_concentrators);

    if (NULL == (sf_eofs = malloc((size_t)num_subfiles * sizeof(int64_t))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate subfile EOFs array");
    if (NULL == (recv_reqs = malloc((size_t)num_subfiles * sizeof(*recv_reqs))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate receive requests array");
    if (NULL == (recv_msg = malloc((size_t)num_subfiles * sizeof(msg))))
        H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate message array");

    for (int i = 0; i < num_subfiles; i++) {
        sf_eofs[i]   = -1;
        recv_reqs[i] = MPI_REQUEST_NULL;
    }

    /* Post early non-blocking receives for the EOF of each subfile */
    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];

        if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&recv_msg[3 * i], 1, H5_subfiling_rpc_msg_type, ioc_rank,
                                                 GET_EOF_COMPLETED, sf_context->sf_eof_comm, &recv_reqs[i])))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv", mpi_code);
    }

    /* Send each subfile's IOC a message requesting that subfile's EOF */

    msg[1] = -1; /* padding -- not used in this message */
    msg[2] = -1; /* padding -- not used in this message */

    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];

        /* Set subfile index for receiving IOC */
        msg[0] = i / n_io_concentrators;

        if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, ioc_rank, GET_EOF_OP,
                                                sf_context->sf_msg_comm)))
            H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code);
    }

    /* Wait for EOF communication to complete */
    if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_subfiles, recv_reqs, MPI_STATUSES_IGNORE)))
        H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall", mpi_code);

    for (int i = 0; i < num_subfiles; i++) {
        int ioc_rank = (int)recv_msg[3 * i];

        assert(ioc_rank >= 0);
        assert(ioc_rank < n_io_concentrators);
        assert(sf_eofs[i] == -1);

        sf_eofs[i] = recv_msg[(3 * i) + 1];
    }

    /* 4) After all IOCs have replied, compute the offset of
     *    each subfile in the logical file.  Take the maximum
     *    of these values, and report this value as the overall
     *    EOF.
     */

    for (int i = 0; i < num_subfiles; i++) {

        /* compute number of complete stripes */
        sf_logical_eof = sf_eofs[i] / sf_context->sf_stripe_size;

        /* multiply by stripe size */
        sf_logical_eof *= sf_context->sf_stripe_size * num_subfiles;

        /* if the subfile doesn't end on a stripe size boundary, must add in a partial stripe */
        if (sf_eofs[i] % sf_context->sf_stripe_size > 0) {

            /* add in the size of the partial stripe up to but not including this subfile */
            sf_logical_eof += i * sf_context->sf_stripe_size;

            /* finally, add in the number of bytes in the last partial stripe depth in the subfile */
            sf_logical_eof += sf_eofs[i] % sf_context->sf_stripe_size;
        }

        if (sf_logical_eof > logical_eof) {

            logical_eof = sf_logical_eof;
        }
    }

#ifdef H5_SUBFILING_DEBUG
    H5_subfiling_log(context_id, "%s: calculated logical EOF = %" PRId64 ".", __func__, logical_eof);
#endif

    *logical_eof_ptr = logical_eof;

done:
    if (ret_value < 0) {
        for (int i = 0; i < num_subfiles; i++) {
            if (recv_reqs && (recv_reqs[i] != MPI_REQUEST_NULL)) {
                if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&recv_reqs[i])))
                    H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel", mpi_code);
            }
        }
    }

    free(recv_msg);
    free(recv_reqs);
    free(sf_eofs);

    H5_SUBFILING_FUNC_LEAVE;
} /* H5FD__subfiling__get_real_eof() */