From e038532fb8781902982dca87003dbd6121f22e3f Mon Sep 17 00:00:00 2001 From: dgp Date: Sat, 26 Jul 2014 15:02:09 +0000 Subject: Extend the "move buffer" implementation to cover the async case. --- generic/tclIO.c | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 175 insertions(+), 11 deletions(-) diff --git a/generic/tclIO.c b/generic/tclIO.c index 6052c43..724c20f 100644 --- a/generic/tclIO.c +++ b/generic/tclIO.c @@ -183,6 +183,12 @@ static int CloseWrite(Tcl_Interp *interp, Channel *chanPtr); static void CommonGetsCleanup(Channel *chanPtr); static int CopyData(CopyState *csPtr, int mask); static int MoveBytes(CopyState *csPtr); + +static void MBCallback(CopyState *csPtr, Tcl_Obj *errObj); +static void MBError(CopyState *csPtr, int mask, int errorCode); +static void MBRead(ClientData clientData, int mask); +static void MBWrite(ClientData clientData, int mask); + static void CopyEventProc(ClientData clientData, int mask); static void CreateScriptRecord(Tcl_Interp *interp, Channel *chanPtr, int mask, Tcl_Obj *scriptPtr); @@ -8831,17 +8837,15 @@ TclCopyChannel( | CHANNEL_UNBUFFERED; /* - * Very strict set of conditions where we know we can just move bytes - * from input channel to output channel with no transformation or even - * examination of the bytes themselves. - * TODO: Find ways to relax this. + * Test for conditions where we know we can just move bytes from input + * channel to output channel with no transformation or even examination + * of the bytes themselves. */ moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */ && inStatePtr->inputTranslation == TCL_TRANSLATE_LF && outStatePtr->outputTranslation == TCL_TRANSLATE_LF - && inStatePtr->encoding == outStatePtr->encoding - && !nonBlocking; /* First draft do only blocking case */ + && inStatePtr->encoding == outStatePtr->encoding; /* * Allocate a new CopyState to maintain info about the current copy in @@ -8904,6 +8908,156 @@ TclCopyChannel( *---------------------------------------------------------------------- */ +static void +MBCallback( + CopyState *csPtr, + Tcl_Obj *errObj) +{ + Tcl_Obj *cmdPtr = Tcl_DuplicateObj(csPtr->cmdPtr); + Tcl_WideInt total = csPtr->total; + Tcl_Interp *interp = csPtr->interp; + int code; + + Tcl_IncrRefCount(cmdPtr); + StopCopy(csPtr); + + /* TODO: What if cmdPtr is not a list?! */ + + Tcl_ListObjAppendElement(NULL, cmdPtr, Tcl_NewWideIntObj(total)); + if (errObj) { + Tcl_ListObjAppendElement(NULL, cmdPtr, errObj); + } + + Tcl_Preserve(interp); + code = Tcl_EvalObjEx(interp, cmdPtr, TCL_EVAL_GLOBAL); + if (code != TCL_OK) { + Tcl_BackgroundException(interp, code); + } + Tcl_Release(interp); + TclDecrRefCount(cmdPtr); +} + +static void +MBError( + CopyState *csPtr, + int mask, + int errorCode) +{ + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + Tcl_SetErrno(errorCode); + MBCallback(csPtr, Tcl_ObjPrintf( "error %sing \"%s\": %s", + (mask & TCL_READABLE) ? "read" : "writ", + Tcl_GetChannelName((mask & TCL_READABLE) ? inChan : outChan), + Tcl_PosixError(csPtr->interp))); +} + +static void +MBRead( + ClientData clientData, + int mask) +{ + CopyState *csPtr = (CopyState *) clientData; + ChannelState *inStatePtr = csPtr->readPtr->state; + + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + int code = GetInput(inStatePtr->topChanPtr); + + assert (mask & TCL_READABLE); + + if (code == 0) { + /* Successful read -- set up to write the bytes we read */ + Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBWrite, csPtr); + + /* When at least one full buffer is present, stop reading. */ + if (IsBufferFull(inStatePtr->inQueueHead) + || !Tcl_InputBlocked(inChan)) { + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + } + } else { + MBError(csPtr, mask, code); + } +} + +static void +MBWrite( + ClientData clientData, + int mask) +{ + CopyState *csPtr = (CopyState *) clientData; + ChannelState *inStatePtr = csPtr->readPtr->state; + ChannelState *outStatePtr = csPtr->writePtr->state; + + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr; + + ChannelBuffer *bufPtr = inStatePtr->inQueueHead; + ChannelBuffer *tail = NULL; + int code, inBytes = 0; + + assert (mask & TCL_WRITABLE); + + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); + + /* Count up number of bytes waiting in the input queue */ + while (bufPtr) { + inBytes += BytesLeft(bufPtr); + tail = bufPtr; + if (csPtr->toRead != -1 && csPtr->toRead < inBytes) { + /* Queue has enough bytes to complete the copy */ + break; + } + bufPtr = bufPtr->nextPtr; + } + + if (bufPtr) { + /* Split the overflowing buffer in two */ + int extra = inBytes - csPtr->toRead; + + bufPtr = AllocChannelBuffer(extra); + + tail->nextAdded -= extra; + memcpy(InsertPoint(bufPtr), InsertPoint(tail), extra); + bufPtr->nextAdded += extra; + bufPtr->nextPtr = tail->nextPtr; + tail->nextPtr = NULL; + inBytes = csPtr->toRead; + } + + /* Update the byte counts */ + if (csPtr->toRead != -1) { + csPtr->toRead -= inBytes; + } + csPtr->total += inBytes; + + /* Move buffers from input to output channels */ + if (outStatePtr->outQueueTail) { + outStatePtr->outQueueTail->nextPtr = inStatePtr->inQueueHead; + } else { + outStatePtr->outQueueHead = inStatePtr->inQueueHead; + } + outStatePtr->outQueueTail = tail; + inStatePtr->inQueueHead = bufPtr; + if (bufPtr == NULL) { + inStatePtr->inQueueTail = NULL; + } + + code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0); + if (code == 0) { + if (csPtr->toRead == 0 || Tcl_Eof(inChan)) { + MBCallback(csPtr, NULL); + } else { + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + } + } else { + MBError(csPtr, mask, code); + } +} + static int MoveBytes( CopyState *csPtr) /* State of copy operation. */ @@ -8927,6 +9081,12 @@ MoveBytes( } } + if (csPtr->cmdPtr) { + Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr; + Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr); + return TCL_OK; + } + while (csPtr->toRead != 0) { ChannelBuffer *bufPtr = inStatePtr->inQueueHead; ChannelBuffer *tail = NULL; @@ -9536,12 +9696,16 @@ StopCopy( CopyState *csPtr) /* State for bg copy to stop . */ { ChannelState *inStatePtr, *outStatePtr; + Tcl_Channel inChan, outChan; + int nonBlocking; if (!csPtr) { return; } + inChan = (Tcl_Channel) csPtr->readPtr; + outChan = (Tcl_Channel) csPtr->writePtr; inStatePtr = csPtr->readPtr->state; outStatePtr = csPtr->writePtr->state; @@ -9566,12 +9730,12 @@ StopCopy( csPtr->writeFlags & (CHANNEL_LINEBUFFERED | CHANNEL_UNBUFFERED); if (csPtr->cmdPtr) { - Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->readPtr, CopyEventProc, - csPtr); - if (csPtr->readPtr != csPtr->writePtr) { - Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->writePtr, - CopyEventProc, csPtr); + Tcl_DeleteChannelHandler(inChan, CopyEventProc, csPtr); + if (inChan != outChan) { + Tcl_DeleteChannelHandler(outChan, CopyEventProc, csPtr); } + Tcl_DeleteChannelHandler(inChan, MBRead, csPtr); + Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr); TclDecrRefCount(csPtr->cmdPtr); } inStatePtr->csPtrR = NULL; -- cgit v0.12