diff options
Diffstat (limited to 'tools/lib/h5diff.c')
-rw-r--r-- | tools/lib/h5diff.c | 167 |
1 files changed, 123 insertions, 44 deletions
diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index 037a7bb..621d23d 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -526,9 +526,52 @@ diff_match (hid_t file1_id, /* if there are any outstanding print requests, let's handle one. */ if(busyTasks > 0) { - int incomingMessage; - /* check if any tasks freed up, and didn't need to print. */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); + int incomingMessage; + /* check if any tasks freed up, and didn't need to print. */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); + + if(incomingMessage) + { + workerTasks[Status.MPI_SOURCE-1] = 1; + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + } + + /* check to see if the print token was returned. */ + if(!havePrintToken) + { + /* If we don't have the token, someone is probably sending us output */ + print_incoming_data(); + + /* check incoming queue for token */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + /* incoming token implies free task. */ + if(incomingMessage) + { + workerTasks[Status.MPI_SOURCE-1] = 1; + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + havePrintToken = 1; + } + } + + /* check to see if anyone needs the print token. */ + if(havePrintToken) + { + /* check incoming queue for print token requests */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, &Status); + if(incomingMessage) + { + MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); + MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + havePrintToken = 0; + } + } if(incomingMessage) { @@ -598,48 +641,66 @@ diff_match (hid_t file1_id, if(!workerFound) { - /* if they were all busy, we've got to wait for one free up before we can move on. - * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */ - if(!havePrintToken) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - havePrintToken = 1; - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - /* send this task the work unit. */ - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } + /* if they were all busy, we've got to wait for one free up before we can move on. + * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */ + + if(!havePrintToken) + { + while(!havePrintToken) { + int incomingMessage; + print_incoming_data(); + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + if(incomingMessage) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + havePrintToken = 1; + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + /* send this task the work unit. */ + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + } + } /* if we do have the token, check for task to free up, or wait for a task to request it */ - else - { - /* But first print all the data in our incoming queue */ - print_incoming_data(); - - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if(Status.MPI_TAG == MPI_TAG_DONE) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } - else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) - { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } - else - { - printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - MPI_Finalize(); - } - } + else + { + /* But first print all the data in our incoming queue */ + print_incoming_data(); + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); + if(Status.MPI_TAG == MPI_TAG_DONE) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) + { + int incomingMessage; + MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); + MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + + do + { + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); + } + while(!incomingMessage); + + + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + else + { + printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); + MPI_Abort(MPI_COMM_WORLD, 0); + MPI_Finalize(); + } + } } } #endif @@ -672,7 +733,15 @@ diff_match (hid_t file1_id, MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); if(havePrintToken) { + int incomingMessage; MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); +do +{ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); +} +while(!incomingMessage); MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); nfound += nFoundbyWorker.nfound; options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; @@ -681,6 +750,16 @@ diff_match (hid_t file1_id, else /* someone else must have it...wait for them to return it, then give it to the task that just asked for it. */ { int source = Status.MPI_SOURCE; + int incomingMessage; +do +{ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); +} +while(!incomingMessage); + + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); nfound += nFoundbyWorker.nfound; options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; |