summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/win/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/win/pipe.c')
-rw-r--r--Utilities/cmlibuv/src/win/pipe.c1130
1 files changed, 647 insertions, 483 deletions
diff --git a/Utilities/cmlibuv/src/win/pipe.c b/Utilities/cmlibuv/src/win/pipe.c
index 83ee4f9..277f649 100644
--- a/Utilities/cmlibuv/src/win/pipe.c
+++ b/Utilities/cmlibuv/src/win/pipe.c
@@ -21,39 +21,28 @@
#include <assert.h>
#include <io.h>
-#include <string.h>
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
-#include "uv.h"
-#include "internal.h"
#include "handle-inl.h"
-#include "stream-inl.h"
+#include "internal.h"
#include "req-inl.h"
+#include "stream-inl.h"
+#include "uv-common.h"
+#include "uv.h"
#include <aclapi.h>
#include <accctrl.h>
-typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
-
-struct uv__ipc_queue_item_s {
- /*
- * NOTE: It is important for socket_info_ex to be the first field,
- * because we will we assigning it to the pending_ipc_info.socket_info
- */
- uv__ipc_socket_info_ex socket_info_ex;
- QUEUE member;
- int tcp_connection;
-};
-
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";
/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };
-/* The timeout that the pipe will wait for the remote end to write data */
-/* when the local ends wants to shut it down. */
+/* The timeout that the pipe will wait for the remote end to write data when
+ * the local ends wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */
static const int default_pending_pipe_instances = 4;
@@ -62,22 +51,44 @@ static const int default_pending_pipe_instances = 4;
static char pipe_prefix[] = "\\\\?\\pipe";
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
-/* IPC protocol flags. */
-#define UV_IPC_RAW_DATA 0x0001
-#define UV_IPC_TCP_SERVER 0x0002
-#define UV_IPC_TCP_CONNECTION 0x0004
+/* IPC incoming xfer queue item. */
+typedef struct {
+ uv__ipc_socket_xfer_type_t xfer_type;
+ uv__ipc_socket_xfer_info_t xfer_info;
+ QUEUE member;
+} uv__ipc_xfer_queue_item_t;
+
+/* IPC frame header flags. */
+/* clang-format off */
+enum {
+ UV__IPC_FRAME_HAS_DATA = 0x01,
+ UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
+ UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
+ /* These are combinations of the flags above. */
+ UV__IPC_FRAME_XFER_FLAGS = 0x06,
+ UV__IPC_FRAME_VALID_FLAGS = 0x07
+};
+/* clang-format on */
/* IPC frame header. */
typedef struct {
- int flags;
- uint64_t raw_data_length;
-} uv_ipc_frame_header_t;
-
-/* IPC frame, which contains an imported TCP socket stream. */
+ uint32_t flags;
+ uint32_t reserved1; /* Ignored. */
+ uint32_t data_length; /* Must be zero if there is no data. */
+ uint32_t reserved2; /* Must be zero. */
+} uv__ipc_frame_header_t;
+
+/* To implement the IPC protocol correctly, these structures must have exactly
+ * the right size. */
+STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
+STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
+
+/* Coalesced write request. */
typedef struct {
- uv_ipc_frame_header_t header;
- uv__ipc_socket_info_ex socket_info_ex;
-} uv_ipc_frame_uv_stream;
+ uv_write_t req; /* Internal heap-allocated write request. */
+ uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
+} uv__coalesced_write_t;
+
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
@@ -98,15 +109,12 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->reqs_pending = 0;
handle->handle = INVALID_HANDLE_VALUE;
handle->name = NULL;
- handle->pipe.conn.ipc_pid = 0;
- handle->pipe.conn.remaining_ipc_rawdata_bytes = 0;
- QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue);
- handle->pipe.conn.pending_ipc_info.queue_len = 0;
+ handle->pipe.conn.ipc_remote_pid = 0;
+ handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
+ QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
+ handle->pipe.conn.ipc_xfer_queue_length = 0;
handle->ipc = ipc;
handle->pipe.conn.non_overlapped_writes_tail = NULL;
- handle->pipe.conn.readfile_thread = NULL;
-
- UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ);
return 0;
}
@@ -117,10 +125,9 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) {
handle->read_req.data = handle;
handle->pipe.conn.eof_timer = NULL;
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
- if (pCancelSynchronousIo &&
- handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
- uv_mutex_init(&handle->pipe.conn.readfile_mutex);
- handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
+ if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ handle->pipe.conn.readfile_thread_handle = NULL;
+ InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
}
@@ -347,12 +354,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
- uv__ipc_queue_item_t* item;
-
- if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
- handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
- uv_mutex_destroy(&handle->pipe.conn.readfile_mutex);
- }
+ uv__ipc_xfer_queue_item_t* xfer_queue_item;
if ((handle->flags & UV_HANDLE_CONNECTION) &&
handle->stream.conn.shutdown_req != NULL &&
@@ -362,7 +364,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
/* Clear the shutdown_req field so we don't go here again. */
handle->stream.conn.shutdown_req = NULL;
- if (handle->flags & UV__HANDLE_CLOSING) {
+ if (handle->flags & UV_HANDLE_CLOSING) {
UNREGISTER_HANDLE_REQ(loop, handle, req);
/* Already closing. Cancel the shutdown. */
@@ -423,33 +425,33 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
}
}
- if (handle->flags & UV__HANDLE_CLOSING &&
+ if (handle->flags & UV_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
if (handle->flags & UV_HANDLE_CONNECTION) {
/* Free pending sockets */
- while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) {
+ while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
QUEUE* q;
SOCKET socket;
- q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue);
+ q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
- item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
+ xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
/* Materialize socket and close it */
socket = WSASocketW(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
- &item->socket_info_ex.socket_info,
+ &xfer_queue_item->xfer_info.socket_info,
0,
WSA_FLAG_OVERLAPPED);
- uv__free(item);
+ uv__free(xfer_queue_item);
if (socket != INVALID_SOCKET)
closesocket(socket);
}
- handle->pipe.conn.pending_ipc_info.queue_len = 0;
+ handle->pipe.conn.ipc_xfer_queue_length = 0;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
@@ -461,6 +463,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
handle->read_req.event_handle = NULL;
}
}
+
+ if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
+ DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
@@ -595,8 +600,8 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
loop = handle->loop;
assert(loop);
- /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
- /* We wait for the pipe to become available with WaitNamedPipe. */
+ /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
+ * for the pipe to become available with WaitNamedPipe. */
while (WaitNamedPipeW(handle->name, 30000)) {
/* The pipe is now available, try to connect. */
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
@@ -706,55 +711,68 @@ error:
}
-void uv__pipe_pause_read(uv_pipe_t* handle) {
- if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
- /* Pause the ReadFile task briefly, to work
- around the Windows kernel bug that causes
- any access to a NamedPipe to deadlock if
- any process has called ReadFile */
- HANDLE h;
- uv_mutex_lock(&handle->pipe.conn.readfile_mutex);
- h = handle->pipe.conn.readfile_thread;
- while (h) {
- /* spinlock: we expect this to finish quickly,
- or we are probably about to deadlock anyways
- (in the kernel), so it doesn't matter */
- pCancelSynchronousIo(h);
- SwitchToThread(); /* yield thread control briefly */
- h = handle->pipe.conn.readfile_thread;
- }
- }
-}
+void uv__pipe_interrupt_read(uv_pipe_t* handle) {
+ BOOL r;
+
+ if (!(handle->flags & UV_HANDLE_READ_PENDING))
+ return; /* No pending reads. */
+ if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
+ return; /* Already cancelled. */
+ if (handle->handle == INVALID_HANDLE_VALUE)
+ return; /* Pipe handle closed. */
+ if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
+ /* Cancel asynchronous read. */
+ r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
+ assert(r || GetLastError() == ERROR_NOT_FOUND);
-void uv__pipe_unpause_read(uv_pipe_t* handle) {
- if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
- uv_mutex_unlock(&handle->pipe.conn.readfile_mutex);
+ } else {
+ /* Cancel synchronous read (which is happening in the thread pool). */
+ HANDLE thread;
+ volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
+
+ EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
+
+ thread = *thread_ptr;
+ if (thread == NULL) {
+ /* The thread pool thread has not yet reached the point of blocking, we
+ * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
+ *thread_ptr = INVALID_HANDLE_VALUE;
+
+ } else {
+ /* Spin until the thread has acknowledged (by setting the thread to
+ * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
+ while (thread != INVALID_HANDLE_VALUE) {
+ r = CancelSynchronousIo(thread);
+ assert(r || GetLastError() == ERROR_NOT_FOUND);
+ SwitchToThread(); /* Yield thread. */
+ thread = *thread_ptr;
+ }
+ }
+
+ LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
+
+ /* Set flag to indicate that read has been cancelled. */
+ handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
-void uv__pipe_stop_read(uv_pipe_t* handle) {
- if (pCancelIoEx &&
- !(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
- !(handle->flags & UV_HANDLE_EMULATE_IOCP) &&
- handle->flags & UV_HANDLE_READING &&
- handle->read_req.type == UV_READ) {
- pCancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
- }
+void uv__pipe_read_stop(uv_pipe_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
- uv__pipe_pause_read((uv_pipe_t*)handle);
- uv__pipe_unpause_read((uv_pipe_t*)handle);
+ DECREASE_ACTIVE_COUNT(handle->loop, handle);
+
+ uv__pipe_interrupt_read(handle);
}
-/* Cleans up uv_pipe_t (server or connection) and all resources associated */
-/* with it. */
+/* Cleans up uv_pipe_t (server or connection) and all resources associated with
+ * it. */
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;
- uv__pipe_stop_read(handle);
+ uv__pipe_interrupt_read(handle);
if (handle->name) {
uv__free(handle->name);
@@ -864,23 +882,22 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
QUEUE* q;
- uv__ipc_queue_item_t* item;
+ uv__ipc_xfer_queue_item_t* item;
int err;
if (server->ipc) {
- if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) {
+ if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
/* No valid pending sockets. */
return WSAEWOULDBLOCK;
}
- q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue);
+ q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
- server->pipe.conn.pending_ipc_info.queue_len--;
- item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
+ server->pipe.conn.ipc_xfer_queue_length--;
+ item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
- err = uv_tcp_import((uv_tcp_t*)client,
- &item->socket_info_ex,
- item->tcp_connection);
+ err = uv__tcp_xfer_import(
+ (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
if (err != 0)
return err;
@@ -889,8 +906,8 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
} else {
pipe_client = (uv_pipe_t*)client;
- /* Find a connection instance that has been connected, but not yet */
- /* accepted. */
+ /* Find a connection instance that has been connected, but not yet
+ * accepted. */
req = server->pipe.serv.pending_accepts;
if (!req) {
@@ -908,7 +925,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
req->next_pending = NULL;
req->pipeHandle = INVALID_HANDLE_VALUE;
- if (!(server->flags & UV__HANDLE_CLOSING)) {
+ if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, server, req, FALSE);
}
}
@@ -953,74 +970,75 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
}
-static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
- int result;
- DWORD bytes;
- uv_read_t* req = (uv_read_t*) parameter;
+static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
+ uv_read_t* req = (uv_read_t*) arg;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
- HANDLE hThread = NULL;
+ volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
+ CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
+ HANDLE thread;
+ DWORD bytes;
DWORD err;
- uv_mutex_t *m = &handle->pipe.conn.readfile_mutex;
- assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);
- if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
- uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
- if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
- GetCurrentProcess(), &hThread,
- 0, FALSE, DUPLICATE_SAME_ACCESS)) {
- handle->pipe.conn.readfile_thread = hThread;
- } else {
- hThread = NULL;
- }
- uv_mutex_unlock(m);
+ err = 0;
+
+ /* Create a handle to the current thread. */
+ if (!DuplicateHandle(GetCurrentProcess(),
+ GetCurrentThread(),
+ GetCurrentProcess(),
+ &thread,
+ 0,
+ FALSE,
+ DUPLICATE_SAME_ACCESS)) {
+ err = GetLastError();
+ goto out1;
}
-restart_readfile:
- if (handle->flags & UV_HANDLE_READING) {
- result = ReadFile(handle->handle,
- &uv_zero_,
- 0,
- &bytes,
- NULL);
- if (!result) {
- err = GetLastError();
- if (err == ERROR_OPERATION_ABORTED &&
- handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
- if (handle->flags & UV_HANDLE_READING) {
- /* just a brief break to do something else */
- handle->pipe.conn.readfile_thread = NULL;
- /* resume after it is finished */
- uv_mutex_lock(m);
- handle->pipe.conn.readfile_thread = hThread;
- uv_mutex_unlock(m);
- goto restart_readfile;
- } else {
- result = 1; /* successfully stopped reading */
- }
- }
- }
+
+ /* The lock needs to be held when thread handle is modified. */
+ EnterCriticalSection(lock);
+ if (*thread_ptr == INVALID_HANDLE_VALUE) {
+ /* uv__pipe_interrupt_read() cancelled reading before we got here. */
+ err = ERROR_OPERATION_ABORTED;
} else {
- result = 1; /* successfully aborted read before it even started */
- }
- if (hThread) {
- assert(hThread == handle->pipe.conn.readfile_thread);
- /* mutex does not control clearing readfile_thread */
- handle->pipe.conn.readfile_thread = NULL;
- uv_mutex_lock(m);
- /* only when we hold the mutex lock is it safe to
- open or close the handle */
- CloseHandle(hThread);
- uv_mutex_unlock(m);
+ /* Let main thread know which worker thread is doing the blocking read. */
+ assert(*thread_ptr == NULL);
+ *thread_ptr = thread;
}
+ LeaveCriticalSection(lock);
- if (!result) {
- SET_REQ_ERROR(req, err);
- }
+ if (err)
+ goto out2;
+
+ /* Block the thread until data is available on the pipe, or the read is
+ * cancelled. */
+ if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
+ err = GetLastError();
+
+ /* Let the main thread know the worker is past the point of blocking. */
+ assert(thread == *thread_ptr);
+ *thread_ptr = INVALID_HANDLE_VALUE;
+
+ /* Briefly acquire the mutex. Since the main thread holds the lock while it
+ * is spinning trying to cancel this thread's I/O, we will block here until
+ * it stops doing that. */
+ EnterCriticalSection(lock);
+ LeaveCriticalSection(lock);
+out2:
+ /* Close the handle to the current thread. */
+ CloseHandle(thread);
+
+out1:
+ /* Set request status and post a completion record to the IOCP. */
+ if (err)
+ SET_REQ_ERROR(req, err);
+ else
+ SET_REQ_SUCCESS(req);
POST_COMPLETION_FOR_REQ(loop, req);
+
return 0;
}
@@ -1102,6 +1120,7 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
req = &handle->read_req;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
@@ -1169,8 +1188,8 @@ int uv_pipe_read_start(uv_pipe_t* handle,
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
- /* If reading was stopped and then started again, there could still be a */
- /* read request pending. */
+ /* If reading was stopped and then started again, there could still be a read
+ * request pending. */
if (!(handle->flags & UV_HANDLE_READ_PENDING))
uv_pipe_queue_read(loop, handle);
@@ -1226,154 +1245,110 @@ static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
}
-static int uv_pipe_write_impl(uv_loop_t* loop,
- uv_write_t* req,
- uv_pipe_t* handle,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- int err;
- int result;
- uv_tcp_t* tcp_send_handle;
- uv_write_t* ipc_header_req = NULL;
- uv_ipc_frame_uv_stream ipc_frame;
+static int uv__build_coalesced_write_req(uv_write_t* user_req,
+ const uv_buf_t bufs[],
+ size_t nbufs,
+ uv_write_t** req_out,
+ uv_buf_t* write_buf_out) {
+ /* Pack into a single heap-allocated buffer:
+ * (a) a uv_write_t structure where libuv stores the actual state.
+ * (b) a pointer to the original uv_write_t.
+ * (c) data from all `bufs` entries.
+ */
+ char* heap_buffer;
+ size_t heap_buffer_length, heap_buffer_offset;
+ uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
+ char* data_start; /* (c) */
+ size_t data_length;
+ unsigned int i;
+
+ /* Compute combined size of all combined buffers from `bufs`. */
+ data_length = 0;
+ for (i = 0; i < nbufs; i++)
+ data_length += bufs[i].len;
+
+ /* The total combined size of data buffers should not exceed UINT32_MAX,
+ * because WriteFile() won't accept buffers larger than that. */
+ if (data_length > UINT32_MAX)
+ return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
+
+ /* Compute heap buffer size. */
+ heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
+ data_length; /* (c) */
+
+ /* Allocate buffer. */
+ heap_buffer = uv__malloc(heap_buffer_length);
+ if (heap_buffer == NULL)
+ return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
+
+ /* Copy uv_write_t information to the buffer. */
+ coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
+ coalesced_write_req->req = *user_req; /* copy (a) */
+ coalesced_write_req->req.coalesced = 1;
+ coalesced_write_req->user_req = user_req; /* copy (b) */
+ heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
+
+ /* Copy data buffers to the heap buffer. */
+ data_start = &heap_buffer[heap_buffer_offset];
+ for (i = 0; i < nbufs; i++) {
+ memcpy(&heap_buffer[heap_buffer_offset],
+ bufs[i].base,
+ bufs[i].len); /* copy (c) */
+ heap_buffer_offset += bufs[i].len; /* offset (c) */
+ }
+ assert(heap_buffer_offset == heap_buffer_length);
+
+ /* Set out arguments and return. */
+ *req_out = &coalesced_write_req->req;
+ *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
+ return 0;
+}
- if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
- return ERROR_NOT_SUPPORTED;
- }
- /* Only TCP handles are supported for sharing. */
- if (send_handle && ((send_handle->type != UV_TCP) ||
- (!(send_handle->flags & UV_HANDLE_BOUND) &&
- !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
- return ERROR_NOT_SUPPORTED;
- }
+static int uv__pipe_write_data(uv_loop_t* loop,
+ uv_write_t* req,
+ uv_pipe_t* handle,
+ const uv_buf_t bufs[],
+ size_t nbufs,
+ uv_write_cb cb,
+ int copy_always) {
+ int err;
+ int result;
+ uv_buf_t write_buf;
assert(handle->handle != INVALID_HANDLE_VALUE);
UV_REQ_INIT(req, UV_WRITE);
req->handle = (uv_stream_t*) handle;
+ req->send_handle = NULL;
req->cb = cb;
- req->ipc_header = 0;
+ /* Private fields. */
+ req->coalesced = 0;
req->event_handle = NULL;
req->wait_handle = INVALID_HANDLE_VALUE;
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
-
- if (handle->ipc) {
- assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
- ipc_frame.header.flags = 0;
-
- /* Use the IPC framing protocol. */
- if (send_handle) {
- tcp_send_handle = (uv_tcp_t*)send_handle;
-
- if (handle->pipe.conn.ipc_pid == 0) {
- handle->pipe.conn.ipc_pid = uv_current_pid();
- }
-
- err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid,
- &ipc_frame.socket_info_ex.socket_info);
- if (err) {
- return err;
- }
-
- ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;
-
- ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
-
- if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
- ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
- }
- }
-
- if (nbufs == 1) {
- ipc_frame.header.flags |= UV_IPC_RAW_DATA;
- ipc_frame.header.raw_data_length = bufs[0].len;
- }
-
- /*
- * Use the provided req if we're only doing a single write.
- * If we're doing multiple writes, use ipc_header_write_req to do
- * the first write, and then use the provided req for the second write.
- */
- if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
- ipc_header_req = req;
- } else {
- /*
- * Try to use the preallocated write req if it's available.
- * Otherwise allocate a new one.
- */
- if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) {
- ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req;
- } else {
- ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t));
- if (!ipc_header_req) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
- }
- }
-
- UV_REQ_INIT(ipc_header_req, UV_WRITE);
- ipc_header_req->handle = (uv_stream_t*) handle;
- ipc_header_req->cb = NULL;
- ipc_header_req->ipc_header = 1;
- }
-
- /* Write the header or the whole frame. */
- memset(&ipc_header_req->u.io.overlapped, 0,
- sizeof(ipc_header_req->u.io.overlapped));
-
- /* Using overlapped IO, but wait for completion before returning.
- This write is blocking because ipc_frame is on stack. */
- ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
- if (!ipc_header_req->u.io.overlapped.hEvent) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
-
- result = WriteFile(handle->handle,
- &ipc_frame,
- ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
- sizeof(ipc_frame) : sizeof(ipc_frame.header),
- NULL,
- &ipc_header_req->u.io.overlapped);
- if (!result && GetLastError() != ERROR_IO_PENDING) {
- err = GetLastError();
- CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
+ req->write_buffer = uv_null_buf_;
+
+ if (nbufs == 0) {
+ /* Write empty buffer. */
+ write_buf = uv_null_buf_;
+ } else if (nbufs == 1 && !copy_always) {
+ /* Write directly from bufs[0]. */
+ write_buf = bufs[0];
+ } else {
+ /* Coalesce all `bufs` into one big buffer. This also creates a new
+ * write-request structure that replaces the old one. */
+ err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
+ if (err != 0)
return err;
- }
-
- if (!result) {
- /* Request not completed immediately. Wait for it.*/
- if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) !=
- WAIT_OBJECT_0) {
- err = GetLastError();
- CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
- return err;
- }
- }
- ipc_header_req->u.io.queued_bytes = 0;
- CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
- ipc_header_req->u.io.overlapped.hEvent = NULL;
-
- REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
- handle->reqs_pending++;
- handle->stream.conn.write_reqs_pending++;
-
- /* If we don't have any raw data to write - we're done. */
- if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
- return 0;
- }
}
if ((handle->flags &
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
DWORD bytes;
- result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
- &bytes,
- NULL);
+ result =
+ WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
if (!result) {
err = GetLastError();
@@ -1389,14 +1364,14 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
- req->write_buffer = bufs[0];
+ req->write_buffer = write_buf;
uv_insert_non_overlapped_write_req(handle, req);
if (handle->stream.conn.write_reqs_pending == 0) {
uv_queue_non_overlapped_write(handle);
}
/* Request queued by the kernel. */
- req->u.io.queued_bytes = bufs[0].len;
+ req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
/* Using overlapped IO, but wait for completion before returning */
@@ -1406,8 +1381,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
}
result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
+ write_buf.base,
+ write_buf.len,
NULL,
&req->u.io.overlapped);
@@ -1422,13 +1397,13 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
req->u.io.queued_bytes = 0;
} else {
/* Request queued by the kernel. */
- req->u.io.queued_bytes = bufs[0].len;
+ req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
if (WaitForSingleObject(req->u.io.overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
err = GetLastError();
CloseHandle(req->u.io.overlapped.hEvent);
- return uv_translate_sys_error(err);
+ return err;
}
}
CloseHandle(req->u.io.overlapped.hEvent);
@@ -1439,8 +1414,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
return 0;
} else {
result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
+ write_buf.base,
+ write_buf.len,
NULL,
&req->u.io.overlapped);
@@ -1453,7 +1428,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
req->u.io.queued_bytes = 0;
} else {
/* Request queued by the kernel. */
- req->u.io.queued_bytes = bufs[0].len;
+ req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
}
@@ -1478,35 +1453,143 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
}
-int uv_pipe_write(uv_loop_t* loop,
- uv_write_t* req,
- uv_pipe_t* handle,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_write_cb cb) {
- return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
+static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
+ DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
+
+ /* If the both ends of the IPC pipe are owned by the same process,
+ * the remote end pid may not yet be set. If so, do it here.
+ * TODO: this is weird; it'd probably better to use a handshake. */
+ if (*pid == 0)
+ *pid = GetCurrentProcessId();
+
+ return *pid;
+}
+
+
+int uv__pipe_write_ipc(uv_loop_t* loop,
+ uv_write_t* req,
+ uv_pipe_t* handle,
+ const uv_buf_t data_bufs[],
+ size_t data_buf_count,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ uv_buf_t stack_bufs[6];
+ uv_buf_t* bufs;
+ size_t buf_count, buf_index;
+ uv__ipc_frame_header_t frame_header;
+ uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
+ uv__ipc_socket_xfer_info_t xfer_info;
+ uint64_t data_length;
+ size_t i;
+ int err;
+
+ /* Compute the combined size of data buffers. */
+ data_length = 0;
+ for (i = 0; i < data_buf_count; i++)
+ data_length += data_bufs[i].len;
+ if (data_length > UINT32_MAX)
+ return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
+
+ /* Prepare the frame's socket xfer payload. */
+ if (send_handle != NULL) {
+ uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
+
+ /* Verify that `send_handle` it is indeed a tcp handle. */
+ if (send_tcp_handle->type != UV_TCP)
+ return ERROR_NOT_SUPPORTED;
+
+ /* Export the tcp handle. */
+ err = uv__tcp_xfer_export(send_tcp_handle,
+ uv__pipe_get_ipc_remote_pid(handle),
+ &xfer_type,
+ &xfer_info);
+ if (err != 0)
+ return err;
+ }
+
+ /* Compute the number of uv_buf_t's required. */
+ buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
+ if (send_handle != NULL)
+ buf_count += 1; /* One extra for the socket xfer information. */
+
+ /* Use the on-stack buffer array if it is big enough; otherwise allocate
+ * space for it on the heap. */
+ if (buf_count < ARRAY_SIZE(stack_bufs)) {
+ /* Use on-stack buffer array. */
+ bufs = stack_bufs;
+ } else {
+ /* Use heap-allocated buffer array. */
+ bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
+ if (bufs == NULL)
+ return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
+ }
+ buf_index = 0;
+
+ /* Initialize frame header and add it to the buffers list. */
+ memset(&frame_header, 0, sizeof frame_header);
+ bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
+
+ if (send_handle != NULL) {
+ /* Add frame header flags. */
+ switch (xfer_type) {
+ case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
+ frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
+ UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
+ break;
+ case UV__IPC_SOCKET_XFER_TCP_SERVER:
+ frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
+ break;
+ default:
+ assert(0); /* Unreachable. */
+ }
+ /* Add xfer info buffer. */
+ bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
+ }
+
+ if (data_length > 0) {
+ /* Update frame header. */
+ frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
+ frame_header.data_length = (uint32_t) data_length;
+ /* Add data buffers to buffers list. */
+ for (i = 0; i < data_buf_count; i++)
+ bufs[buf_index++] = data_bufs[i];
+ }
+
+ /* Write buffers. We set the `always_copy` flag, so it is not a problem that
+ * some of the written data lives on the stack. */
+ err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
+
+ /* If we had to heap-allocate the bufs array, free it now. */
+ if (bufs != stack_bufs) {
+ uv__free(bufs);
+ }
+
+ return err;
}
-int uv_pipe_write2(uv_loop_t* loop,
+int uv__pipe_write(uv_loop_t* loop,
uv_write_t* req,
uv_pipe_t* handle,
const uv_buf_t bufs[],
- unsigned int nbufs,
+ size_t nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
- if (!handle->ipc) {
- return WSAEINVAL;
+ if (handle->ipc) {
+ /* IPC pipe write: use framing protocol. */
+ return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
+ } else {
+ /* Non-IPC pipe write: put data on the wire directly. */
+ assert(send_handle == NULL);
+ return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
}
-
- return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
}
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
- /* If there is an eof timer running, we don't need it any more, */
- /* so discard it. */
+ /* If there is an eof timer running, we don't need it any more, so discard
+ * it. */
eof_timer_destroy(handle);
handle->flags &= ~UV_HANDLE_READABLE;
@@ -1518,8 +1601,8 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
uv_buf_t buf) {
- /* If there is an eof timer running, we don't need it any more, */
- /* so discard it. */
+ /* If there is an eof timer running, we don't need it any more, so discard
+ * it. */
eof_timer_destroy(handle);
uv_read_stop((uv_stream_t*) handle);
@@ -1530,10 +1613,7 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
int error, uv_buf_t buf) {
- if (error == ERROR_OPERATION_ABORTED) {
- /* do nothing (equivalent to EINTR) */
- }
- else if (error == ERROR_BROKEN_PIPE) {
+ if (error == ERROR_BROKEN_PIPE) {
uv_pipe_read_eof(loop, handle, buf);
} else {
uv_pipe_read_error(loop, handle, error, buf);
@@ -1541,152 +1621,228 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
}
-void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
- uv__ipc_socket_info_ex* info,
- int tcp_connection) {
- uv__ipc_queue_item_t* item;
+static void uv__pipe_queue_ipc_xfer_info(
+ uv_pipe_t* handle,
+ uv__ipc_socket_xfer_type_t xfer_type,
+ uv__ipc_socket_xfer_info_t* xfer_info) {
+ uv__ipc_xfer_queue_item_t* item;
- item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item));
+ item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
if (item == NULL)
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
- memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex));
- item->tcp_connection = tcp_connection;
- QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member);
- handle->pipe.conn.pending_ipc_info.queue_len++;
+ item->xfer_type = xfer_type;
+ item->xfer_info = *xfer_info;
+
+ QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
+ handle->pipe.conn.ipc_xfer_queue_length++;
+}
+
+
+/* Read an exact number of bytes from a pipe. If an error or end-of-file is
+ * encountered before the requested number of bytes are read, an error is
+ * returned. */
+static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
+ DWORD bytes_read, bytes_read_now;
+
+ bytes_read = 0;
+ while (bytes_read < count) {
+ if (!ReadFile(h,
+ (char*) buffer + bytes_read,
+ count - bytes_read,
+ &bytes_read_now,
+ NULL)) {
+ return GetLastError();
+ }
+
+ bytes_read += bytes_read_now;
+ }
+
+ assert(bytes_read == count);
+ return 0;
}
-void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_req_t* req) {
- DWORD bytes, avail;
+static DWORD uv__pipe_read_data(uv_loop_t* loop,
+ uv_pipe_t* handle,
+ DWORD suggested_bytes,
+ DWORD max_bytes) {
+ DWORD bytes_read;
uv_buf_t buf;
- uv_ipc_frame_uv_stream ipc_frame;
+ /* Ask the user for a buffer to read data into. */
+ buf = uv_buf_init(NULL, 0);
+ handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
+ if (buf.base == NULL || buf.len == 0) {
+ handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
+ return 0; /* Break out of read loop. */
+ }
+
+ /* Ensure we read at most the smaller of:
+ * (a) the length of the user-allocated buffer.
+ * (b) the maximum data length as specified by the `max_bytes` argument.
+ */
+ if (max_bytes > buf.len)
+ max_bytes = buf.len;
+
+ /* Read into the user buffer. */
+ if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
+ return 0; /* Break out of read loop. */
+ }
+
+ /* Call the read callback. */
+ handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
+
+ return bytes_read;
+}
+
+
+static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
+ uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
+ int err;
+
+ if (*data_remaining > 0) {
+ /* Read frame data payload. */
+ DWORD bytes_read =
+ uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
+ *data_remaining -= bytes_read;
+ return bytes_read;
+
+ } else {
+ /* Start of a new IPC frame. */
+ uv__ipc_frame_header_t frame_header;
+ uint32_t xfer_flags;
+ uv__ipc_socket_xfer_type_t xfer_type;
+ uv__ipc_socket_xfer_info_t xfer_info;
+
+ /* Read the IPC frame header. */
+ err = uv__pipe_read_exactly(
+ handle->handle, &frame_header, sizeof frame_header);
+ if (err)
+ goto error;
+
+ /* Validate that flags are valid. */
+ if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
+ goto invalid;
+ /* Validate that reserved2 is zero. */
+ if (frame_header.reserved2 != 0)
+ goto invalid;
+
+ /* Parse xfer flags. */
+ xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
+ if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
+ /* Socket coming -- determine the type. */
+ xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
+ ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
+ : UV__IPC_SOCKET_XFER_TCP_SERVER;
+ } else if (xfer_flags == 0) {
+ /* No socket. */
+ xfer_type = UV__IPC_SOCKET_XFER_NONE;
+ } else {
+ /* Invalid flags. */
+ goto invalid;
+ }
+
+ /* Parse data frame information. */
+ if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
+ *data_remaining = frame_header.data_length;
+ } else if (frame_header.data_length != 0) {
+ /* Data length greater than zero but data flag not set -- invalid. */
+ goto invalid;
+ }
+
+ /* If no socket xfer info follows, return here. Data will be read in a
+ * subsequent invocation of uv__pipe_read_ipc(). */
+ if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
+ return sizeof frame_header; /* Number of bytes read. */
+
+ /* Read transferred socket information. */
+ err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
+ if (err)
+ goto error;
+
+ /* Store the pending socket info. */
+ uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
+
+ /* Return number of bytes read. */
+ return sizeof frame_header + sizeof xfer_info;
+ }
+
+invalid:
+ /* Invalid frame. */
+ err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
+
+error:
+ uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
+ return 0; /* Break out of read loop. */
+}
+
+
+void uv_process_pipe_read_req(uv_loop_t* loop,
+ uv_pipe_t* handle,
+ uv_req_t* req) {
assert(handle->type == UV_NAMED_PIPE);
- handle->flags &= ~UV_HANDLE_READ_PENDING;
+ handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
+ DECREASE_PENDING_REQ_COUNT(handle);
eof_timer_stop(handle);
- if (!REQ_SUCCESS(req)) {
- /* An error occurred doing the 0-read. */
- if (handle->flags & UV_HANDLE_READING) {
- uv_pipe_read_error_or_eof(loop,
- handle,
- GET_REQ_ERROR(req),
- uv_null_buf_);
- }
- } else {
- /* Do non-blocking reads until the buffer is empty */
- while (handle->flags & UV_HANDLE_READING) {
- if (!PeekNamedPipe(handle->handle,
- NULL,
- 0,
- NULL,
- &avail,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
- break;
- }
+ /* At this point, we're done with bookkeeping. If the user has stopped
+ * reading the pipe in the meantime, there is nothing left to do, since there
+ * is no callback that we can call. */
+ if (!(handle->flags & UV_HANDLE_READING))
+ return;
- if (avail == 0) {
- /* There is nothing to read after all. */
- break;
- }
+ if (!REQ_SUCCESS(req)) {
+ /* An error occurred doing the zero-read. */
+ DWORD err = GET_REQ_ERROR(req);
- if (handle->ipc) {
- /* Use the IPC framing protocol to read the incoming data. */
- if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) {
- /* We're reading a new frame. First, read the header. */
- assert(avail >= sizeof(ipc_frame.header));
-
- if (!ReadFile(handle->handle,
- &ipc_frame.header,
- sizeof(ipc_frame.header),
- &bytes,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
- uv_null_buf_);
- break;
- }
-
- assert(bytes == sizeof(ipc_frame.header));
- assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
- UV_IPC_TCP_CONNECTION));
-
- if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
- assert(avail - sizeof(ipc_frame.header) >=
- sizeof(ipc_frame.socket_info_ex));
-
- /* Read the TCP socket info. */
- if (!ReadFile(handle->handle,
- &ipc_frame.socket_info_ex,
- sizeof(ipc_frame) - sizeof(ipc_frame.header),
- &bytes,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
- uv_null_buf_);
- break;
- }
-
- assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
-
- /* Store the pending socket info. */
- uv__pipe_insert_pending_socket(
- handle,
- &ipc_frame.socket_info_ex,
- ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
- }
-
- if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
- handle->pipe.conn.remaining_ipc_rawdata_bytes =
- ipc_frame.header.raw_data_length;
- continue;
- }
- } else {
- avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes);
- }
- }
+ /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
+ * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
+ * the user; we'll start a new zero-read at the end of this function. */
+ if (err != ERROR_OPERATION_ABORTED)
+ uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
- buf = uv_buf_init(NULL, 0);
- handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
- if (buf.base == NULL || buf.len == 0) {
- handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
+ } else {
+ /* The zero-read completed without error, indicating there is data
+ * available in the kernel buffer. */
+ DWORD avail;
+
+ /* Get the number of bytes available. */
+ avail = 0;
+ if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
+
+ /* Read until we've either read all the bytes available, or the 'reading'
+ * flag is cleared. */
+ while (avail > 0 && handle->flags & UV_HANDLE_READING) {
+ /* Depending on the type of pipe, read either IPC frames or raw data. */
+ DWORD bytes_read =
+ handle->ipc ? uv__pipe_read_ipc(loop, handle)
+ : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
+
+ /* If no bytes were read, treat this as an indication that an error
+ * occurred, and break out of the read loop. */
+ if (bytes_read == 0)
break;
- }
- assert(buf.base != NULL);
-
- if (ReadFile(handle->handle,
- buf.base,
- min(buf.len, avail),
- &bytes,
- NULL)) {
- /* Successful read */
- if (handle->ipc) {
- assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes);
- handle->pipe.conn.remaining_ipc_rawdata_bytes =
- handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes;
- }
- handle->read_cb((uv_stream_t*)handle, bytes, &buf);
- /* Read again only if bytes == buf.len */
- if (bytes <= buf.len) {
- break;
- }
- } else {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
+ /* It is possible that more bytes were read than we thought were
+ * available. To prevent `avail` from underflowing, break out of the loop
+ * if this is the case. */
+ if (bytes_read > avail)
break;
- }
- }
- /* Post another 0-read if still reading and not closing. */
- if ((handle->flags & UV_HANDLE_READING) &&
- !(handle->flags & UV_HANDLE_READ_PENDING)) {
- uv_pipe_queue_read(loop, handle);
+ /* Recompute the number of bytes available. */
+ avail -= bytes_read;
}
}
- DECREASE_PENDING_REQ_COUNT(handle);
+ /* Start another zero-read request if necessary. */
+ if ((handle->flags & UV_HANDLE_READING) &&
+ !(handle->flags & UV_HANDLE_READ_PENDING)) {
+ uv_pipe_queue_read(loop, handle);
+ }
}
@@ -1712,17 +1868,19 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
}
}
- if (req->ipc_header) {
- if (req == &handle->pipe.conn.ipc_header_write_req) {
- req->type = UV_UNKNOWN_REQ;
- } else {
- uv__free(req);
- }
- } else {
- if (req->cb) {
- err = GET_REQ_ERROR(req);
- req->cb(req, uv_translate_sys_error(err));
- }
+ err = GET_REQ_ERROR(req);
+
+ /* If this was a coalesced write, extract pointer to the user_provided
+ * uv_write_t structure so we can pass the expected pointer to the callback,
+ * then free the heap-allocated write req. */
+ if (req->coalesced) {
+ uv__coalesced_write_t* coalesced_write =
+ container_of(req, uv__coalesced_write_t, req);
+ req = coalesced_write->user_req;
+ uv__free(coalesced_write);
+ }
+ if (req->cb) {
+ req->cb(req, uv_translate_sys_error(err));
}
handle->stream.conn.write_reqs_pending--;
@@ -1748,7 +1906,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
assert(handle->type == UV_NAMED_PIPE);
- if (handle->flags & UV__HANDLE_CLOSING) {
+ if (handle->flags & UV_HANDLE_CLOSING) {
/* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
DECREASE_PENDING_REQ_COUNT(handle);
@@ -1768,7 +1926,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
}
- if (!(handle->flags & UV__HANDLE_CLOSING)) {
+ if (!(handle->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, handle, req, FALSE);
}
}
@@ -1806,19 +1964,19 @@ void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_READABLE) {
- /* Initialize and optionally start the eof timer. Only do this if the */
- /* pipe is readable and we haven't seen EOF come in ourselves. */
+ /* Initialize and optionally start the eof timer. Only do this if the pipe
+ * is readable and we haven't seen EOF come in ourselves. */
eof_timer_init(handle);
- /* If reading start the timer right now. */
- /* Otherwise uv_pipe_queue_read will start it. */
+ /* If reading start the timer right now. Otherwise uv_pipe_queue_read will
+ * start it. */
if (handle->flags & UV_HANDLE_READ_PENDING) {
eof_timer_start(handle);
}
} else {
- /* This pipe is not readable. We can just close it to let the other end */
- /* know that we're done writing. */
+ /* This pipe is not readable. We can just close it to let the other end
+ * know that we're done writing. */
close_pipe(handle);
}
@@ -1869,17 +2027,16 @@ static void eof_timer_cb(uv_timer_t* timer) {
assert(pipe->type == UV_NAMED_PIPE);
- /* This should always be true, since we start the timer only */
- /* in uv_pipe_queue_read after successfully calling ReadFile, */
- /* or in uv_process_pipe_shutdown_req if a read is pending, */
- /* and we always immediately stop the timer in */
- /* uv_process_pipe_read_req. */
+ /* This should always be true, since we start the timer only in
+ * uv_pipe_queue_read after successfully calling ReadFile, or in
+ * uv_process_pipe_shutdown_req if a read is pending, and we always
+ * immediately stop the timer in uv_process_pipe_read_req. */
assert(pipe->flags & UV_HANDLE_READ_PENDING);
- /* If there are many packets coming off the iocp then the timer callback */
- /* may be called before the read request is coming off the queue. */
- /* Therefore we check here if the read request has completed but will */
- /* be processed later. */
+ /* If there are many packets coming off the iocp then the timer callback may
+ * be called before the read request is coming off the queue. Therefore we
+ * check here if the read request has completed but will be processed later.
+ */
if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
return;
@@ -1888,12 +2045,12 @@ static void eof_timer_cb(uv_timer_t* timer) {
/* Force both ends off the pipe. */
close_pipe(pipe);
- /* Stop reading, so the pending read that is going to fail will */
- /* not be reported to the user. */
+ /* Stop reading, so the pending read that is going to fail will not be
+ * reported to the user. */
uv_read_stop((uv_stream_t*) pipe);
- /* Report the eof and update flags. This will get reported even if the */
- /* user stopped reading in the meantime. TODO: is that okay? */
+ /* Report the eof and update flags. This will get reported even if the user
+ * stopped reading in the meantime. TODO: is that okay? */
uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
@@ -1980,8 +2137,8 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
- pipe->pipe.conn.ipc_pid = uv_os_getppid();
- assert(pipe->pipe.conn.ipc_pid != -1);
+ pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
+ assert(pipe->pipe.conn.ipc_remote_pid != (DWORD) -1);
}
return 0;
}
@@ -2006,7 +2163,15 @@ static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size)
return UV_EINVAL;
}
- uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */
+ /* NtQueryInformationFile will block if another thread is performing a
+ * blocking operation on the queried handle. If the pipe handle is
+ * synchronous, there may be a worker thread currently calling ReadFile() on
+ * the pipe handle, which could cause a deadlock. To avoid this, interrupt
+ * the read. */
+ if (handle->flags & UV_HANDLE_CONNECTION &&
+ handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
+ }
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
@@ -2097,7 +2262,6 @@ error:
uv__free(name_info);
cleanup:
- uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
return err;
}
@@ -2105,7 +2269,7 @@ cleanup:
int uv_pipe_pending_count(uv_pipe_t* handle) {
if (!handle->ipc)
return 0;
- return handle->pipe.conn.pending_ipc_info.queue_len;
+ return handle->pipe.conn.ipc_xfer_queue_length;
}
@@ -2138,14 +2302,14 @@ int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
if (!handle->ipc)
return UV_UNKNOWN_HANDLE;
- if (handle->pipe.conn.pending_ipc_info.queue_len == 0)
+ if (handle->pipe.conn.ipc_xfer_queue_length == 0)
return UV_UNKNOWN_HANDLE;
else
return UV_TCP;
}
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
- SID_IDENTIFIER_AUTHORITY sid_world = SECURITY_WORLD_SID_AUTHORITY;
+ SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
PACL old_dacl, new_dacl;
PSECURITY_DESCRIPTOR sd;
EXPLICIT_ACCESS ea;
@@ -2180,7 +2344,7 @@ int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
error = GetLastError();
goto clean_sid;
}
-
+
memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
if (mode & UV_READABLE)
ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;