summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/win/pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/win/pipe.c')
-rw-r--r--Utilities/cmlibuv/src/win/pipe.c556
1 files changed, 304 insertions, 252 deletions
diff --git a/Utilities/cmlibuv/src/win/pipe.c b/Utilities/cmlibuv/src/win/pipe.c
index 984b766..9984618 100644
--- a/Utilities/cmlibuv/src/win/pipe.c
+++ b/Utilities/cmlibuv/src/win/pipe.c
@@ -98,13 +98,13 @@ static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
-static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
+static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
- uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
+ uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
handle->reqs_pending = 0;
handle->handle = INVALID_HANDLE_VALUE;
@@ -120,15 +120,11 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
}
-static void uv_pipe_connection_init(uv_pipe_t* handle) {
- uv_connection_init((uv_stream_t*) handle);
+static void uv__pipe_connection_init(uv_pipe_t* handle) {
+ assert(!(handle->flags & UV_HANDLE_PIPESERVER));
+ uv__connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->pipe.conn.eof_timer = NULL;
- assert(!(handle->flags & UV_HANDLE_PIPESERVER));
- if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
- handle->pipe.conn.readfile_thread_handle = NULL;
- InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
- }
}
@@ -209,7 +205,7 @@ static int uv__pipe_server(
int err;
for (;;) {
- uv_unique_pipe_name(random, name, nameSize);
+ uv__unique_pipe_name(random, name, nameSize);
pipeHandle = CreateNamedPipeA(name,
access | FILE_FLAG_FIRST_PIPE_INSTANCE,
@@ -393,6 +389,8 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop,
unsigned int client_flags;
int err;
+ uv__pipe_connection_init(parent_pipe);
+
server_pipe = INVALID_HANDLE_VALUE;
client_pipe = INVALID_HANDLE_VALUE;
@@ -427,7 +425,6 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop,
goto error;
}
- uv_pipe_connection_init(parent_pipe);
parent_pipe->handle = server_pipe;
*child_pipe_ptr = client_pipe;
@@ -450,11 +447,11 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop,
}
-static int uv_set_pipe_handle(uv_loop_t* loop,
- uv_pipe_t* handle,
- HANDLE pipeHandle,
- int fd,
- DWORD duplex_flags) {
+static int uv__set_pipe_handle(uv_loop_t* loop,
+ uv_pipe_t* handle,
+ HANDLE pipeHandle,
+ int fd,
+ DWORD duplex_flags) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_MODE_INFORMATION mode_info;
@@ -462,7 +459,9 @@ static int uv_set_pipe_handle(uv_loop_t* loop,
DWORD current_mode = 0;
DWORD err = 0;
- if (handle->flags & UV_HANDLE_PIPESERVER)
+ assert(handle->flags & UV_HANDLE_CONNECTION);
+ assert(!(handle->flags & UV_HANDLE_PIPESERVER));
+ if (handle->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
if (handle->handle != INVALID_HANDLE_VALUE)
return UV_EBUSY;
@@ -478,18 +477,17 @@ static int uv_set_pipe_handle(uv_loop_t* loop,
*/
if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
NULL, NULL, 0)) {
- return -1;
+ return uv_translate_sys_error(GetLastError());
} else if (current_mode & PIPE_NOWAIT) {
- SetLastError(ERROR_ACCESS_DENIED);
- return -1;
+ return UV_EACCES;
}
} else {
/* If this returns ERROR_INVALID_PARAMETER we probably opened
* something that is not a pipe. */
if (err == ERROR_INVALID_PARAMETER) {
- SetLastError(WSAENOTSOCK);
+ return UV_ENOTSOCK;
}
- return -1;
+ return uv_translate_sys_error(err);
}
}
@@ -500,13 +498,15 @@ static int uv_set_pipe_handle(uv_loop_t* loop,
sizeof(mode_info),
FileModeInformation);
if (nt_status != STATUS_SUCCESS) {
- return -1;
+ return uv_translate_sys_error(err);
}
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
/* Non-overlapped pipe. */
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
+ handle->pipe.conn.readfile_thread_handle = NULL;
+ InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
} else {
/* Overlapped pipe. Try to associate with IOCP. */
if (CreateIoCompletionPort(pipeHandle,
@@ -578,135 +578,109 @@ static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
}
-void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
- int err;
+void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
DWORD result;
- uv_shutdown_t* req;
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
- uv__ipc_xfer_queue_item_t* xfer_queue_item;
-
- if ((handle->flags & UV_HANDLE_CONNECTION) &&
- handle->stream.conn.shutdown_req != NULL &&
- handle->stream.conn.write_reqs_pending == 0) {
- req = handle->stream.conn.shutdown_req;
-
- /* Clear the shutdown_req field so we don't go here again. */
- handle->stream.conn.shutdown_req = NULL;
-
- if (handle->flags & UV_HANDLE_CLOSING) {
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- /* Already closing. Cancel the shutdown. */
- if (req->cb) {
- req->cb(req, UV_ECANCELED);
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
- /* Try to avoid flushing the pipe buffer in the thread pool. */
- nt_status = pNtQueryInformationFile(handle->handle,
- &io_status,
- &pipe_info,
- sizeof pipe_info,
- FilePipeLocalInformation);
-
- if (nt_status != STATUS_SUCCESS) {
- /* Failure */
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
- if (req->cb) {
- err = pRtlNtStatusToDosError(nt_status);
- req->cb(req, uv_translate_sys_error(err));
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
+ assert(handle->flags & UV_HANDLE_CONNECTION);
+ assert(req != NULL);
+ assert(handle->stream.conn.write_reqs_pending == 0);
+ SET_REQ_SUCCESS(req);
- if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
- /* Short-circuit, no need to call FlushFileBuffers. */
- uv_insert_pending_req(loop, (uv_req_t*) req);
- return;
- }
+ if (handle->flags & UV_HANDLE_CLOSING) {
+ uv__insert_pending_req(loop, (uv_req_t*) req);
+ return;
+ }
- /* Run FlushFileBuffers in the thread pool. */
- result = QueueUserWorkItem(pipe_shutdown_thread_proc,
- req,
- WT_EXECUTELONGFUNCTION);
- if (result) {
- return;
+ /* Try to avoid flushing the pipe buffer in the thread pool. */
+ nt_status = pNtQueryInformationFile(handle->handle,
+ &io_status,
+ &pipe_info,
+ sizeof pipe_info,
+ FilePipeLocalInformation);
- } else {
- /* Failure. */
- UNREGISTER_HANDLE_REQ(loop, handle, req);
+ if (nt_status != STATUS_SUCCESS) {
+ SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
+ handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
+ uv__insert_pending_req(loop, (uv_req_t*) req);
+ return;
+ }
- handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
- if (req->cb) {
- err = GetLastError();
- req->cb(req, uv_translate_sys_error(err));
- }
+ if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
+ /* Short-circuit, no need to call FlushFileBuffers:
+ * all writes have been read. */
+ uv__insert_pending_req(loop, (uv_req_t*) req);
+ return;
+ }
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
+ /* Run FlushFileBuffers in the thread pool. */
+ result = QueueUserWorkItem(pipe_shutdown_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION);
+ if (!result) {
+ SET_REQ_ERROR(req, GetLastError());
+ handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
+ uv__insert_pending_req(loop, (uv_req_t*) req);
+ return;
}
+}
- if (handle->flags & UV_HANDLE_CLOSING &&
- handle->reqs_pending == 0) {
- assert(!(handle->flags & UV_HANDLE_CLOSED));
- if (handle->flags & UV_HANDLE_CONNECTION) {
- /* Free pending sockets */
- while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
- QUEUE* q;
- SOCKET socket;
+void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
+ uv__ipc_xfer_queue_item_t* xfer_queue_item;
- q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
- QUEUE_REMOVE(q);
- xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
+ assert(handle->reqs_pending == 0);
+ assert(handle->flags & UV_HANDLE_CLOSING);
+ assert(!(handle->flags & UV_HANDLE_CLOSED));
- /* Materialize socket and close it */
- socket = WSASocketW(FROM_PROTOCOL_INFO,
- FROM_PROTOCOL_INFO,
- FROM_PROTOCOL_INFO,
- &xfer_queue_item->xfer_info.socket_info,
- 0,
- WSA_FLAG_OVERLAPPED);
- uv__free(xfer_queue_item);
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ /* Free pending sockets */
+ while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
+ QUEUE* q;
+ SOCKET socket;
+
+ q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
+ QUEUE_REMOVE(q);
+ xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
+
+ /* Materialize socket and close it */
+ socket = WSASocketW(FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ &xfer_queue_item->xfer_info.socket_info,
+ 0,
+ WSA_FLAG_OVERLAPPED);
+ uv__free(xfer_queue_item);
+
+ if (socket != INVALID_SOCKET)
+ closesocket(socket);
+ }
+ handle->pipe.conn.ipc_xfer_queue_length = 0;
- if (socket != INVALID_SOCKET)
- closesocket(socket);
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(handle->read_req.wait_handle);
+ handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
- handle->pipe.conn.ipc_xfer_queue_length = 0;
-
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
- UnregisterWait(handle->read_req.wait_handle);
- handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
- }
- if (handle->read_req.event_handle != NULL) {
- CloseHandle(handle->read_req.event_handle);
- handle->read_req.event_handle = NULL;
- }
+ if (handle->read_req.event_handle != NULL) {
+ CloseHandle(handle->read_req.event_handle);
+ handle->read_req.event_handle = NULL;
}
-
- if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
- DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
- if (handle->flags & UV_HANDLE_PIPESERVER) {
- assert(handle->pipe.serv.accept_reqs);
- uv__free(handle->pipe.serv.accept_reqs);
- handle->pipe.serv.accept_reqs = NULL;
- }
+ if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
+ DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
+ }
- uv__handle_close(handle);
+ if (handle->flags & UV_HANDLE_PIPESERVER) {
+ assert(handle->pipe.serv.accept_reqs);
+ uv__free(handle->pipe.serv.accept_reqs);
+ handle->pipe.serv.accept_reqs = NULL;
}
+
+ uv__handle_close(handle);
}
@@ -731,7 +705,9 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
if (!name) {
return UV_EINVAL;
}
-
+ if (uv__is_closing(handle)) {
+ return UV_EINVAL;
+ }
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
handle->pipe.serv.pending_instances = default_pending_pipe_instances;
}
@@ -815,7 +791,7 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
assert(loop);
/* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
- * for the pipe to become available with WaitNamedPipe. */
+ * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
while (WaitNamedPipeW(handle->name, 30000)) {
/* The pipe is now available, try to connect. */
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
@@ -825,9 +801,10 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
SwitchToThread();
}
- if (pipeHandle != INVALID_HANDLE_VALUE &&
- !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
SET_REQ_SUCCESS(req);
+ req->u.connect.pipeHandle = pipeHandle;
+ req->u.connect.duplex_flags = duplex_flags;
} else {
SET_REQ_ERROR(req, GetLastError());
}
@@ -849,6 +826,18 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
UV_REQ_INIT(req, UV_CONNECT);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
+ req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
+ req->u.connect.duplex_flags = 0;
+
+ if (handle->flags & UV_HANDLE_PIPESERVER) {
+ err = ERROR_INVALID_PARAMETER;
+ goto error;
+ }
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ err = ERROR_PIPE_BUSY;
+ goto error;
+ }
+ uv__pipe_connection_init(handle);
/* Convert name to UTF16. */
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
@@ -888,19 +877,10 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
goto error;
}
- assert(pipeHandle != INVALID_HANDLE_VALUE);
-
- if (uv_set_pipe_handle(loop,
- (uv_pipe_t*) req->handle,
- pipeHandle,
- -1,
- duplex_flags)) {
- err = GetLastError();
- goto error;
- }
-
+ req->u.connect.pipeHandle = pipeHandle;
+ req->u.connect.duplex_flags = duplex_flags;
SET_REQ_SUCCESS(req);
- uv_insert_pending_req(loop, (uv_req_t*) req);
+ uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
@@ -916,7 +896,7 @@ error:
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, err);
- uv_insert_pending_req(loop, (uv_req_t*) req);
+ uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
@@ -937,7 +917,7 @@ void uv__pipe_interrupt_read(uv_pipe_t* handle) {
/* Cancel asynchronous read. */
r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
assert(r || GetLastError() == ERROR_NOT_FOUND);
-
+ (void) r;
} else {
/* Cancel synchronous read (which is happening in the thread pool). */
HANDLE thread;
@@ -973,17 +953,30 @@ void uv__pipe_interrupt_read(uv_pipe_t* handle) {
void uv__pipe_read_stop(uv_pipe_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(handle->loop, handle);
-
uv__pipe_interrupt_read(handle);
}
/* Cleans up uv_pipe_t (server or connection) and all resources associated with
* it. */
-void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
+void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;
+ if (handle->flags & UV_HANDLE_READING) {
+ handle->flags &= ~UV_HANDLE_READING;
+ DECREASE_ACTIVE_COUNT(loop, handle);
+ }
+
+ if (handle->flags & UV_HANDLE_LISTENING) {
+ handle->flags &= ~UV_HANDLE_LISTENING;
+ DECREASE_ACTIVE_COUNT(loop, handle);
+ }
+
+ handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
+
+ uv__handle_closing(handle);
+
uv__pipe_interrupt_read(handle);
if (handle->name) {
@@ -1003,45 +996,27 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
}
if (handle->flags & UV_HANDLE_CONNECTION) {
- handle->flags &= ~UV_HANDLE_WRITABLE;
eof_timer_destroy(handle);
}
if ((handle->flags & UV_HANDLE_CONNECTION)
- && handle->handle != INVALID_HANDLE_VALUE)
+ && handle->handle != INVALID_HANDLE_VALUE) {
+ /* This will eventually destroy the write queue for us too. */
close_pipe(handle);
-}
-
-
-void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
- if (handle->flags & UV_HANDLE_READING) {
- handle->flags &= ~UV_HANDLE_READING;
- DECREASE_ACTIVE_COUNT(loop, handle);
- }
-
- if (handle->flags & UV_HANDLE_LISTENING) {
- handle->flags &= ~UV_HANDLE_LISTENING;
- DECREASE_ACTIVE_COUNT(loop, handle);
}
- uv_pipe_cleanup(loop, handle);
-
- if (handle->reqs_pending == 0) {
- uv_want_endgame(loop, (uv_handle_t*) handle);
- }
-
- handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
- uv__handle_closing(handle);
+ if (handle->reqs_pending == 0)
+ uv__want_endgame(loop, (uv_handle_t*) handle);
}
-static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
+static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
assert(handle->flags & UV_HANDLE_LISTENING);
if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
SET_REQ_ERROR(req, GetLastError());
- uv_insert_pending_req(loop, (uv_req_t*) req);
+ uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
@@ -1061,7 +1036,7 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, GetLastError());
}
- uv_insert_pending_req(loop, (uv_req_t*) req);
+ uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
@@ -1071,7 +1046,7 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
}
-int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
+int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
@@ -1099,6 +1074,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
} else {
pipe_client = (uv_pipe_t*) client;
+ uv__pipe_connection_init(pipe_client);
/* Find a connection instance that has been connected, but not yet
* accepted. */
@@ -1110,7 +1086,6 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
}
/* Initialize the client handle and copy the pipeHandle to the client */
- uv_pipe_connection_init(pipe_client);
pipe_client->handle = req->pipeHandle;
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
@@ -1121,7 +1096,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
server->handle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(loop, server, req, FALSE);
+ uv__pipe_queue_accept(loop, server, req, FALSE);
}
}
@@ -1130,7 +1105,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
/* Starts listening for connections for the given pipe. */
-int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
+int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv_loop_t* loop = handle->loop;
int i;
@@ -1162,7 +1137,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
- uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
+ uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
}
return 0;
@@ -1306,7 +1281,7 @@ static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out
}
-static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
+static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
uv_read_t* req;
int result;
@@ -1365,15 +1340,15 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
return;
error:
- uv_insert_pending_req(loop, (uv_req_t*)req);
+ uv__insert_pending_req(loop, (uv_req_t*)req);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
-int uv_pipe_read_start(uv_pipe_t* handle,
- uv_alloc_cb alloc_cb,
- uv_read_cb read_cb) {
+int uv__pipe_read_start(uv_pipe_t* handle,
+ uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop;
handle->flags |= UV_HANDLE_READING;
@@ -1391,14 +1366,14 @@ int uv_pipe_read_start(uv_pipe_t* handle,
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
- uv_pipe_queue_read(loop, handle);
+ uv__pipe_queue_read(loop, handle);
}
return 0;
}
-static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
+static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
uv_write_t* req) {
req->next_req = NULL;
if (handle->pipe.conn.non_overlapped_writes_tail) {
@@ -1434,7 +1409,7 @@ static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
}
-static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
+static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
if (req) {
if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
@@ -1575,9 +1550,9 @@ static int uv__pipe_write_data(uv_loop_t* loop,
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = write_buf;
- uv_insert_non_overlapped_write_req(handle, req);
+ uv__insert_non_overlapped_write_req(handle, req);
if (handle->stream.conn.write_reqs_pending == 0) {
- uv_queue_non_overlapped_write(handle);
+ uv__queue_non_overlapped_write(handle);
}
/* Request queued by the kernel. */
@@ -1790,7 +1765,7 @@ int uv__pipe_write(uv_loop_t* loop,
}
-static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
+static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
/* If there is an eof timer running, we don't need it any more, so discard
* it. */
@@ -1802,7 +1777,7 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
}
-static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
+static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
uv_buf_t buf) {
/* If there is an eof timer running, we don't need it any more, so discard
* it. */
@@ -1814,12 +1789,12 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
}
-static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
+static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
int error, uv_buf_t buf) {
if (error == ERROR_BROKEN_PIPE) {
- uv_pipe_read_eof(loop, handle, buf);
+ uv__pipe_read_eof(loop, handle, buf);
} else {
- uv_pipe_read_error(loop, handle, error, buf);
+ uv__pipe_read_error(loop, handle, error, buf);
}
}
@@ -1890,7 +1865,7 @@ static DWORD uv__pipe_read_data(uv_loop_t* loop,
/* Read into the user buffer. */
if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
+ uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
return 0; /* Break out of read loop. */
}
@@ -1977,14 +1952,14 @@ invalid:
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
error:
- uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
+ uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
return 0; /* Break out of read loop. */
}
-void uv_process_pipe_read_req(uv_loop_t* loop,
- uv_pipe_t* handle,
- uv_req_t* req) {
+void uv__process_pipe_read_req(uv_loop_t* loop,
+ uv_pipe_t* handle,
+ uv_req_t* req) {
assert(handle->type == UV_NAMED_PIPE);
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
@@ -2005,7 +1980,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop,
* indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
* the user; we'll start a new zero-read at the end of this function. */
if (err != ERROR_OPERATION_ABORTED)
- uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
+ uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
} else {
/* The zero-read completed without error, indicating there is data
@@ -2015,7 +1990,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop,
/* Get the number of bytes available. */
avail = 0;
if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
+ uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
/* Read until we've either read all the bytes available, or the 'reading'
* flag is cleared. */
@@ -2044,12 +2019,12 @@ void uv_process_pipe_read_req(uv_loop_t* loop,
/* Start another zero-read request if necessary. */
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
- uv_pipe_queue_read(loop, handle);
+ uv__pipe_queue_read(loop, handle);
}
}
-void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
+void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
int err;
@@ -2091,26 +2066,25 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->pipe.conn.non_overlapped_writes_tail) {
assert(handle->stream.conn.write_reqs_pending > 0);
- uv_queue_non_overlapped_write(handle);
+ uv__queue_non_overlapped_write(handle);
}
- if (handle->stream.conn.shutdown_req != NULL &&
- handle->stream.conn.write_reqs_pending == 0) {
- uv_want_endgame(loop, (uv_handle_t*)handle);
- }
+ if (handle->stream.conn.write_reqs_pending == 0)
+ if (handle->flags & UV_HANDLE_SHUTTING)
+ uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
DECREASE_PENDING_REQ_COUNT(handle);
}
-void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
+void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* raw_req) {
uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
assert(handle->type == UV_NAMED_PIPE);
if (handle->flags & UV_HANDLE_CLOSING) {
- /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
+ /* The req->pipeHandle should be freed already in uv__pipe_close(). */
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
DECREASE_PENDING_REQ_COUNT(handle);
return;
@@ -2130,7 +2104,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
req->pipeHandle = INVALID_HANDLE_VALUE;
}
if (!(handle->flags & UV_HANDLE_CLOSING)) {
- uv_pipe_queue_accept(loop, handle, req, FALSE);
+ uv__pipe_queue_accept(loop, handle, req, FALSE);
}
}
@@ -2138,54 +2112,74 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
}
-void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
+void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_connect_t* req) {
+ HANDLE pipeHandle;
+ DWORD duplex_flags;
int err;
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
- if (req->cb) {
- err = 0;
- if (REQ_SUCCESS(req)) {
- uv_pipe_connection_init(handle);
- } else {
- err = GET_REQ_ERROR(req);
- }
- req->cb(req, uv_translate_sys_error(err));
+ err = 0;
+ if (REQ_SUCCESS(req)) {
+ pipeHandle = req->u.connect.pipeHandle;
+ duplex_flags = req->u.connect.duplex_flags;
+ err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
+ if (err)
+ CloseHandle(pipeHandle);
+ } else {
+ err = uv_translate_sys_error(GET_REQ_ERROR(req));
}
+ if (req->cb)
+ req->cb(req, err);
+
DECREASE_PENDING_REQ_COUNT(handle);
}
-void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
+
+void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_shutdown_t* req) {
+ int err;
+
assert(handle->type == UV_NAMED_PIPE);
+ /* Clear the shutdown_req field so we don't go here again. */
+ handle->stream.conn.shutdown_req = NULL;
+ handle->flags &= ~UV_HANDLE_SHUTTING;
UNREGISTER_HANDLE_REQ(loop, handle, req);
- if (handle->flags & UV_HANDLE_READABLE) {
- /* Initialize and optionally start the eof timer. Only do this if the pipe
- * is readable and we haven't seen EOF come in ourselves. */
- eof_timer_init(handle);
+ if (handle->flags & UV_HANDLE_CLOSING) {
+ /* Already closing. Cancel the shutdown. */
+ err = UV_ECANCELED;
+ } else if (!REQ_SUCCESS(req)) {
+ /* An error occurred in trying to shutdown gracefully. */
+ err = uv_translate_sys_error(GET_REQ_ERROR(req));
+ } else {
+ if (handle->flags & UV_HANDLE_READABLE) {
+ /* Initialize and optionally start the eof timer. Only do this if the pipe
+ * is readable and we haven't seen EOF come in ourselves. */
+ eof_timer_init(handle);
+
+ /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
+ * start it. */
+ if (handle->flags & UV_HANDLE_READ_PENDING) {
+ eof_timer_start(handle);
+ }
- /* If reading start the timer right now. Otherwise uv_pipe_queue_read will
- * start it. */
- if (handle->flags & UV_HANDLE_READ_PENDING) {
- eof_timer_start(handle);
+ } else {
+ /* This pipe is not readable. We can just close it to let the other end
+ * know that we're done writing. */
+ close_pipe(handle);
}
-
- } else {
- /* This pipe is not readable. We can just close it to let the other end
- * know that we're done writing. */
- close_pipe(handle);
+ err = 0;
}
- if (req->cb) {
- req->cb(req, 0);
- }
+ if (req->cb)
+ req->cb(req, err);
DECREASE_PENDING_REQ_COUNT(handle);
}
@@ -2200,7 +2194,8 @@ static void eof_timer_init(uv_pipe_t* pipe) {
pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
- assert(r == 0); /* timers can't fail */
+ assert(r == 0); /* timers can't fail */
+ (void) r;
pipe->pipe.conn.eof_timer->data = pipe;
uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
}
@@ -2231,9 +2226,9 @@ static void eof_timer_cb(uv_timer_t* timer) {
assert(pipe->type == UV_NAMED_PIPE);
/* This should always be true, since we start the timer only in
- * uv_pipe_queue_read after successfully calling ReadFile, or in
- * uv_process_pipe_shutdown_req if a read is pending, and we always
- * immediately stop the timer in uv_process_pipe_read_req. */
+ * uv__pipe_queue_read after successfully calling ReadFile, or in
+ * uv__process_pipe_shutdown_req if a read is pending, and we always
+ * immediately stop the timer in uv__process_pipe_read_req. */
assert(pipe->flags & UV_HANDLE_READ_PENDING);
/* If there are many packets coming off the iocp then the timer callback may
@@ -2254,7 +2249,7 @@ static void eof_timer_cb(uv_timer_t* timer) {
/* Report the eof and update flags. This will get reported even if the user
* stopped reading in the meantime. TODO: is that okay? */
- uv_pipe_read_eof(loop, pipe, uv_null_buf_);
+ uv__pipe_read_eof(loop, pipe, uv_null_buf_);
}
@@ -2280,10 +2275,16 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
IO_STATUS_BLOCK io_status;
FILE_ACCESS_INFORMATION access;
DWORD duplex_flags = 0;
+ int err;
if (os_handle == INVALID_HANDLE_VALUE)
return UV_EBADF;
+ if (pipe->flags & UV_HANDLE_PIPESERVER)
+ return UV_EINVAL;
+ if (pipe->flags & UV_HANDLE_CONNECTION)
+ return UV_EBUSY;
+ uv__pipe_connection_init(pipe);
uv__once_init();
/* In order to avoid closing a stdio file descriptor 0-2, duplicate the
* underlying OS handle and forget about the original fd.
@@ -2300,6 +2301,7 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
FALSE,
DUPLICATE_SAME_ACCESS))
return uv_translate_sys_error(GetLastError());
+ assert(os_handle != INVALID_HANDLE_VALUE);
file = -1;
}
@@ -2327,17 +2329,17 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
if (access.AccessFlags & FILE_READ_DATA)
duplex_flags |= UV_HANDLE_READABLE;
- if (os_handle == INVALID_HANDLE_VALUE ||
- uv_set_pipe_handle(pipe->loop,
- pipe,
- os_handle,
- file,
- duplex_flags) == -1) {
- return UV_EINVAL;
+ err = uv__set_pipe_handle(pipe->loop,
+ pipe,
+ os_handle,
+ file,
+ duplex_flags);
+ if (err) {
+ if (file == -1)
+ CloseHandle(os_handle);
+ return err;
}
- uv_pipe_connection_init(pipe);
-
if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
@@ -2361,6 +2363,51 @@ static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size)
uv__once_init();
name_info = NULL;
+ if (handle->name != NULL) {
+ /* The user might try to query the name before we are connected,
+ * and this is just easier to return the cached value if we have it. */
+ name_buf = handle->name;
+ name_len = wcslen(name_buf);
+
+ /* check how much space we need */
+ addrlen = WideCharToMultiByte(CP_UTF8,
+ 0,
+ name_buf,
+ name_len,
+ NULL,
+ 0,
+ NULL,
+ NULL);
+ if (!addrlen) {
+ *size = 0;
+ err = uv_translate_sys_error(GetLastError());
+ return err;
+ } else if (addrlen >= *size) {
+ *size = addrlen + 1;
+ err = UV_ENOBUFS;
+ goto error;
+ }
+
+ addrlen = WideCharToMultiByte(CP_UTF8,
+ 0,
+ name_buf,
+ name_len,
+ buffer,
+ addrlen,
+ NULL,
+ NULL);
+ if (!addrlen) {
+ *size = 0;
+ err = uv_translate_sys_error(GetLastError());
+ return err;
+ }
+
+ *size = addrlen;
+ buffer[addrlen] = '\0';
+
+ return 0;
+ }
+
if (handle->handle == INVALID_HANDLE_VALUE) {
*size = 0;
return UV_EINVAL;
@@ -2498,6 +2545,11 @@ int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
if (handle->handle != INVALID_HANDLE_VALUE)
return uv__pipe_getname(handle, buffer, size);
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ if (handle->name != NULL)
+ return uv__pipe_getname(handle, buffer, size);
+ }
+
return UV_EBADF;
}