diff options
-rw-r--r-- | tools/lib/h5diff.c | 262 |
1 files changed, 117 insertions, 145 deletions
diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index 621d23d..7209e14 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -17,6 +17,15 @@ #include "H5private.h" #include "ph5diff.h" +/* + * Debug printf macros. The prefix allows output filtering by test scripts. + */ +#define h5diffdebug(x) fprintf(stderr, "h5diff debug: " x) +#define h5diffdebug2(x1, x2) fprintf(stderr, "h5diff debug: " x1, x2) +#define h5diffdebug3(x1, x2, x3) fprintf(stderr, "h5diff debug: " x1, x2, x3) +#define h5diffdebug4(x1, x2, x3, x4) fprintf(stderr, "h5diff debug: " x1, x2, x3, x4) +#define h5diffdebug5(x1, x2, x3, x4, x5) fprintf(stderr, "h5diff debug: " x1, x2, x3, x4, x5) +#define h5diffdebug6(x1, x2, x3, x4, x5, x6) fprintf(stderr, "h5diff debug: " x1, x2, x3, x4, x5, x6) @@ -474,16 +483,16 @@ diff_match (hid_t file1_id, */ { #ifdef H5_HAVE_PARALLEL - char* workerTasks = malloc((g_nTasks-1) * sizeof(char)); - int n; - int busyTasks=0; - struct diffs_found nFoundbyWorker; - struct diff_args args; - int havePrintToken = 1; - MPI_Status Status; - - /*set all tasks as free */ - memset(workerTasks, 1, g_nTasks-1); + char* workerTasks = malloc((g_nTasks-1) * sizeof(char)); + int n; + int busyTasks=0; + struct diffs_found nFoundbyWorker; + struct diff_args args; + int havePrintToken = 1; + MPI_Status Status; + + /*set all tasks as free */ + memset(workerTasks, 1, g_nTasks-1); #endif for (i = 0; i < table->nobjs; i++) @@ -504,6 +513,7 @@ diff_match (hid_t file1_id, #ifdef H5_HAVE_PARALLEL else { +h5diffdebug("beginning of big else block\n"); /* We're in parallel mode */ /* Since the data type of diff value is hsize_t which can * be arbitary large such that there is no MPI type that @@ -515,14 +525,15 @@ diff_match (hid_t file1_id, /*Set up args to pass to worker task. */ if(strlen(table->objs[i].name) > 255) { - printf("The parallel diff only supports object names up to 255 characters\n"); - MPI_Abort(MPI_COMM_WORLD, 0); + printf("The parallel diff only supports object names up to 255 characters\n"); + MPI_Abort(MPI_COMM_WORLD, 0); } strcpy(args.name, table->objs[i].name); args.options = *options; args.type= table->objs[i].type; +h5diffdebug2("busyTasks=%d\n", busyTasks); /* if there are any outstanding print requests, let's handle one. */ if(busyTasks > 0) { @@ -530,6 +541,7 @@ diff_match (hid_t file1_id, /* check if any tasks freed up, and didn't need to print. */ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); + /* first block*/ if(incomingMessage) { workerTasks[Status.MPI_SOURCE-1] = 1; @@ -572,49 +584,6 @@ diff_match (hid_t file1_id, havePrintToken = 0; } } - - if(incomingMessage) - { - workerTasks[Status.MPI_SOURCE-1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } - - /* check to see if the print token was returned. */ - if(!havePrintToken) - { - /* check incoming queue for token */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); - - /* incoming token implies free task. */ - if(incomingMessage) - { - workerTasks[Status.MPI_SOURCE-1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } - } - - /* check to see if anyone needs the print token. */ - if(havePrintToken) - { - /* check incoming queue for print token requests */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, &Status); - if(incomingMessage) - { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - havePrintToken = 0; - } - } - - /* Print all the data in our incoming queue */ - print_incoming_data(); } /* check array of tasks to see which ones are free. @@ -623,22 +592,23 @@ diff_match (hid_t file1_id, for(n=1; (n<g_nTasks) && !workerFound; n++) { - if(workerTasks[n-1]) - { - /* send file id's and names to first free worker */ - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, n, MPI_TAG_ARGS, MPI_COMM_WORLD); + if(workerTasks[n-1]) + { + /* send file id's and names to first free worker */ + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, n, MPI_TAG_ARGS, MPI_COMM_WORLD); - /* increment counter for total number of prints. */ - busyTasks++; + /* increment counter for total number of prints. */ + busyTasks++; - /* mark worker as busy */ - workerTasks[n-1] = 0; - workerFound = 1; - } + /* mark worker as busy */ + workerTasks[n-1] = 0; + workerFound = 1; + } } + h5diffdebug2("workerfound is %d \n", workerFound); if(!workerFound) { /* if they were all busy, we've got to wait for one free up before we can move on. @@ -703,104 +673,106 @@ diff_match (hid_t file1_id, } } } -#endif +#endif /* H5_HAVE_PARALLEL */ } } + h5diffdebug("done with for loop\n"); #ifdef H5_HAVE_PARALLEL if(g_Parallel) { while(busyTasks > 0) /* make sure all tasks are done */ { - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if(Status.MPI_TAG == MPI_TAG_DONE) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } - else if(Status.MPI_TAG == MPI_TAG_TOK_RETURN) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } - else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) - { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); - if(havePrintToken) - { - int incomingMessage; - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); -do -{ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); - - print_incoming_data(); -} -while(!incomingMessage); - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } - else /* someone else must have it...wait for them to return it, then give it to the task that just asked for it. */ - { - int source = Status.MPI_SOURCE; - int incomingMessage; -do -{ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); - - print_incoming_data(); -} -while(!incomingMessage); - - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - } - } - else if(Status.MPI_TAG == MPI_TAG_TOK_RETURN) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } - else if(Status.MPI_TAG == MPI_TAG_PRINT_DATA) - { - char data[PRINT_DATA_MAX_SIZE+1]; - memset(data, 0, PRINT_DATA_MAX_SIZE+1); - - MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &Status); - - printf("%s", data); - } - else - { - printf("ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - } + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); + if(Status.MPI_TAG == MPI_TAG_DONE) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + } + else if(Status.MPI_TAG == MPI_TAG_TOK_RETURN) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + havePrintToken = 1; + } + else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) + { + MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); + if(havePrintToken) + { + int incomingMessage; + MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + do + { + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); + } + while(!incomingMessage); + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + } + else /* someone else must have it...wait for them to return it, then give it to the task that just asked for it. */ + { + int source = Status.MPI_SOURCE; + int incomingMessage; + do + { + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); + } + while(!incomingMessage); + + + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + } + } + else if(Status.MPI_TAG == MPI_TAG_TOK_RETURN) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + havePrintToken = 1; + } + else if(Status.MPI_TAG == MPI_TAG_PRINT_DATA) + { + char data[PRINT_DATA_MAX_SIZE+1]; + memset(data, 0, PRINT_DATA_MAX_SIZE+1); + + MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &Status); + + printf("%s", data); + } + else + { + printf("ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG); + MPI_Abort(MPI_COMM_WORLD, 0); + } } for(i=1; i<g_nTasks; i++) - MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD); + MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD); /* Print any final data waiting in our queue */ print_incoming_data(); } +h5diffdebug("done with if block\n"); free(workerTasks); -#endif +#endif /* H5_HAVE_PARALLEL */ } /* free table */ trav_table_free (table); |