diff options
-rw-r--r-- | tools/h5diff/ph5diff_main.c | 1 | ||||
-rw-r--r-- | tools/lib/h5diff.c | 167 | ||||
-rw-r--r-- | tools/lib/h5diff_util.c | 19 | ||||
-rw-r--r-- | tools/lib/h5tools_utils.c | 1 | ||||
-rw-r--r-- | tools/lib/h5tools_utils.h | 3 | ||||
-rw-r--r-- | tools/lib/ph5diff.h | 9 |
6 files changed, 148 insertions, 52 deletions
diff --git a/tools/h5diff/ph5diff_main.c b/tools/h5diff/ph5diff_main.c index 94773c8..c469a4b 100644 --- a/tools/h5diff/ph5diff_main.c +++ b/tools/h5diff/ph5diff_main.c @@ -177,7 +177,6 @@ ph5diff_worker(int nID) if(outBuffOffset>0) { MPI_Send(NULL, 0, MPI_BYTE, 0, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD); - /*Wait for print token. */ MPI_Recv(NULL, 0, MPI_BYTE, 0, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD, &Status); diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index 037a7bb..621d23d 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -526,9 +526,52 @@ diff_match (hid_t file1_id, /* if there are any outstanding print requests, let's handle one. */ if(busyTasks > 0) { - int incomingMessage; - /* check if any tasks freed up, and didn't need to print. */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); + int incomingMessage; + /* check if any tasks freed up, and didn't need to print. */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); + + if(incomingMessage) + { + workerTasks[Status.MPI_SOURCE-1] = 1; + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + } + + /* check to see if the print token was returned. */ + if(!havePrintToken) + { + /* If we don't have the token, someone is probably sending us output */ + print_incoming_data(); + + /* check incoming queue for token */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + /* incoming token implies free task. */ + if(incomingMessage) + { + workerTasks[Status.MPI_SOURCE-1] = 1; + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + busyTasks--; + havePrintToken = 1; + } + } + + /* check to see if anyone needs the print token. */ + if(havePrintToken) + { + /* check incoming queue for print token requests */ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, &Status); + if(incomingMessage) + { + MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); + MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + havePrintToken = 0; + } + } if(incomingMessage) { @@ -598,48 +641,66 @@ diff_match (hid_t file1_id, if(!workerFound) { - /* if they were all busy, we've got to wait for one free up before we can move on. - * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */ - if(!havePrintToken) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - havePrintToken = 1; - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - /* send this task the work unit. */ - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } + /* if they were all busy, we've got to wait for one free up before we can move on. + * if we don't have the token, some task is currently printing so we'll wait for that task to return it. */ + + if(!havePrintToken) + { + while(!havePrintToken) { + int incomingMessage; + print_incoming_data(); + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + if(incomingMessage) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + havePrintToken = 1; + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + /* send this task the work unit. */ + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + } + } /* if we do have the token, check for task to free up, or wait for a task to request it */ - else - { - /* But first print all the data in our incoming queue */ - print_incoming_data(); - - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if(Status.MPI_TAG == MPI_TAG_DONE) - { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } - else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) - { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); - } - else - { - printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - MPI_Finalize(); - } - } + else + { + /* But first print all the data in our incoming queue */ + print_incoming_data(); + MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); + if(Status.MPI_TAG == MPI_TAG_DONE) + { + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST) + { + int incomingMessage; + MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); + MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); + + do + { + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); + } + while(!incomingMessage); + + + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); + nfound += nFoundbyWorker.nfound; + options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; + MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD); + } + else + { + printf("ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); + MPI_Abort(MPI_COMM_WORLD, 0); + MPI_Finalize(); + } + } } } #endif @@ -672,7 +733,15 @@ diff_match (hid_t file1_id, MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &Status); if(havePrintToken) { + int incomingMessage; MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); +do +{ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); +} +while(!incomingMessage); MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); nfound += nFoundbyWorker.nfound; options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; @@ -681,6 +750,16 @@ diff_match (hid_t file1_id, else /* someone else must have it...wait for them to return it, then give it to the task that just asked for it. */ { int source = Status.MPI_SOURCE; + int incomingMessage; +do +{ + MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status); + + print_incoming_data(); +} +while(!incomingMessage); + + MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); nfound += nFoundbyWorker.nfound; options->not_cmp = options->not_cmp | nFoundbyWorker.not_cmp; diff --git a/tools/lib/h5diff_util.c b/tools/lib/h5diff_util.c index 5370c1a..1fc710c 100644 --- a/tools/lib/h5diff_util.c +++ b/tools/lib/h5diff_util.c @@ -20,7 +20,7 @@ int g_nTasks = 1; unsigned char g_Parallel = 0; /*0 for serial, 1 for parallel */ char outBuff[OUTBUFF_SIZE]; -unsigned int outBuffOffset; +int outBuffOffset; FILE* overflow_file = NULL; /*------------------------------------------------------------------------- @@ -48,16 +48,27 @@ void parallel_print(const char* format, ...) if(overflow_file == NULL) /*no overflow has occurred yet */ { +#if 0 +printf("calling HDvsnprintf: OUTBUFF_SIZE=%ld, outBuffOffset=%ld, ", (long)OUTBUFF_SIZE, (long)outBuffOffset); +#endif bytes_written = HDvsnprintf(outBuff+outBuffOffset, OUTBUFF_SIZE-outBuffOffset, format, ap); - +#if 0 +printf("bytes_written=%ld\n", (long)bytes_written); +#endif va_end(ap); va_start(ap, format); +#if 0 +printf("Result: bytes_written=%ld, OUTBUFF_SIZE-outBuffOffset=%ld\n", (long)bytes_written, (long)OUTBUFF_SIZE-outBuffOffset); +#endif + + if ((bytes_written < 0) || #ifdef H5_VSNPRINTF_WORKS - if(bytes_written >= (OUTBUFF_SIZE-outBuffOffset)) + (bytes_written >= (OUTBUFF_SIZE-outBuffOffset)) #else - if((bytes_written+1) == (OUTBUFF_SIZE-outBuffOffset)) + ((bytes_written+1) == (OUTBUFF_SIZE-outBuffOffset)) #endif + ) { /* Terminate the outbuff at the end of the previous output */ outBuff[outBuffOffset] = '\0'; diff --git a/tools/lib/h5tools_utils.c b/tools/lib/h5tools_utils.c index e6211da..ad57f21 100644 --- a/tools/lib/h5tools_utils.c +++ b/tools/lib/h5tools_utils.c @@ -683,7 +683,6 @@ add_obj(table_t *table, haddr_t objno, char *objname) * *------------------------------------------------------------------------- */ -static FILE * tmpfile(void) { diff --git a/tools/lib/h5tools_utils.h b/tools/lib/h5tools_utils.h index a210c0f..6398e10 100644 --- a/tools/lib/h5tools_utils.h +++ b/tools/lib/h5tools_utils.h @@ -121,5 +121,8 @@ extern herr_t find_objs(hid_t group, const char *name, void *op_data); extern int search_obj(table_t *temp, haddr_t objno); extern void init_table(table_t **tbl); extern void init_prefix(char **temp, size_t); +#ifndef H5_HAVE_TMPFILE +extern FILE * tmpfile(void); +#endif #endif /* H5TOOLS_UTILS_H__ */ diff --git a/tools/lib/ph5diff.h b/tools/lib/ph5diff.h index 3fcc553..e5df82a 100644 --- a/tools/lib/ph5diff.h +++ b/tools/lib/ph5diff.h @@ -15,8 +15,13 @@ #ifndef _PH5DIFF_H__ #define _PH5DIFF_H__ +/* use a larger output buffer for Tflops which does not support vsnprintf. */ +#ifdef __PUMAGON__ #define PRINT_DATA_MAX_SIZE 512 -#define OUTBUFF_SIZE PRINT_DATA_MAX_SIZE*4 +#else +#define PRINT_DATA_MAX_SIZE 512 +#endif +#define OUTBUFF_SIZE (PRINT_DATA_MAX_SIZE*4) /* Send from manager to workers */ #define MPI_TAG_ARGS 1 #define MPI_TAG_PRINT_TOK 2 @@ -34,7 +39,7 @@ extern int g_nTasks; extern unsigned char g_Parallel; extern char outBuff[]; -extern unsigned int outBuffOffset; +extern int outBuffOffset; extern FILE* overflow_file; struct diff_args |