diff options
Diffstat (limited to 'Utilities/cmlibuv/src/win/pipe.c')
-rw-r--r-- | Utilities/cmlibuv/src/win/pipe.c | 556 |
1 files changed, 304 insertions, 252 deletions
diff --git a/Utilities/cmlibuv/src/win/pipe.c b/Utilities/cmlibuv/src/win/pipe.c index 984b766..9984618 100644 --- a/Utilities/cmlibuv/src/win/pipe.c +++ b/Utilities/cmlibuv/src/win/pipe.c @@ -98,13 +98,13 @@ static void eof_timer_destroy(uv_pipe_t* pipe); static void eof_timer_close_cb(uv_handle_t* handle); -static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { +static void uv__unique_pipe_name(char* ptr, char* name, size_t size) { snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId()); } int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { - uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); + uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; @@ -120,15 +120,11 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { } -static void uv_pipe_connection_init(uv_pipe_t* handle) { - uv_connection_init((uv_stream_t*) handle); +static void uv__pipe_connection_init(uv_pipe_t* handle) { + assert(!(handle->flags & UV_HANDLE_PIPESERVER)); + uv__connection_init((uv_stream_t*) handle); handle->read_req.data = handle; handle->pipe.conn.eof_timer = NULL; - assert(!(handle->flags & UV_HANDLE_PIPESERVER)); - if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { - handle->pipe.conn.readfile_thread_handle = NULL; - InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock); - } } @@ -209,7 +205,7 @@ static int uv__pipe_server( int err; for (;;) { - uv_unique_pipe_name(random, name, nameSize); + uv__unique_pipe_name(random, name, nameSize); pipeHandle = CreateNamedPipeA(name, access | FILE_FLAG_FIRST_PIPE_INSTANCE, @@ -393,6 +389,8 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop, unsigned int client_flags; int err; + uv__pipe_connection_init(parent_pipe); + server_pipe = INVALID_HANDLE_VALUE; client_pipe = INVALID_HANDLE_VALUE; @@ -427,7 +425,6 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop, goto error; } - uv_pipe_connection_init(parent_pipe); parent_pipe->handle = server_pipe; *child_pipe_ptr = client_pipe; @@ -450,11 +447,11 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop, } -static int uv_set_pipe_handle(uv_loop_t* loop, - uv_pipe_t* handle, - HANDLE pipeHandle, - int fd, - DWORD duplex_flags) { +static int uv__set_pipe_handle(uv_loop_t* loop, + uv_pipe_t* handle, + HANDLE pipeHandle, + int fd, + DWORD duplex_flags) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_MODE_INFORMATION mode_info; @@ -462,7 +459,9 @@ static int uv_set_pipe_handle(uv_loop_t* loop, DWORD current_mode = 0; DWORD err = 0; - if (handle->flags & UV_HANDLE_PIPESERVER) + assert(handle->flags & UV_HANDLE_CONNECTION); + assert(!(handle->flags & UV_HANDLE_PIPESERVER)); + if (handle->flags & UV_HANDLE_CLOSING) return UV_EINVAL; if (handle->handle != INVALID_HANDLE_VALUE) return UV_EBUSY; @@ -478,18 +477,17 @@ static int uv_set_pipe_handle(uv_loop_t* loop, */ if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL, NULL, NULL, 0)) { - return -1; + return uv_translate_sys_error(GetLastError()); } else if (current_mode & PIPE_NOWAIT) { - SetLastError(ERROR_ACCESS_DENIED); - return -1; + return UV_EACCES; } } else { /* If this returns ERROR_INVALID_PARAMETER we probably opened * something that is not a pipe. */ if (err == ERROR_INVALID_PARAMETER) { - SetLastError(WSAENOTSOCK); + return UV_ENOTSOCK; } - return -1; + return uv_translate_sys_error(err); } } @@ -500,13 +498,15 @@ static int uv_set_pipe_handle(uv_loop_t* loop, sizeof(mode_info), FileModeInformation); if (nt_status != STATUS_SUCCESS) { - return -1; + return uv_translate_sys_error(err); } if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { /* Non-overlapped pipe. */ handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; + handle->pipe.conn.readfile_thread_handle = NULL; + InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock); } else { /* Overlapped pipe. Try to associate with IOCP. */ if (CreateIoCompletionPort(pipeHandle, @@ -578,135 +578,109 @@ static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) { } -void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { - int err; +void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) { DWORD result; - uv_shutdown_t* req; NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - uv__ipc_xfer_queue_item_t* xfer_queue_item; - - if ((handle->flags & UV_HANDLE_CONNECTION) && - handle->stream.conn.shutdown_req != NULL && - handle->stream.conn.write_reqs_pending == 0) { - req = handle->stream.conn.shutdown_req; - - /* Clear the shutdown_req field so we don't go here again. */ - handle->stream.conn.shutdown_req = NULL; - - if (handle->flags & UV_HANDLE_CLOSING) { - UNREGISTER_HANDLE_REQ(loop, handle, req); - - /* Already closing. Cancel the shutdown. */ - if (req->cb) { - req->cb(req, UV_ECANCELED); - } - - DECREASE_PENDING_REQ_COUNT(handle); - return; - } - /* Try to avoid flushing the pipe buffer in the thread pool. */ - nt_status = pNtQueryInformationFile(handle->handle, - &io_status, - &pipe_info, - sizeof pipe_info, - FilePipeLocalInformation); - - if (nt_status != STATUS_SUCCESS) { - /* Failure */ - UNREGISTER_HANDLE_REQ(loop, handle, req); - - handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ - if (req->cb) { - err = pRtlNtStatusToDosError(nt_status); - req->cb(req, uv_translate_sys_error(err)); - } - - DECREASE_PENDING_REQ_COUNT(handle); - return; - } + assert(handle->flags & UV_HANDLE_CONNECTION); + assert(req != NULL); + assert(handle->stream.conn.write_reqs_pending == 0); + SET_REQ_SUCCESS(req); - if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { - /* Short-circuit, no need to call FlushFileBuffers. */ - uv_insert_pending_req(loop, (uv_req_t*) req); - return; - } + if (handle->flags & UV_HANDLE_CLOSING) { + uv__insert_pending_req(loop, (uv_req_t*) req); + return; + } - /* Run FlushFileBuffers in the thread pool. */ - result = QueueUserWorkItem(pipe_shutdown_thread_proc, - req, - WT_EXECUTELONGFUNCTION); - if (result) { - return; + /* Try to avoid flushing the pipe buffer in the thread pool. */ + nt_status = pNtQueryInformationFile(handle->handle, + &io_status, + &pipe_info, + sizeof pipe_info, + FilePipeLocalInformation); - } else { - /* Failure. */ - UNREGISTER_HANDLE_REQ(loop, handle, req); + if (nt_status != STATUS_SUCCESS) { + SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status)); + handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */ + uv__insert_pending_req(loop, (uv_req_t*) req); + return; + } - handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ - if (req->cb) { - err = GetLastError(); - req->cb(req, uv_translate_sys_error(err)); - } + if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { + /* Short-circuit, no need to call FlushFileBuffers: + * all writes have been read. */ + uv__insert_pending_req(loop, (uv_req_t*) req); + return; + } - DECREASE_PENDING_REQ_COUNT(handle); - return; - } + /* Run FlushFileBuffers in the thread pool. */ + result = QueueUserWorkItem(pipe_shutdown_thread_proc, + req, + WT_EXECUTELONGFUNCTION); + if (!result) { + SET_REQ_ERROR(req, GetLastError()); + handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */ + uv__insert_pending_req(loop, (uv_req_t*) req); + return; } +} - if (handle->flags & UV_HANDLE_CLOSING && - handle->reqs_pending == 0) { - assert(!(handle->flags & UV_HANDLE_CLOSED)); - if (handle->flags & UV_HANDLE_CONNECTION) { - /* Free pending sockets */ - while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) { - QUEUE* q; - SOCKET socket; +void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { + uv__ipc_xfer_queue_item_t* xfer_queue_item; - q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue); - QUEUE_REMOVE(q); - xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); + assert(handle->reqs_pending == 0); + assert(handle->flags & UV_HANDLE_CLOSING); + assert(!(handle->flags & UV_HANDLE_CLOSED)); - /* Materialize socket and close it */ - socket = WSASocketW(FROM_PROTOCOL_INFO, - FROM_PROTOCOL_INFO, - FROM_PROTOCOL_INFO, - &xfer_queue_item->xfer_info.socket_info, - 0, - WSA_FLAG_OVERLAPPED); - uv__free(xfer_queue_item); + if (handle->flags & UV_HANDLE_CONNECTION) { + /* Free pending sockets */ + while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) { + QUEUE* q; + SOCKET socket; + + q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue); + QUEUE_REMOVE(q); + xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); + + /* Materialize socket and close it */ + socket = WSASocketW(FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + &xfer_queue_item->xfer_info.socket_info, + 0, + WSA_FLAG_OVERLAPPED); + uv__free(xfer_queue_item); + + if (socket != INVALID_SOCKET) + closesocket(socket); + } + handle->pipe.conn.ipc_xfer_queue_length = 0; - if (socket != INVALID_SOCKET) - closesocket(socket); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(handle->read_req.wait_handle); + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; } - handle->pipe.conn.ipc_xfer_queue_length = 0; - - if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { - UnregisterWait(handle->read_req.wait_handle); - handle->read_req.wait_handle = INVALID_HANDLE_VALUE; - } - if (handle->read_req.event_handle != NULL) { - CloseHandle(handle->read_req.event_handle); - handle->read_req.event_handle = NULL; - } + if (handle->read_req.event_handle != NULL) { + CloseHandle(handle->read_req.event_handle); + handle->read_req.event_handle = NULL; } - - if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) - DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock); } - if (handle->flags & UV_HANDLE_PIPESERVER) { - assert(handle->pipe.serv.accept_reqs); - uv__free(handle->pipe.serv.accept_reqs); - handle->pipe.serv.accept_reqs = NULL; - } + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) + DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock); + } - uv__handle_close(handle); + if (handle->flags & UV_HANDLE_PIPESERVER) { + assert(handle->pipe.serv.accept_reqs); + uv__free(handle->pipe.serv.accept_reqs); + handle->pipe.serv.accept_reqs = NULL; } + + uv__handle_close(handle); } @@ -731,7 +705,9 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { if (!name) { return UV_EINVAL; } - + if (uv__is_closing(handle)) { + return UV_EINVAL; + } if (!(handle->flags & UV_HANDLE_PIPESERVER)) { handle->pipe.serv.pending_instances = default_pending_pipe_instances; } @@ -815,7 +791,7 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { assert(loop); /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait - * for the pipe to become available with WaitNamedPipe. */ + * up to 30 seconds for the pipe to become available with WaitNamedPipe. */ while (WaitNamedPipeW(handle->name, 30000)) { /* The pipe is now available, try to connect. */ pipeHandle = open_named_pipe(handle->name, &duplex_flags); @@ -825,9 +801,10 @@ static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { SwitchToThread(); } - if (pipeHandle != INVALID_HANDLE_VALUE && - !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) { + if (pipeHandle != INVALID_HANDLE_VALUE) { SET_REQ_SUCCESS(req); + req->u.connect.pipeHandle = pipeHandle; + req->u.connect.duplex_flags = duplex_flags; } else { SET_REQ_ERROR(req, GetLastError()); } @@ -849,6 +826,18 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, UV_REQ_INIT(req, UV_CONNECT); req->handle = (uv_stream_t*) handle; req->cb = cb; + req->u.connect.pipeHandle = INVALID_HANDLE_VALUE; + req->u.connect.duplex_flags = 0; + + if (handle->flags & UV_HANDLE_PIPESERVER) { + err = ERROR_INVALID_PARAMETER; + goto error; + } + if (handle->flags & UV_HANDLE_CONNECTION) { + err = ERROR_PIPE_BUSY; + goto error; + } + uv__pipe_connection_init(handle); /* Convert name to UTF16. */ nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR); @@ -888,19 +877,10 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, goto error; } - assert(pipeHandle != INVALID_HANDLE_VALUE); - - if (uv_set_pipe_handle(loop, - (uv_pipe_t*) req->handle, - pipeHandle, - -1, - duplex_flags)) { - err = GetLastError(); - goto error; - } - + req->u.connect.pipeHandle = pipeHandle; + req->u.connect.duplex_flags = duplex_flags; SET_REQ_SUCCESS(req); - uv_insert_pending_req(loop, (uv_req_t*) req); + uv__insert_pending_req(loop, (uv_req_t*) req); handle->reqs_pending++; REGISTER_HANDLE_REQ(loop, handle, req); return; @@ -916,7 +896,7 @@ error: /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, err); - uv_insert_pending_req(loop, (uv_req_t*) req); + uv__insert_pending_req(loop, (uv_req_t*) req); handle->reqs_pending++; REGISTER_HANDLE_REQ(loop, handle, req); return; @@ -937,7 +917,7 @@ void uv__pipe_interrupt_read(uv_pipe_t* handle) { /* Cancel asynchronous read. */ r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped); assert(r || GetLastError() == ERROR_NOT_FOUND); - + (void) r; } else { /* Cancel synchronous read (which is happening in the thread pool). */ HANDLE thread; @@ -973,17 +953,30 @@ void uv__pipe_interrupt_read(uv_pipe_t* handle) { void uv__pipe_read_stop(uv_pipe_t* handle) { handle->flags &= ~UV_HANDLE_READING; DECREASE_ACTIVE_COUNT(handle->loop, handle); - uv__pipe_interrupt_read(handle); } /* Cleans up uv_pipe_t (server or connection) and all resources associated with * it. */ -void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { +void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { int i; HANDLE pipeHandle; + if (handle->flags & UV_HANDLE_READING) { + handle->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, handle); + } + + if (handle->flags & UV_HANDLE_LISTENING) { + handle->flags &= ~UV_HANDLE_LISTENING; + DECREASE_ACTIVE_COUNT(loop, handle); + } + + handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); + + uv__handle_closing(handle); + uv__pipe_interrupt_read(handle); if (handle->name) { @@ -1003,45 +996,27 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { } if (handle->flags & UV_HANDLE_CONNECTION) { - handle->flags &= ~UV_HANDLE_WRITABLE; eof_timer_destroy(handle); } if ((handle->flags & UV_HANDLE_CONNECTION) - && handle->handle != INVALID_HANDLE_VALUE) + && handle->handle != INVALID_HANDLE_VALUE) { + /* This will eventually destroy the write queue for us too. */ close_pipe(handle); -} - - -void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { - if (handle->flags & UV_HANDLE_READING) { - handle->flags &= ~UV_HANDLE_READING; - DECREASE_ACTIVE_COUNT(loop, handle); - } - - if (handle->flags & UV_HANDLE_LISTENING) { - handle->flags &= ~UV_HANDLE_LISTENING; - DECREASE_ACTIVE_COUNT(loop, handle); } - uv_pipe_cleanup(loop, handle); - - if (handle->reqs_pending == 0) { - uv_want_endgame(loop, (uv_handle_t*) handle); - } - - handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); - uv__handle_closing(handle); + if (handle->reqs_pending == 0) + uv__want_endgame(loop, (uv_handle_t*) handle); } -static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, +static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, uv_pipe_accept_t* req, BOOL firstInstance) { assert(handle->flags & UV_HANDLE_LISTENING); if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) { SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, (uv_req_t*) req); + uv__insert_pending_req(loop, (uv_req_t*) req); handle->reqs_pending++; return; } @@ -1061,7 +1036,7 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, GetLastError()); } - uv_insert_pending_req(loop, (uv_req_t*) req); + uv__insert_pending_req(loop, (uv_req_t*) req); handle->reqs_pending++; return; } @@ -1071,7 +1046,7 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, } -int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { +int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_loop_t* loop = server->loop; uv_pipe_t* pipe_client; uv_pipe_accept_t* req; @@ -1099,6 +1074,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { } else { pipe_client = (uv_pipe_t*) client; + uv__pipe_connection_init(pipe_client); /* Find a connection instance that has been connected, but not yet * accepted. */ @@ -1110,7 +1086,6 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { } /* Initialize the client handle and copy the pipeHandle to the client */ - uv_pipe_connection_init(pipe_client); pipe_client->handle = req->pipeHandle; pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; @@ -1121,7 +1096,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { server->handle = INVALID_HANDLE_VALUE; if (!(server->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(loop, server, req, FALSE); + uv__pipe_queue_accept(loop, server, req, FALSE); } } @@ -1130,7 +1105,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { /* Starts listening for connections for the given pipe. */ -int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { +int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i; @@ -1162,7 +1137,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); for (i = 0; i < handle->pipe.serv.pending_instances; i++) { - uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0); + uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0); } return 0; @@ -1306,7 +1281,7 @@ static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out } -static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { +static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { uv_read_t* req; int result; @@ -1365,15 +1340,15 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { return; error: - uv_insert_pending_req(loop, (uv_req_t*)req); + uv__insert_pending_req(loop, (uv_req_t*)req); handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } -int uv_pipe_read_start(uv_pipe_t* handle, - uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +int uv__pipe_read_start(uv_pipe_t* handle, + uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { uv_loop_t* loop = handle->loop; handle->flags |= UV_HANDLE_READING; @@ -1391,14 +1366,14 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_fatal_error(GetLastError(), "CreateEvent"); } } - uv_pipe_queue_read(loop, handle); + uv__pipe_queue_read(loop, handle); } return 0; } -static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, +static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle, uv_write_t* req) { req->next_req = NULL; if (handle->pipe.conn.non_overlapped_writes_tail) { @@ -1434,7 +1409,7 @@ static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { } -static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { +static void uv__queue_non_overlapped_write(uv_pipe_t* handle) { uv_write_t* req = uv_remove_non_overlapped_write_req(handle); if (req) { if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, @@ -1575,9 +1550,9 @@ static int uv__pipe_write_data(uv_loop_t* loop, return 0; } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { req->write_buffer = write_buf; - uv_insert_non_overlapped_write_req(handle, req); + uv__insert_non_overlapped_write_req(handle, req); if (handle->stream.conn.write_reqs_pending == 0) { - uv_queue_non_overlapped_write(handle); + uv__queue_non_overlapped_write(handle); } /* Request queued by the kernel. */ @@ -1790,7 +1765,7 @@ int uv__pipe_write(uv_loop_t* loop, } -static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, +static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, so discard * it. */ @@ -1802,7 +1777,7 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, } -static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, +static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, so discard * it. */ @@ -1814,12 +1789,12 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, } -static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, +static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_buf_t buf) { if (error == ERROR_BROKEN_PIPE) { - uv_pipe_read_eof(loop, handle, buf); + uv__pipe_read_eof(loop, handle, buf); } else { - uv_pipe_read_error(loop, handle, error, buf); + uv__pipe_read_error(loop, handle, error, buf); } } @@ -1890,7 +1865,7 @@ static DWORD uv__pipe_read_data(uv_loop_t* loop, /* Read into the user buffer. */ if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) { - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf); + uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf); return 0; /* Break out of read loop. */ } @@ -1977,14 +1952,14 @@ invalid: err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ error: - uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); + uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); return 0; /* Break out of read loop. */ } -void uv_process_pipe_read_req(uv_loop_t* loop, - uv_pipe_t* handle, - uv_req_t* req) { +void uv__process_pipe_read_req(uv_loop_t* loop, + uv_pipe_t* handle, + uv_req_t* req) { assert(handle->type == UV_NAMED_PIPE); handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); @@ -2005,7 +1980,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to * the user; we'll start a new zero-read at the end of this function. */ if (err != ERROR_OPERATION_ABORTED) - uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); + uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); } else { /* The zero-read completed without error, indicating there is data @@ -2015,7 +1990,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, /* Get the number of bytes available. */ avail = 0; if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL)) - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); + uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); /* Read until we've either read all the bytes available, or the 'reading' * flag is cleared. */ @@ -2044,12 +2019,12 @@ void uv_process_pipe_read_req(uv_loop_t* loop, /* Start another zero-read request if necessary. */ if ((handle->flags & UV_HANDLE_READING) && !(handle->flags & UV_HANDLE_READ_PENDING)) { - uv_pipe_queue_read(loop, handle); + uv__pipe_queue_read(loop, handle); } } -void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, +void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, uv_write_t* req) { int err; @@ -2091,26 +2066,25 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && handle->pipe.conn.non_overlapped_writes_tail) { assert(handle->stream.conn.write_reqs_pending > 0); - uv_queue_non_overlapped_write(handle); + uv__queue_non_overlapped_write(handle); } - if (handle->stream.conn.shutdown_req != NULL && - handle->stream.conn.write_reqs_pending == 0) { - uv_want_endgame(loop, (uv_handle_t*)handle); - } + if (handle->stream.conn.write_reqs_pending == 0) + if (handle->flags & UV_HANDLE_SHUTTING) + uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req); DECREASE_PENDING_REQ_COUNT(handle); } -void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, +void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* raw_req) { uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req; assert(handle->type == UV_NAMED_PIPE); if (handle->flags & UV_HANDLE_CLOSING) { - /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */ + /* The req->pipeHandle should be freed already in uv__pipe_close(). */ assert(req->pipeHandle == INVALID_HANDLE_VALUE); DECREASE_PENDING_REQ_COUNT(handle); return; @@ -2130,7 +2104,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, req->pipeHandle = INVALID_HANDLE_VALUE; } if (!(handle->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(loop, handle, req, FALSE); + uv__pipe_queue_accept(loop, handle, req, FALSE); } } @@ -2138,54 +2112,74 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, } -void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, +void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, uv_connect_t* req) { + HANDLE pipeHandle; + DWORD duplex_flags; int err; assert(handle->type == UV_NAMED_PIPE); UNREGISTER_HANDLE_REQ(loop, handle, req); - if (req->cb) { - err = 0; - if (REQ_SUCCESS(req)) { - uv_pipe_connection_init(handle); - } else { - err = GET_REQ_ERROR(req); - } - req->cb(req, uv_translate_sys_error(err)); + err = 0; + if (REQ_SUCCESS(req)) { + pipeHandle = req->u.connect.pipeHandle; + duplex_flags = req->u.connect.duplex_flags; + err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags); + if (err) + CloseHandle(pipeHandle); + } else { + err = uv_translate_sys_error(GET_REQ_ERROR(req)); } + if (req->cb) + req->cb(req, err); + DECREASE_PENDING_REQ_COUNT(handle); } -void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle, + +void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t* req) { + int err; + assert(handle->type == UV_NAMED_PIPE); + /* Clear the shutdown_req field so we don't go here again. */ + handle->stream.conn.shutdown_req = NULL; + handle->flags &= ~UV_HANDLE_SHUTTING; UNREGISTER_HANDLE_REQ(loop, handle, req); - if (handle->flags & UV_HANDLE_READABLE) { - /* Initialize and optionally start the eof timer. Only do this if the pipe - * is readable and we haven't seen EOF come in ourselves. */ - eof_timer_init(handle); + if (handle->flags & UV_HANDLE_CLOSING) { + /* Already closing. Cancel the shutdown. */ + err = UV_ECANCELED; + } else if (!REQ_SUCCESS(req)) { + /* An error occurred in trying to shutdown gracefully. */ + err = uv_translate_sys_error(GET_REQ_ERROR(req)); + } else { + if (handle->flags & UV_HANDLE_READABLE) { + /* Initialize and optionally start the eof timer. Only do this if the pipe + * is readable and we haven't seen EOF come in ourselves. */ + eof_timer_init(handle); + + /* If reading start the timer right now. Otherwise uv__pipe_queue_read will + * start it. */ + if (handle->flags & UV_HANDLE_READ_PENDING) { + eof_timer_start(handle); + } - /* If reading start the timer right now. Otherwise uv_pipe_queue_read will - * start it. */ - if (handle->flags & UV_HANDLE_READ_PENDING) { - eof_timer_start(handle); + } else { + /* This pipe is not readable. We can just close it to let the other end + * know that we're done writing. */ + close_pipe(handle); } - - } else { - /* This pipe is not readable. We can just close it to let the other end - * know that we're done writing. */ - close_pipe(handle); + err = 0; } - if (req->cb) { - req->cb(req, 0); - } + if (req->cb) + req->cb(req, err); DECREASE_PENDING_REQ_COUNT(handle); } @@ -2200,7 +2194,8 @@ static void eof_timer_init(uv_pipe_t* pipe) { pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer); r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer); - assert(r == 0); /* timers can't fail */ + assert(r == 0); /* timers can't fail */ + (void) r; pipe->pipe.conn.eof_timer->data = pipe; uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer); } @@ -2231,9 +2226,9 @@ static void eof_timer_cb(uv_timer_t* timer) { assert(pipe->type == UV_NAMED_PIPE); /* This should always be true, since we start the timer only in - * uv_pipe_queue_read after successfully calling ReadFile, or in - * uv_process_pipe_shutdown_req if a read is pending, and we always - * immediately stop the timer in uv_process_pipe_read_req. */ + * uv__pipe_queue_read after successfully calling ReadFile, or in + * uv__process_pipe_shutdown_req if a read is pending, and we always + * immediately stop the timer in uv__process_pipe_read_req. */ assert(pipe->flags & UV_HANDLE_READ_PENDING); /* If there are many packets coming off the iocp then the timer callback may @@ -2254,7 +2249,7 @@ static void eof_timer_cb(uv_timer_t* timer) { /* Report the eof and update flags. This will get reported even if the user * stopped reading in the meantime. TODO: is that okay? */ - uv_pipe_read_eof(loop, pipe, uv_null_buf_); + uv__pipe_read_eof(loop, pipe, uv_null_buf_); } @@ -2280,10 +2275,16 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { IO_STATUS_BLOCK io_status; FILE_ACCESS_INFORMATION access; DWORD duplex_flags = 0; + int err; if (os_handle == INVALID_HANDLE_VALUE) return UV_EBADF; + if (pipe->flags & UV_HANDLE_PIPESERVER) + return UV_EINVAL; + if (pipe->flags & UV_HANDLE_CONNECTION) + return UV_EBUSY; + uv__pipe_connection_init(pipe); uv__once_init(); /* In order to avoid closing a stdio file descriptor 0-2, duplicate the * underlying OS handle and forget about the original fd. @@ -2300,6 +2301,7 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { FALSE, DUPLICATE_SAME_ACCESS)) return uv_translate_sys_error(GetLastError()); + assert(os_handle != INVALID_HANDLE_VALUE); file = -1; } @@ -2327,17 +2329,17 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { if (access.AccessFlags & FILE_READ_DATA) duplex_flags |= UV_HANDLE_READABLE; - if (os_handle == INVALID_HANDLE_VALUE || - uv_set_pipe_handle(pipe->loop, - pipe, - os_handle, - file, - duplex_flags) == -1) { - return UV_EINVAL; + err = uv__set_pipe_handle(pipe->loop, + pipe, + os_handle, + file, + duplex_flags); + if (err) { + if (file == -1) + CloseHandle(os_handle); + return err; } - uv_pipe_connection_init(pipe); - if (pipe->ipc) { assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); pipe->pipe.conn.ipc_remote_pid = uv_os_getppid(); @@ -2361,6 +2363,51 @@ static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) uv__once_init(); name_info = NULL; + if (handle->name != NULL) { + /* The user might try to query the name before we are connected, + * and this is just easier to return the cached value if we have it. */ + name_buf = handle->name; + name_len = wcslen(name_buf); + + /* check how much space we need */ + addrlen = WideCharToMultiByte(CP_UTF8, + 0, + name_buf, + name_len, + NULL, + 0, + NULL, + NULL); + if (!addrlen) { + *size = 0; + err = uv_translate_sys_error(GetLastError()); + return err; + } else if (addrlen >= *size) { + *size = addrlen + 1; + err = UV_ENOBUFS; + goto error; + } + + addrlen = WideCharToMultiByte(CP_UTF8, + 0, + name_buf, + name_len, + buffer, + addrlen, + NULL, + NULL); + if (!addrlen) { + *size = 0; + err = uv_translate_sys_error(GetLastError()); + return err; + } + + *size = addrlen; + buffer[addrlen] = '\0'; + + return 0; + } + if (handle->handle == INVALID_HANDLE_VALUE) { *size = 0; return UV_EINVAL; @@ -2498,6 +2545,11 @@ int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) { if (handle->handle != INVALID_HANDLE_VALUE) return uv__pipe_getname(handle, buffer, size); + if (handle->flags & UV_HANDLE_CONNECTION) { + if (handle->name != NULL) + return uv__pipe_getname(handle, buffer, size); + } + return UV_EBADF; } |