From 18b04fd803de5db258180ee7d4d4d128a77fb03d Mon Sep 17 00:00:00 2001 From: dgp Date: Wed, 9 Jul 2014 14:44:13 +0000 Subject: First draft of using buffer moves in place of buffer copies to create an efficient [chan copy]. --- generic/tclIO.c | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 2 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 5cdf2c3..1938173 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -182,6 +182,7 @@ static int CloseChannelPart(Tcl_Interp *interp, Channel *chanPtr, static int CloseWrite(Tcl_Interp *interp, Channel *chanPtr); static void CommonGetsCleanup(Channel *chanPtr); static int CopyData(CopyState *csPtr, int mask); +static int MoveBytes(CopyState *csPtr); static void CopyEventProc(ClientData clientData, int mask); static void CreateScriptRecord(Tcl_Interp *interp, Channel *chanPtr, int mask, Tcl_Obj *scriptPtr); @@ -8778,6 +8779,7 @@ TclCopyChannel( int readFlags, writeFlags; CopyState *csPtr; int nonBlocking = (cmdPtr) ? CHANNEL_NONBLOCKING : 0; + int moveBytes; inStatePtr = inPtr->state; outStatePtr = outPtr->state; @@ -8829,13 +8831,27 @@ TclCopyChannel( | CHANNEL_UNBUFFERED; /* + * Very strict set of conditions where we know we can just move bytes + * from input channel to output channel with no transformation or even + * examination of the bytes themselves. + * TODO: Find ways to relax this. + */ + + moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */ + && inStatePtr->inputTranslation == TCL_TRANSLATE_LF + && outStatePtr->outputTranslation == TCL_TRANSLATE_LF + && inStatePtr->encoding == NULL + && outStatePtr->encoding == NULL + && !nonBlocking; /* First draft do only blocking case */ + + /* * Allocate a new CopyState to maintain info about the current copy in * progress. This structure will be deallocated when the copy is * completed. */ - csPtr = ckalloc(sizeof(CopyState) + inStatePtr->bufSize); - csPtr->bufSize = inStatePtr->bufSize; + csPtr = ckalloc(sizeof(CopyState) + !moveBytes * inStatePtr->bufSize); + csPtr->bufSize = !moveBytes * inStatePtr->bufSize; csPtr->readPtr = inPtr; csPtr->writePtr = outPtr; csPtr->readFlags = readFlags; @@ -8851,6 +8867,10 @@ TclCopyChannel( inStatePtr->csPtrR = csPtr; outStatePtr->csPtrW = csPtr; + if (moveBytes) { + return MoveBytes(csPtr); + } + /* * Special handling of -size 0 async transfers, so that the -command is * still called asynchronously. @@ -8886,6 +8906,101 @@ TclCopyChannel( */ static int +MoveBytes( + CopyState *csPtr) /* State of copy operation. */ +{ + ChannelState *inStatePtr = csPtr->readPtr->state; + ChannelState *outStatePtr = csPtr->writePtr->state; + ChannelBuffer *bufPtr = outStatePtr->curOutPtr; + int code = TCL_OK; + + if (bufPtr && BytesLeft(bufPtr)) { + /* If we start with unflushed bytes in the destination + * channel, flush them out of the way first. */ + + if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { + code = TCL_ERROR; + goto done; + } + } + + while (csPtr->toRead != 0) { + ChannelBuffer *bufPtr = inStatePtr->inQueueHead; + ChannelBuffer *tail = NULL; + int inBytes = 0; + + if (bufPtr == NULL || BytesLeft(bufPtr) == 0) { + /* Nothing in the input queue; Get more input. */ + + if (0 != GetInput(inStatePtr->topChanPtr)) { + code = TCL_ERROR; + break; + } + bufPtr = inStatePtr->inQueueHead; + } + + /* Count up number of bytes waiting in the input queue */ + while (bufPtr) { + inBytes += BytesLeft(bufPtr); + tail = bufPtr; + if (csPtr->toRead != -1 && csPtr->toRead < inBytes) { + /* Queue has enough bytes to complete the copy */ + break; + } + bufPtr = bufPtr->nextPtr; + } + + if (bufPtr) { + /* Split the overflowing buffer in two */ + int extra = inBytes - csPtr->toRead; + + bufPtr = AllocChannelBuffer(extra); + + tail->nextAdded -= extra; + memcpy(InsertPoint(bufPtr), InsertPoint(tail), extra); + bufPtr->nextAdded += extra; + bufPtr->nextPtr = tail->nextPtr; + tail->nextPtr = NULL; + inBytes = csPtr->toRead; + } + + /* Update the byte counts */ + if (csPtr->toRead != -1) { + csPtr->toRead -= inBytes; + } + csPtr->total += inBytes; + + /* Move buffers from input to output channels */ + if (outStatePtr->outQueueTail) { + outStatePtr->outQueueTail->nextPtr = inStatePtr->inQueueHead; + } else { + outStatePtr->outQueueHead = inStatePtr->inQueueHead; + } + outStatePtr->outQueueTail = tail; + inStatePtr->inQueueHead = bufPtr; + if (bufPtr == NULL) { + inStatePtr->inQueueTail = NULL; + } + + /* Flush destination */ + if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { + code = TCL_ERROR; + break; + } + if (GotFlag(inStatePtr, CHANNEL_EOF)) { + break; + } + } + + if (code == TCL_OK) { + Tcl_SetObjResult(csPtr->interp, Tcl_NewWideIntObj(csPtr->total)); + } + done: + StopCopy(csPtr); + return code; +} + +static int CopyData( CopyState *csPtr, /* State of copy operation. */ int mask) /* Current channel event flags. */ -- cgit v0.12 From e2d932c2e380daab4ccf6f1a2fa6a0a43e66f425 Mon Sep 17 00:00:00 2001 From: dgp Date: Wed, 23 Jul 2014 17:10:26 +0000 Subject: Extend the buffer move optimization to the "same encodings" case. --- generic/tclIO.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 256b411..6718788 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8840,8 +8840,9 @@ TclCopyChannel( moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */ && inStatePtr->inputTranslation == TCL_TRANSLATE_LF && outStatePtr->outputTranslation == TCL_TRANSLATE_LF - && inStatePtr->encoding == NULL - && outStatePtr->encoding == NULL + && ((inStatePtr->encoding == NULL + && outStatePtr->encoding == NULL) + || (inStatePtr->encoding == outStatePtr->encoding)) && !nonBlocking; /* First draft do only blocking case */ /* -- cgit v0.12 From f3e831dfbc16475f7b102489a0beb702801519a4 Mon Sep 17 00:00:00 2001 From: dgp Date: Thu, 24 Jul 2014 15:02:09 +0000 Subject: Make sure MoveBytes records read error messages. --- generic/tclIO.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 6718788..017494e 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8931,9 +8931,13 @@ MoveBytes( int inBytes = 0; if (bufPtr == NULL || BytesLeft(bufPtr) == 0) { - /* Nothing in the input queue; Get more input. */ + /* Nothing in the input queue; Get more input. */ if (0 != GetInput(inStatePtr->topChanPtr)) { + Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( + "error reading \"%s\": %s", + Tcl_GetChannelName((Tcl_Channel)csPtr->readPtr), + Tcl_PosixError(csPtr->interp))); code = TCL_ERROR; break; } -- cgit v0.12 From 8b939b74f8a4c8617410768e840ab04abbaec043 Mon Sep 17 00:00:00 2001 From: dgp Date: Thu, 24 Jul 2014 15:24:11 +0000 Subject: Make sure MoveBytes error reporting reproduces what CopyData does. Bugward compatibility! --- generic/tclIO.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/generic/tclIO.c b/generic/tclIO.c index 017494e..7e793a9 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8920,6 +8920,10 @@ MoveBytes( * channel, flush them out of the way first. */ if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { + Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( + "error writing \"%s\": %s", + Tcl_GetChannelName((Tcl_Channel)csPtr->writePtr), + Tcl_PosixError(csPtr->interp))); code = TCL_ERROR; goto done; } @@ -8989,6 +8993,10 @@ MoveBytes( /* Flush destination */ if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { + Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( + "error writing \"%s\": %s", + Tcl_GetChannelName((Tcl_Channel)csPtr->writePtr), + Tcl_PosixError(csPtr->interp))); code = TCL_ERROR; break; } -- cgit v0.12 From de9c0d8432dc3afcfc1d0442c602f42983c134ca Mon Sep 17 00:00:00 2001 From: dgp Date: Thu, 24 Jul 2014 15:34:05 +0000 Subject: simplify moveBytes selection logic --- generic/tclIO.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 7e793a9..6052c43 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8840,9 +8840,7 @@ TclCopyChannel( moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */ && inStatePtr->inputTranslation == TCL_TRANSLATE_LF && outStatePtr->outputTranslation == TCL_TRANSLATE_LF - && ((inStatePtr->encoding == NULL - && outStatePtr->encoding == NULL) - || (inStatePtr->encoding == outStatePtr->encoding)) + && inStatePtr->encoding == outStatePtr->encoding && !nonBlocking; /* First draft do only blocking case */ /* -- cgit v0.12 From e038532fb8781902982dca87003dbd6121f22e3f Mon Sep 17 00:00:00 2001 From: dgp Date: Sat, 26 Jul 2014 15:02:09 +0000 Subject: Extend the "move buffer" implementation to cover the async case. --- generic/tclIO.c | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 175 insertions(+), 11 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 6052c43..724c20f 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -183,6 +183,12 @@ static int CloseWrite(Tcl_Interp *interp, Channel *chanPtr); static void CommonGetsCleanup(Channel *chanPtr); static int CopyData(CopyState *csPtr, int mask); static int MoveBytes(CopyState *csPtr); + +static void MBCallback(CopyState *csPtr, Tcl_Obj *errObj); +static void MBError(CopyState *csPtr, int mask, int errorCode); +static void MBRead(ClientData clientData, int mask); +static void MBWrite(ClientData clientData, int mask); + static void CopyEventProc(ClientData clientData, int mask); static void CreateScriptRecord(Tcl_Interp *interp, Channel *chanPtr, int mask, Tcl_Obj *scriptPtr); @@ -8831,17 +8837,15 @@ TclCopyChannel( | CHANNEL_UNBUFFERED; /* - * Very strict set of conditions where we know we can just move bytes - * from input channel to output channel with no transformation or even - * examination of the bytes themselves. - * TODO: Find ways to relax this. + * Test for conditions where we know we can just move bytes from input + * channel to output channel with no transformation or even examination + * of the bytes themselves. */ moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */ && inStatePtr->inputTranslation == TCL_TRANSLATE_LF && outStatePtr->outputTranslation == TCL_TRANSLATE_LF - && inStatePtr->encoding == outStatePtr->encoding - && !nonBlocking; /* First draft do only blocking case */ + && inStatePtr->encoding == outStatePtr->encoding; /* * Allocate a new CopyState to maintain info about the current copy in @@ -8904,6 +8908,156 @@ TclCopyChannel( *---------------------------------------------------------------------- */ +static void +MBCallback( + CopyState *csPtr, + Tcl_Obj *errObj) +{ + Tcl_Obj *cmdPtr = Tcl_DuplicateObj(csPtr->cmdPtr); + Tcl_WideInt total = csPtr->total; + Tcl_Interp *interp = csPtr->interp; + int code; + + Tcl_IncrRefCount(cmdPtr); + StopCopy(csPtr); + + /* TODO: What if cmdPtr is not a list?! */ + + Tcl_ListObjAppendElement(NULL, cmdPtr, Tcl_NewWideIntObj(total)); + if (errObj) { + Tcl_ListObjAppendElement(NULL, cmdPtr, errObj); + } + + Tcl_Preserve(interp); + code = Tcl_EvalObjEx(interp, cmdPtr, TCL_EVAL_GLOBAL); + if (code != TCL_OK) { + Tcl_BackgroundException(interp, code); + } + Tcl_Release(interp); + TclDecrRefCount(cmdPtr); +} + +static void +MBError( + CopyState *csPtr, + int mask, + int errorCode) +{ + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + Tcl_SetErrno(errorCode); + MBCallback(csPtr, Tcl_ObjPrintf( "error %sing \"%s\": %s", + (mask & TCL_READABLE) ? "read" : "writ", + Tcl_GetChannelName((mask & TCL_READABLE) ? inChan : outChan), + Tcl_PosixError(csPtr->interp))); +} + +static void +MBRead( + ClientData clientData, + int mask) +{ + CopyState *csPtr = (CopyState *) clientData; + ChannelState *inStatePtr = csPtr->readPtr->state; + + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + int code = GetInput(inStatePtr->topChanPtr); + + assert (mask & TCL_READABLE); + + if (code == 0) { + /* Successful read -- set up to write the bytes we read */ + Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBWrite, csPtr); + + /* When at least one full buffer is present, stop reading. */ + if (IsBufferFull(inStatePtr->inQueueHead) + || !Tcl_InputBlocked(inChan)) { + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + } + } else { + MBError(csPtr, mask, code); + } +} + +static void +MBWrite( + ClientData clientData, + int mask) +{ + CopyState *csPtr = (CopyState *) clientData; + ChannelState *inStatePtr = csPtr->readPtr->state; + ChannelState *outStatePtr = csPtr->writePtr->state; + + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + ChannelBuffer *bufPtr = inStatePtr->inQueueHead; + ChannelBuffer *tail = NULL; + int code, inBytes = 0; + + assert (mask & TCL_WRITABLE); + + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); + + /* Count up number of bytes waiting in the input queue */ + while (bufPtr) { + inBytes += BytesLeft(bufPtr); + tail = bufPtr; + if (csPtr->toRead != -1 && csPtr->toRead < inBytes) { + /* Queue has enough bytes to complete the copy */ + break; + } + bufPtr = bufPtr->nextPtr; + } + + if (bufPtr) { + /* Split the overflowing buffer in two */ + int extra = inBytes - csPtr->toRead; + + bufPtr = AllocChannelBuffer(extra); + + tail->nextAdded -= extra; + memcpy(InsertPoint(bufPtr), InsertPoint(tail), extra); + bufPtr->nextAdded += extra; + bufPtr->nextPtr = tail->nextPtr; + tail->nextPtr = NULL; + inBytes = csPtr->toRead; + } + + /* Update the byte counts */ + if (csPtr->toRead != -1) { + csPtr->toRead -= inBytes; + } + csPtr->total += inBytes; + + /* Move buffers from input to output channels */ + if (outStatePtr->outQueueTail) { + outStatePtr->outQueueTail->nextPtr = inStatePtr->inQueueHead; + } else { + outStatePtr->outQueueHead = inStatePtr->inQueueHead; + } + outStatePtr->outQueueTail = tail; + inStatePtr->inQueueHead = bufPtr; + if (bufPtr == NULL) { + inStatePtr->inQueueTail = NULL; + } + + code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); + if (code == 0) { + if (csPtr->toRead == 0 || Tcl_Eof(inChan)) { + MBCallback(csPtr, NULL); + } else { + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + } + } else { + MBError(csPtr, mask, code); + } +} + static int MoveBytes( CopyState *csPtr) /* State of copy operation. */ @@ -8927,6 +9081,12 @@ MoveBytes( } } + if (csPtr->cmdPtr) { + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + return TCL_OK; + } + while (csPtr->toRead != 0) { ChannelBuffer *bufPtr = inStatePtr->inQueueHead; ChannelBuffer *tail = NULL; @@ -9536,12 +9696,16 @@ StopCopy( CopyState *csPtr) /* State for bg copy to stop . */ { ChannelState *inStatePtr, *outStatePtr; + Tcl_Channel inChan, outChan; + int nonBlocking; if (!csPtr) { return; } + inChan = (Tcl_Channel) csPtr->readPtr; + outChan = (Tcl_Channel) csPtr->writePtr; inStatePtr = csPtr->readPtr->state; outStatePtr = csPtr->writePtr->state; @@ -9566,12 +9730,12 @@ StopCopy( csPtr->writeFlags & (CHANNEL_LINEBUFFERED | CHANNEL_UNBUFFERED); if (csPtr->cmdPtr) { - Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->readPtr, CopyEventProc, - csPtr); - if (csPtr->readPtr != csPtr->writePtr) { - Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->writePtr, - CopyEventProc, csPtr); + Tcl_DeleteChannelHandler(inChan, CopyEventProc, csPtr); + if (inChan != outChan) { + Tcl_DeleteChannelHandler(outChan, CopyEventProc, csPtr); } + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); TclDecrRefCount(csPtr->cmdPtr); } inStatePtr->csPtrR = NULL; -- cgit v0.12 From 7e8a2b53273eff3984db888b49f2067fc442dd2d Mon Sep 17 00:00:00 2001 From: dgp Date: Sat, 26 Jul 2014 15:44:32 +0000 Subject: Use common MBError() routine for sync and async operations. --- generic/tclIO.c | 49 ++++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 724c20f..aed2abf 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8945,12 +8945,21 @@ MBError( { Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + Tcl_Obj *errObj; Tcl_SetErrno(errorCode); - MBCallback(csPtr, Tcl_ObjPrintf( "error %sing \"%s\": %s", + + errObj = Tcl_ObjPrintf( "error %sing \"%s\": %s", (mask & TCL_READABLE) ? "read" : "writ", Tcl_GetChannelName((mask & TCL_READABLE) ? inChan : outChan), - Tcl_PosixError(csPtr->interp))); + Tcl_PosixError(csPtr->interp)); + + if (csPtr->cmdPtr) { + MBCallback(csPtr, errObj); + } else { + Tcl_SetObjResult(csPtr->interp, errObj); + StopCopy(csPtr); + } } static void @@ -9065,19 +9074,16 @@ MoveBytes( ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; ChannelBuffer *bufPtr = outStatePtr->curOutPtr; - int code = TCL_OK; + int errorCode, code = TCL_OK; if (bufPtr && BytesLeft(bufPtr)) { /* If we start with unflushed bytes in the destination * channel, flush them out of the way first. */ - if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { - Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( - "error writing \"%s\": %s", - Tcl_GetChannelName((Tcl_Channel)csPtr->writePtr), - Tcl_PosixError(csPtr->interp))); - code = TCL_ERROR; - goto done; + errorCode = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); + if (errorCode != 0) { + MBError(csPtr, TCL_WRITABLE, errorCode); + return TCL_ERROR; } } @@ -9095,13 +9101,10 @@ MoveBytes( if (bufPtr == NULL || BytesLeft(bufPtr) == 0) { /* Nothing in the input queue; Get more input. */ - if (0 != GetInput(inStatePtr->topChanPtr)) { - Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( - "error reading \"%s\": %s", - Tcl_GetChannelName((Tcl_Channel)csPtr->readPtr), - Tcl_PosixError(csPtr->interp))); - code = TCL_ERROR; - break; + errorCode = GetInput(inStatePtr->topChanPtr); + if (errorCode != 0) { + MBError(csPtr, TCL_READABLE, errorCode); + return TCL_ERROR; } bufPtr = inStatePtr->inQueueHead; } @@ -9150,13 +9153,10 @@ MoveBytes( } /* Flush destination */ - if (0 != FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0)) { - Tcl_SetObjResult(csPtr->interp, Tcl_ObjPrintf( - "error writing \"%s\": %s", - Tcl_GetChannelName((Tcl_Channel)csPtr->writePtr), - Tcl_PosixError(csPtr->interp))); - code = TCL_ERROR; - break; + errorCode = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); + if (errorCode != 0) { + MBError(csPtr, TCL_WRITABLE, errorCode); + return TCL_ERROR; } if (GotFlag(inStatePtr, CHANNEL_EOF)) { break; @@ -9166,7 +9166,6 @@ MoveBytes( if (code == TCL_OK) { Tcl_SetObjResult(csPtr->interp, Tcl_NewWideIntObj(csPtr->total)); } - done: StopCopy(csPtr); return code; } -- cgit v0.12 From 2568ef5dacb9a7092df97e94312fd7fe7ddf60e2 Mon Sep 17 00:00:00 2001 From: dgp Date: Sun, 27 Jul 2014 17:20:32 +0000 Subject: Rework MBWrite() so it can be used in both sync and async modes. Reduce code duplication. --- generic/tclIO.c | 109 +++++++++++++++++++++----------------------------------- 1 file changed, 41 insertions(+), 68 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index aed2abf..907a070 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -187,7 +187,8 @@ static int MoveBytes(CopyState *csPtr); static void MBCallback(CopyState *csPtr, Tcl_Obj *errObj); static void MBError(CopyState *csPtr, int mask, int errorCode); static void MBRead(ClientData clientData, int mask); -static void MBWrite(ClientData clientData, int mask); +static int MBWrite(CopyState *csPtr, int mask); +static void MBEvent(ClientData clientData, int mask); static void CopyEventProc(ClientData clientData, int mask); static void CreateScriptRecord(Tcl_Interp *interp, @@ -8963,6 +8964,20 @@ MBError( } static void +MBEvent( + ClientData clientData, + int mask) +{ + CopyState *csPtr = (CopyState *) clientData; + + if (mask & TCL_WRITABLE) { + (void) MBWrite(csPtr, mask); + } else if (mask & TCL_READABLE) { + (void) MBRead(clientData, mask); + } +} + +static void MBRead( ClientData clientData, int mask) @@ -8979,7 +8994,7 @@ MBRead( if (code == 0) { /* Successful read -- set up to write the bytes we read */ - Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBWrite, csPtr); + Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBEvent, csPtr); /* When at least one full buffer is present, stop reading. */ if (IsBufferFull(inStatePtr->inQueueHead) @@ -8991,12 +9006,11 @@ MBRead( } } -static void +static int MBWrite( - ClientData clientData, + CopyState *csPtr, int mask) { - CopyState *csPtr = (CopyState *) clientData; ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; @@ -9010,7 +9024,7 @@ MBWrite( assert (mask & TCL_WRITABLE); Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); - Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); + Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); /* Count up number of bytes waiting in the input queue */ while (bufPtr) { @@ -9058,12 +9072,21 @@ MBWrite( code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); if (code == 0) { if (csPtr->toRead == 0 || Tcl_Eof(inChan)) { - MBCallback(csPtr, NULL); - } else { + if (csPtr->cmdPtr) { + MBCallback(csPtr, NULL); + } else { + Tcl_SetObjResult(csPtr->interp, + Tcl_NewWideIntObj(csPtr->total)); + StopCopy(csPtr); + } + return TCL_OK; + } else if (csPtr->cmdPtr) { Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); } + return TCL_CONTINUE; } else { MBError(csPtr, mask, code); + return TCL_ERROR; } } @@ -9074,7 +9097,7 @@ MoveBytes( ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; ChannelBuffer *bufPtr = outStatePtr->curOutPtr; - int errorCode, code = TCL_OK; + int errorCode; if (bufPtr && BytesLeft(bufPtr)) { /* If we start with unflushed bytes in the destination @@ -9093,10 +9116,9 @@ MoveBytes( return TCL_OK; } - while (csPtr->toRead != 0) { + while (1) { ChannelBuffer *bufPtr = inStatePtr->inQueueHead; - ChannelBuffer *tail = NULL; - int inBytes = 0; + int code; if (bufPtr == NULL || BytesLeft(bufPtr) == 0) { @@ -9108,66 +9130,17 @@ MoveBytes( } bufPtr = inStatePtr->inQueueHead; } - - /* Count up number of bytes waiting in the input queue */ - while (bufPtr) { - inBytes += BytesLeft(bufPtr); - tail = bufPtr; - if (csPtr->toRead != -1 && csPtr->toRead < inBytes) { - /* Queue has enough bytes to complete the copy */ - break; - } - bufPtr = bufPtr->nextPtr; - } - - if (bufPtr) { - /* Split the overflowing buffer in two */ - int extra = inBytes - csPtr->toRead; - - bufPtr = AllocChannelBuffer(extra); - - tail->nextAdded -= extra; - memcpy(InsertPoint(bufPtr), InsertPoint(tail), extra); - bufPtr->nextAdded += extra; - bufPtr->nextPtr = tail->nextPtr; - tail->nextPtr = NULL; - inBytes = csPtr->toRead; - } - - /* Update the byte counts */ - if (csPtr->toRead != -1) { - csPtr->toRead -= inBytes; + code = MBWrite(csPtr, TCL_WRITABLE); + if (code == TCL_CONTINUE) { + continue; } - csPtr->total += inBytes; - - /* Move buffers from input to output channels */ - if (outStatePtr->outQueueTail) { - outStatePtr->outQueueTail->nextPtr = inStatePtr->inQueueHead; + if (code == TCL_OK) { + return TCL_OK; } else { - outStatePtr->outQueueHead = inStatePtr->inQueueHead; - } - outStatePtr->outQueueTail = tail; - inStatePtr->inQueueHead = bufPtr; - if (bufPtr == NULL) { - inStatePtr->inQueueTail = NULL; - } - - /* Flush destination */ - errorCode = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); - if (errorCode != 0) { - MBError(csPtr, TCL_WRITABLE, errorCode); return TCL_ERROR; } - if (GotFlag(inStatePtr, CHANNEL_EOF)) { - break; - } } - - if (code == TCL_OK) { - Tcl_SetObjResult(csPtr->interp, Tcl_NewWideIntObj(csPtr->total)); - } - StopCopy(csPtr); - return code; + return TCL_OK; /* Silence compiler warnings */ } static int @@ -9734,7 +9707,7 @@ StopCopy( Tcl_DeleteChannelHandler(outChan, CopyEventProc, csPtr); } Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); - Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); + Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); TclDecrRefCount(csPtr->cmdPtr); } inStatePtr->csPtrR = NULL; -- cgit v0.12 From 4217a749df3219697b93f646009792e35fe398db Mon Sep 17 00:00:00 2001 From: dgp Date: Sun, 27 Jul 2014 22:53:01 +0000 Subject: Push MBWrite() differences out to callers. --- generic/tclIO.c | 49 ++++++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 907a070..73e1274 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -8969,9 +8969,20 @@ MBEvent( int mask) { CopyState *csPtr = (CopyState *) clientData; + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; if (mask & TCL_WRITABLE) { - (void) MBWrite(csPtr, mask); + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); + switch (MBWrite(csPtr, mask)) { + case TCL_OK: + MBCallback(csPtr, NULL); + break; + case TCL_CONTINUE: + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + break; + } } else if (mask & TCL_READABLE) { (void) MBRead(clientData, mask); } @@ -9013,19 +9024,12 @@ MBWrite( { ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; - - Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; - Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; - ChannelBuffer *bufPtr = inStatePtr->inQueueHead; ChannelBuffer *tail = NULL; int code, inBytes = 0; assert (mask & TCL_WRITABLE); - Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); - Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); - /* Count up number of bytes waiting in the input queue */ while (bufPtr) { inBytes += BytesLeft(bufPtr); @@ -9070,24 +9074,14 @@ MBWrite( } code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); - if (code == 0) { - if (csPtr->toRead == 0 || Tcl_Eof(inChan)) { - if (csPtr->cmdPtr) { - MBCallback(csPtr, NULL); - } else { - Tcl_SetObjResult(csPtr->interp, - Tcl_NewWideIntObj(csPtr->total)); - StopCopy(csPtr); - } - return TCL_OK; - } else if (csPtr->cmdPtr) { - Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); - } - return TCL_CONTINUE; - } else { + if (code) { MBError(csPtr, mask, code); return TCL_ERROR; } + if (csPtr->toRead == 0 || GotFlag(inStatePtr, CHANNEL_EOF)) { + return TCL_OK; + } + return TCL_CONTINUE; } static int @@ -9131,14 +9125,15 @@ MoveBytes( bufPtr = inStatePtr->inQueueHead; } code = MBWrite(csPtr, TCL_WRITABLE); - if (code == TCL_CONTINUE) { - continue; - } if (code == TCL_OK) { + Tcl_SetObjResult(csPtr->interp, Tcl_NewWideIntObj(csPtr->total)); + StopCopy(csPtr); return TCL_OK; - } else { + } + if (code == TCL_ERROR) { return TCL_ERROR; } + /* code == TCL_CONTINUE --> continue the loop */ } return TCL_OK; /* Silence compiler warnings */ } -- cgit v0.12 From ab81262de8a6b9f32445810c18f8a509c692384c Mon Sep 17 00:00:00 2001 From: dgp Date: Mon, 28 Jul 2014 01:42:42 +0000 Subject: Revise MBRead() to be used in both sync and async modes. --- generic/tclIO.c | 78 +++++++++++++++++++++++++-------------------------------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 73e1274..d9d37a2 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -186,8 +186,8 @@ static int MoveBytes(CopyState *csPtr); static void MBCallback(CopyState *csPtr, Tcl_Obj *errObj); static void MBError(CopyState *csPtr, int mask, int errorCode); -static void MBRead(ClientData clientData, int mask); -static int MBWrite(CopyState *csPtr, int mask); +static int MBRead(CopyState *csPtr); +static int MBWrite(CopyState *csPtr); static void MBEvent(ClientData clientData, int mask); static void CopyEventProc(ClientData clientData, int mask); @@ -8971,56 +8971,57 @@ MBEvent( CopyState *csPtr = (CopyState *) clientData; Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + ChannelState *inStatePtr = csPtr->readPtr->state; if (mask & TCL_WRITABLE) { - Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(inChan, MBEvent, csPtr); Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); - switch (MBWrite(csPtr, mask)) { + switch (MBWrite(csPtr)) { case TCL_OK: MBCallback(csPtr, NULL); break; case TCL_CONTINUE: - Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBEvent, csPtr); break; } } else if (mask & TCL_READABLE) { - (void) MBRead(clientData, mask); + if (TCL_OK == MBRead(csPtr)) { + /* When at least one full buffer is present, stop reading. */ + if (IsBufferFull(inStatePtr->inQueueHead) + || !Tcl_InputBlocked(inChan)) { + Tcl_DeleteChannelHandler(inChan, MBEvent, csPtr); + } + + /* Successful read -- set up to write the bytes we read */ + Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBEvent, csPtr); + } } } -static void +static int MBRead( - ClientData clientData, - int mask) + CopyState *csPtr) { - CopyState *csPtr = (CopyState *) clientData; ChannelState *inStatePtr = csPtr->readPtr->state; + ChannelBuffer *bufPtr = inStatePtr->inQueueHead; + int code; - Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; - Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; - - int code = GetInput(inStatePtr->topChanPtr); - - assert (mask & TCL_READABLE); + if (bufPtr && BytesLeft(bufPtr) > 0) { + return TCL_OK; + } + code = GetInput(inStatePtr->topChanPtr); if (code == 0) { - /* Successful read -- set up to write the bytes we read */ - Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBEvent, csPtr); - - /* When at least one full buffer is present, stop reading. */ - if (IsBufferFull(inStatePtr->inQueueHead) - || !Tcl_InputBlocked(inChan)) { - Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); - } + return TCL_OK; } else { - MBError(csPtr, mask, code); + MBError(csPtr, TCL_READABLE, code); + return TCL_ERROR; } } static int MBWrite( - CopyState *csPtr, - int mask) + CopyState *csPtr) { ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; @@ -9028,8 +9029,6 @@ MBWrite( ChannelBuffer *tail = NULL; int code, inBytes = 0; - assert (mask & TCL_WRITABLE); - /* Count up number of bytes waiting in the input queue */ while (bufPtr) { inBytes += BytesLeft(bufPtr); @@ -9075,7 +9074,7 @@ MBWrite( code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); if (code) { - MBError(csPtr, mask, code); + MBError(csPtr, TCL_WRITABLE, code); return TCL_ERROR; } if (csPtr->toRead == 0 || GotFlag(inStatePtr, CHANNEL_EOF)) { @@ -9088,7 +9087,6 @@ static int MoveBytes( CopyState *csPtr) /* State of copy operation. */ { - ChannelState *inStatePtr = csPtr->readPtr->state; ChannelState *outStatePtr = csPtr->writePtr->state; ChannelBuffer *bufPtr = outStatePtr->curOutPtr; int errorCode; @@ -9106,25 +9104,17 @@ MoveBytes( if (csPtr->cmdPtr) { Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; - Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBEvent, csPtr); return TCL_OK; } while (1) { - ChannelBuffer *bufPtr = inStatePtr->inQueueHead; int code; - if (bufPtr == NULL || BytesLeft(bufPtr) == 0) { - - /* Nothing in the input queue; Get more input. */ - errorCode = GetInput(inStatePtr->topChanPtr); - if (errorCode != 0) { - MBError(csPtr, TCL_READABLE, errorCode); - return TCL_ERROR; - } - bufPtr = inStatePtr->inQueueHead; + if (TCL_ERROR == MBRead(csPtr)) { + return TCL_ERROR; } - code = MBWrite(csPtr, TCL_WRITABLE); + code = MBWrite(csPtr); if (code == TCL_OK) { Tcl_SetObjResult(csPtr->interp, Tcl_NewWideIntObj(csPtr->total)); StopCopy(csPtr); @@ -9701,7 +9691,7 @@ StopCopy( if (inChan != outChan) { Tcl_DeleteChannelHandler(outChan, CopyEventProc, csPtr); } - Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(inChan, MBEvent, csPtr); Tcl_DeleteChannelHandler(outChan, MBEvent, csPtr); TclDecrRefCount(csPtr->cmdPtr); } -- cgit v0.12