summaryrefslogtreecommitdiffstats
path: root/tools/lib/h5diff.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/lib/h5diff.c')
-rw-r--r--tools/lib/h5diff.c167
1 files changed, 123 insertions, 44 deletions
diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c
index 037a7bb..621d23d 100644
--- a/tools/lib/h5diff.c
+++ b/tools/lib/h5diff.c
@@ -526,9 +526,52 @@ diff_match (hid_t file1_id,
/* if there are any outstanding print requests, let's handle one. */
if(busyTasks > 0)
{
- int incomingMessage;
- /* check if any tasks freed up, and didn't need to print. */
- MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status);
+ int incomingMessage;
+ /* check if any tasks freed up, and didn't need to print. */
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status);
+
+ if(incomingMessage)
+ {
+ workerTasks[Status.MPI_SOURCE-1] = 1;
+ MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
+ nfound += nFoundbyWorker.nfound;
+ options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
+ busyTasks--;
+ }
+
+ /* check to see if the print token was returned. */
+ if(!havePrintToken)
+ {
+ /* If we don't have the token, someone is probably sending us output */
+ print_incoming_data();
+
+ /* check incoming queue for token */
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
+
+ /* incoming token implies free task. */
+ if(incomingMessage)
+ {
+ workerTasks[Status.MPI_SOURCE-1] = 1;
+ MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
+ nfound += nFoundbyWorker.nfound;
+ options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
+ busyTasks--;
+ havePrintToken = 1;
+ }
+ }
+
+ /* check to see if anyone needs the print token. */
+ if(havePrintToken)
+ {
+ /* check incoming queue for print token requests */
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, &Status);
+ if(incomingMessage)
+ {
+ MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status);
+ MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD);
+ havePrintToken = 0;
+ }
+ }
if(incomingMessage)
{
@@ -598,48 +641,66 @@ diff_match (hid_t file1_id,
if(!workerFound)
{
- /* if they were all busy, we've got to wait for one free up before we can move on.
- * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */
- if(!havePrintToken)
- {
- MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
- havePrintToken = 1;
- nfound += nFoundbyWorker.nfound;
- options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
- /* send this task the work unit. */
- MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
- }
+ /* if they were all busy, we've got to wait for one free up before we can move on.
+ * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */
+
+ if(!havePrintToken)
+ {
+ while(!havePrintToken) {
+ int incomingMessage;
+ print_incoming_data();
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
+ if(incomingMessage)
+ {
+ MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
+ havePrintToken = 1;
+ nfound += nFoundbyWorker.nfound;
+ options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
+ /* send this task the work unit. */
+ MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
+ }
+ }
+ }
/* if we do have the token, check for task to free up, or wait for a task to request it */
- else
- {
- /* But first print all the data in our incoming queue */
- print_incoming_data();
-
- MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
- if(Status.MPI_TAG == MPI_TAG_DONE)
- {
- MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
- nfound += nFoundbyWorker.nfound;
- options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
- MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
- }
- else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST)
- {
- MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status);
- MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD);
-
- MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
- nfound += nFoundbyWorker.nfound;
- options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
- MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
- }
- else
- {
- printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG);
- MPI_Abort(MPI_COMM_WORLD, 0);
- MPI_Finalize();
- }
- }
+ else
+ {
+ /* But first print all the data in our incoming queue */
+ print_incoming_data();
+ MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
+ if(Status.MPI_TAG == MPI_TAG_DONE)
+ {
+ MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
+ nfound += nFoundbyWorker.nfound;
+ options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
+ MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
+ }
+ else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST)
+ {
+ int incomingMessage;
+ MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status);
+ MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD);
+
+ do
+ {
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
+
+ print_incoming_data();
+ }
+ while(!incomingMessage);
+
+
+ MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
+ nfound += nFoundbyWorker.nfound;
+ options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
+ MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
+ }
+ else
+ {
+ printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG);
+ MPI_Abort(MPI_COMM_WORLD, 0);
+ MPI_Finalize();
+ }
+ }
}
}
#endif
@@ -672,7 +733,15 @@ diff_match (hid_t file1_id,
MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status);
if(havePrintToken)
{
+ int incomingMessage;
MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD);
+do
+{
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
+
+ print_incoming_data();
+}
+while(!incomingMessage);
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;
@@ -681,6 +750,16 @@ diff_match (hid_t file1_id,
else /* someone else must have it...wait for them to return it, then give it to the task that just asked for it. */
{
int source = Status.MPI_SOURCE;
+ int incomingMessage;
+do
+{
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
+
+ print_incoming_data();
+}
+while(!incomingMessage);
+
+
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp;