summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordgp <dgp@users.sourceforge.net>2014-07-26 15:02:09 (GMT)
committerdgp <dgp@users.sourceforge.net>2014-07-26 15:02:09 (GMT)
commite038532fb8781902982dca87003dbd6121f22e3f (patch)
treeb1d5b32a063d57dd2590c4a1e56e48ac1087f9f6
parent938780a181842a5f7dfcd17eec986d342256cdb4 (diff)
downloadtcl-e038532fb8781902982dca87003dbd6121f22e3f.zip
tcl-e038532fb8781902982dca87003dbd6121f22e3f.tar.gz
tcl-e038532fb8781902982dca87003dbd6121f22e3f.tar.bz2
Extend the "move buffer" implementation to cover the async case.
-rw-r--r--generic/tclIO.c186
1 files changed, 175 insertions, 11 deletions
diff --git a/generic/tclIO.c b/generic/tclIO.c
index 6052c43..724c20f 100644
--- a/generic/tclIO.c
+++ b/generic/tclIO.c
@@ -183,6 +183,12 @@ static int CloseWrite(Tcl_Interp *interp, Channel *chanPtr);
static void CommonGetsCleanup(Channel *chanPtr);
static int CopyData(CopyState *csPtr, int mask);
static int MoveBytes(CopyState *csPtr);
+
+static void MBCallback(CopyState *csPtr, Tcl_Obj *errObj);
+static void MBError(CopyState *csPtr, int mask, int errorCode);
+static void MBRead(ClientData clientData, int mask);
+static void MBWrite(ClientData clientData, int mask);
+
static void CopyEventProc(ClientData clientData, int mask);
static void CreateScriptRecord(Tcl_Interp *interp,
Channel *chanPtr, int mask, Tcl_Obj *scriptPtr);
@@ -8831,17 +8837,15 @@ TclCopyChannel(
| CHANNEL_UNBUFFERED;
/*
- * Very strict set of conditions where we know we can just move bytes
- * from input channel to output channel with no transformation or even
- * examination of the bytes themselves.
- * TODO: Find ways to relax this.
+ * Test for conditions where we know we can just move bytes from input
+ * channel to output channel with no transformation or even examination
+ * of the bytes themselves.
*/
moveBytes = inStatePtr->inEofChar == '\0' /* No eofChar to stop input */
&& inStatePtr->inputTranslation == TCL_TRANSLATE_LF
&& outStatePtr->outputTranslation == TCL_TRANSLATE_LF
- && inStatePtr->encoding == outStatePtr->encoding
- && !nonBlocking; /* First draft do only blocking case */
+ && inStatePtr->encoding == outStatePtr->encoding;
/*
* Allocate a new CopyState to maintain info about the current copy in
@@ -8904,6 +8908,156 @@ TclCopyChannel(
*----------------------------------------------------------------------
*/
+static void
+MBCallback(
+ CopyState *csPtr,
+ Tcl_Obj *errObj)
+{
+ Tcl_Obj *cmdPtr = Tcl_DuplicateObj(csPtr->cmdPtr);
+ Tcl_WideInt total = csPtr->total;
+ Tcl_Interp *interp = csPtr->interp;
+ int code;
+
+ Tcl_IncrRefCount(cmdPtr);
+ StopCopy(csPtr);
+
+ /* TODO: What if cmdPtr is not a list?! */
+
+ Tcl_ListObjAppendElement(NULL, cmdPtr, Tcl_NewWideIntObj(total));
+ if (errObj) {
+ Tcl_ListObjAppendElement(NULL, cmdPtr, errObj);
+ }
+
+ Tcl_Preserve(interp);
+ code = Tcl_EvalObjEx(interp, cmdPtr, TCL_EVAL_GLOBAL);
+ if (code != TCL_OK) {
+ Tcl_BackgroundException(interp, code);
+ }
+ Tcl_Release(interp);
+ TclDecrRefCount(cmdPtr);
+}
+
+static void
+MBError(
+ CopyState *csPtr,
+ int mask,
+ int errorCode)
+{
+ Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr;
+ Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr;
+
+ Tcl_SetErrno(errorCode);
+ MBCallback(csPtr, Tcl_ObjPrintf( "error %sing \"%s\": %s",
+ (mask & TCL_READABLE) ? "read" : "writ",
+ Tcl_GetChannelName((mask & TCL_READABLE) ? inChan : outChan),
+ Tcl_PosixError(csPtr->interp)));
+}
+
+static void
+MBRead(
+ ClientData clientData,
+ int mask)
+{
+ CopyState *csPtr = (CopyState *) clientData;
+ ChannelState *inStatePtr = csPtr->readPtr->state;
+
+ Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr;
+ Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr;
+
+ int code = GetInput(inStatePtr->topChanPtr);
+
+ assert (mask & TCL_READABLE);
+
+ if (code == 0) {
+ /* Successful read -- set up to write the bytes we read */
+ Tcl_CreateChannelHandler(outChan, TCL_WRITABLE, MBWrite, csPtr);
+
+ /* When at least one full buffer is present, stop reading. */
+ if (IsBufferFull(inStatePtr->inQueueHead)
+ || !Tcl_InputBlocked(inChan)) {
+ Tcl_DeleteChannelHandler(inChan, MBRead, csPtr);
+ }
+ } else {
+ MBError(csPtr, mask, code);
+ }
+}
+
+static void
+MBWrite(
+ ClientData clientData,
+ int mask)
+{
+ CopyState *csPtr = (CopyState *) clientData;
+ ChannelState *inStatePtr = csPtr->readPtr->state;
+ ChannelState *outStatePtr = csPtr->writePtr->state;
+
+ Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr;
+ Tcl_Channel outChan = (Tcl_Channel) csPtr->writePtr;
+
+ ChannelBuffer *bufPtr = inStatePtr->inQueueHead;
+ ChannelBuffer *tail = NULL;
+ int code, inBytes = 0;
+
+ assert (mask & TCL_WRITABLE);
+
+ Tcl_DeleteChannelHandler(inChan, MBRead, csPtr);
+ Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr);
+
+ /* Count up number of bytes waiting in the input queue */
+ while (bufPtr) {
+ inBytes += BytesLeft(bufPtr);
+ tail = bufPtr;
+ if (csPtr->toRead != -1 && csPtr->toRead < inBytes) {
+ /* Queue has enough bytes to complete the copy */
+ break;
+ }
+ bufPtr = bufPtr->nextPtr;
+ }
+
+ if (bufPtr) {
+ /* Split the overflowing buffer in two */
+ int extra = inBytes - csPtr->toRead;
+
+ bufPtr = AllocChannelBuffer(extra);
+
+ tail->nextAdded -= extra;
+ memcpy(InsertPoint(bufPtr), InsertPoint(tail), extra);
+ bufPtr->nextAdded += extra;
+ bufPtr->nextPtr = tail->nextPtr;
+ tail->nextPtr = NULL;
+ inBytes = csPtr->toRead;
+ }
+
+ /* Update the byte counts */
+ if (csPtr->toRead != -1) {
+ csPtr->toRead -= inBytes;
+ }
+ csPtr->total += inBytes;
+
+ /* Move buffers from input to output channels */
+ if (outStatePtr->outQueueTail) {
+ outStatePtr->outQueueTail->nextPtr = inStatePtr->inQueueHead;
+ } else {
+ outStatePtr->outQueueHead = inStatePtr->inQueueHead;
+ }
+ outStatePtr->outQueueTail = tail;
+ inStatePtr->inQueueHead = bufPtr;
+ if (bufPtr == NULL) {
+ inStatePtr->inQueueTail = NULL;
+ }
+
+ code = FlushChannel(csPtr->interp, outStatePtr->topChanPtr, 0);
+ if (code == 0) {
+ if (csPtr->toRead == 0 || Tcl_Eof(inChan)) {
+ MBCallback(csPtr, NULL);
+ } else {
+ Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr);
+ }
+ } else {
+ MBError(csPtr, mask, code);
+ }
+}
+
static int
MoveBytes(
CopyState *csPtr) /* State of copy operation. */
@@ -8927,6 +9081,12 @@ MoveBytes(
}
}
+ if (csPtr->cmdPtr) {
+ Tcl_Channel inChan = (Tcl_Channel) csPtr->readPtr;
+ Tcl_CreateChannelHandler(inChan, TCL_READABLE, MBRead, csPtr);
+ return TCL_OK;
+ }
+
while (csPtr->toRead != 0) {
ChannelBuffer *bufPtr = inStatePtr->inQueueHead;
ChannelBuffer *tail = NULL;
@@ -9536,12 +9696,16 @@ StopCopy(
CopyState *csPtr) /* State for bg copy to stop . */
{
ChannelState *inStatePtr, *outStatePtr;
+ Tcl_Channel inChan, outChan;
+
int nonBlocking;
if (!csPtr) {
return;
}
+ inChan = (Tcl_Channel) csPtr->readPtr;
+ outChan = (Tcl_Channel) csPtr->writePtr;
inStatePtr = csPtr->readPtr->state;
outStatePtr = csPtr->writePtr->state;
@@ -9566,12 +9730,12 @@ StopCopy(
csPtr->writeFlags & (CHANNEL_LINEBUFFERED | CHANNEL_UNBUFFERED);
if (csPtr->cmdPtr) {
- Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->readPtr, CopyEventProc,
- csPtr);
- if (csPtr->readPtr != csPtr->writePtr) {
- Tcl_DeleteChannelHandler((Tcl_Channel) csPtr->writePtr,
- CopyEventProc, csPtr);
+ Tcl_DeleteChannelHandler(inChan, CopyEventProc, csPtr);
+ if (inChan != outChan) {
+ Tcl_DeleteChannelHandler(outChan, CopyEventProc, csPtr);
}
+ Tcl_DeleteChannelHandler(inChan, MBRead, csPtr);
+ Tcl_DeleteChannelHandler(outChan, MBWrite, csPtr);
TclDecrRefCount(csPtr->cmdPtr);
}
inStatePtr->csPtrR = NULL;