diff options
Diffstat (limited to 'Utilities/cmlibuv/src/unix/stream.c')
-rw-r--r-- | Utilities/cmlibuv/src/unix/stream.c | 259 |
1 files changed, 120 insertions, 139 deletions
diff --git a/Utilities/cmlibuv/src/unix/stream.c b/Utilities/cmlibuv/src/unix/stream.c index 3b6da8d..52e2b9a 100644 --- a/Utilities/cmlibuv/src/unix/stream.c +++ b/Utilities/cmlibuv/src/unix/stream.c @@ -58,20 +58,6 @@ struct uv__stream_select_s { fd_set* swrite; size_t swrite_sz; }; - -/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite", - * EPROTOTYPE can be returned while trying to write to a socket that is - * shutting down. If we retry the write, we should get the expected EPIPE - * instead. - */ -# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE) -# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ - (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \ - (errno == EMSGSIZE && send_handle != NULL)) -#else -# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR) -# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ - (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) #endif /* defined(__APPLE__) */ static void uv__stream_connect(uv_stream_t*); @@ -164,7 +150,7 @@ static void uv__stream_osx_select(void* arg) { else max_fd = s->int_fd; - while (1) { + for (;;) { /* Terminate on semaphore */ if (uv_sem_trywait(&s->close_sem) == 0) break; @@ -195,7 +181,7 @@ static void uv__stream_osx_select(void* arg) { /* Empty socketpair's buffer in case of interruption */ if (FD_ISSET(s->int_fd, s->sread)) - while (1) { + for (;;) { r = read(s->int_fd, buf, sizeof(buf)); if (r == sizeof(buf)) @@ -799,33 +785,21 @@ static int uv__handle_fd(uv_handle_t* handle) { } } -static void uv__write(uv_stream_t* stream) { +static int uv__try_write(uv_stream_t* stream, + const uv_buf_t bufs[], + unsigned int nbufs, + uv_stream_t* send_handle) { struct iovec* iov; - QUEUE* q; - uv_write_t* req; int iovmax; int iovcnt; ssize_t n; - int err; - -start: - - assert(uv__stream_fd(stream) >= 0); - - if (QUEUE_EMPTY(&stream->write_queue)) - return; - - q = QUEUE_HEAD(&stream->write_queue); - req = QUEUE_DATA(q, uv_write_t, queue); - assert(req->handle == stream); /* * Cast to iovec. We had to have our own uv_buf_t instead of iovec * because Windows's WSABUF is not an iovec. */ - assert(sizeof(uv_buf_t) == sizeof(struct iovec)); - iov = (struct iovec*) &(req->bufs[req->write_index]); - iovcnt = req->nbufs - req->write_index; + iov = (struct iovec*) bufs; + iovcnt = nbufs; iovmax = uv__getiovmax(); @@ -837,8 +811,7 @@ start: * Now do the actual writev. Note that we've been updating the pointers * inside the iov each time we write. So there is no need to offset it. */ - - if (req->send_handle) { + if (send_handle != NULL) { int fd_to_send; struct msghdr msg; struct cmsghdr *cmsg; @@ -847,12 +820,10 @@ start: struct cmsghdr alias; } scratch; - if (uv__is_closing(req->send_handle)) { - err = UV_EBADF; - goto error; - } + if (uv__is_closing(send_handle)) + return UV_EBADF; - fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle); + fd_to_send = uv__handle_fd((uv_handle_t*) send_handle); memset(&scratch, 0, sizeof(scratch)); @@ -881,45 +852,83 @@ start: do n = sendmsg(uv__stream_fd(stream), &msg, 0); - while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); - - /* Ensure the handle isn't sent again in case this is a partial write. */ - if (n >= 0) - req->send_handle = NULL; + while (n == -1 && errno == EINTR); } else { do n = uv__writev(uv__stream_fd(stream), iov, iovcnt); - while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); + while (n == -1 && errno == EINTR); } - if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) { - err = UV__ERR(errno); - goto error; - } + if (n >= 0) + return n; - if (n >= 0 && uv__write_req_update(stream, req, n)) { - uv__write_req_finish(req); - return; /* TODO(bnoordhuis) Start trying to write the next request. */ - } + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) + return UV_EAGAIN; - /* If this is a blocking stream, try again. */ - if (stream->flags & UV_HANDLE_BLOCKING_WRITES) - goto start; +#ifdef __APPLE__ + /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too - + * have a bug where a race condition causes the kernel to return EPROTOTYPE + * because the socket isn't fully constructed. It's probably the result of + * the peer closing the connection and that is why libuv translates it to + * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went + * away but some VPN software causes the same behavior except the error is + * permanent, not transient, turning the retry mechanism into an infinite + * loop. See https://github.com/libuv/libuv/pull/482. + */ + if (errno == EPROTOTYPE) + return UV_ECONNRESET; +#endif /* __APPLE__ */ - /* We're not done. */ - uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); + return UV__ERR(errno); +} - /* Notify select() thread about state change */ - uv__stream_osx_interrupt_select(stream); +static void uv__write(uv_stream_t* stream) { + QUEUE* q; + uv_write_t* req; + ssize_t n; + + assert(uv__stream_fd(stream) >= 0); + + for (;;) { + if (QUEUE_EMPTY(&stream->write_queue)) + return; - return; + q = QUEUE_HEAD(&stream->write_queue); + req = QUEUE_DATA(q, uv_write_t, queue); + assert(req->handle == stream); + + n = uv__try_write(stream, + &(req->bufs[req->write_index]), + req->nbufs - req->write_index, + req->send_handle); + + /* Ensure the handle isn't sent again in case this is a partial write. */ + if (n >= 0) { + req->send_handle = NULL; + if (uv__write_req_update(stream, req, n)) { + uv__write_req_finish(req); + return; /* TODO(bnoordhuis) Start trying to write the next request. */ + } + } else if (n != UV_EAGAIN) + break; + + /* If this is a blocking stream, try again. */ + if (stream->flags & UV_HANDLE_BLOCKING_WRITES) + continue; + + /* We're not done. */ + uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); -error: - req->error = err; + /* Notify select() thread about state change */ + uv__stream_osx_interrupt_select(stream); + + return; + } + + req->error = n; + /* XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events */ uv__write_req_finish(req); uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); - if (!uv__io_active(&stream->io_watcher, POLLIN)) - uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); } @@ -1002,8 +1011,7 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { stream->flags |= UV_HANDLE_READ_EOF; stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); stream->read_cb(stream, UV_EOF, buf); } @@ -1188,12 +1196,12 @@ static void uv__read(uv_stream_t* stream) { #endif } else { /* Error. User should call uv_close(). */ + stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); stream->read_cb(stream, UV__ERR(errno), &buf); if (stream->flags & UV_HANDLE_READING) { stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); } } @@ -1276,6 +1284,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { req->cb = cb; stream->shutdown_req = req; stream->flags |= UV_HANDLE_SHUTTING; + stream->flags &= ~UV_HANDLE_WRITABLE; uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); @@ -1390,14 +1399,9 @@ static void uv__stream_connect(uv_stream_t* stream) { } -int uv_write2(uv_write_t* req, - uv_stream_t* stream, - const uv_buf_t bufs[], - unsigned int nbufs, - uv_stream_t* send_handle, - uv_write_cb cb) { - int empty_queue; - +static int uv__check_before_write(uv_stream_t* stream, + unsigned int nbufs, + uv_stream_t* send_handle) { assert(nbufs > 0); assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || @@ -1410,7 +1414,7 @@ int uv_write2(uv_write_t* req, if (!(stream->flags & UV_HANDLE_WRITABLE)) return UV_EPIPE; - if (send_handle) { + if (send_handle != NULL) { if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) return UV_EINVAL; @@ -1430,6 +1434,22 @@ int uv_write2(uv_write_t* req, #endif } + return 0; +} + +int uv_write2(uv_write_t* req, + uv_stream_t* stream, + const uv_buf_t bufs[], + unsigned int nbufs, + uv_stream_t* send_handle, + uv_write_cb cb) { + int empty_queue; + int err; + + err = uv__check_before_write(stream, nbufs, send_handle); + if (err < 0) + return err; + /* It's legal for write_queue_size > 0 even when the write_queue is empty; * it means there are error-state requests in the write_completed_queue that * will touch up write_queue_size later, see also uv__write_req_finish(). @@ -1498,81 +1518,43 @@ int uv_write(uv_write_t* req, } -void uv_try_write_cb(uv_write_t* req, int status) { - /* Should not be called */ - abort(); -} - - int uv_try_write(uv_stream_t* stream, const uv_buf_t bufs[], unsigned int nbufs) { - int r; - int has_pollout; - size_t written; - size_t req_size; - uv_write_t req; + return uv_try_write2(stream, bufs, nbufs, NULL); +} + + +int uv_try_write2(uv_stream_t* stream, + const uv_buf_t bufs[], + unsigned int nbufs, + uv_stream_t* send_handle) { + int err; /* Connecting or already writing some data */ if (stream->connect_req != NULL || stream->write_queue_size != 0) return UV_EAGAIN; - has_pollout = uv__io_active(&stream->io_watcher, POLLOUT); - - r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb); - if (r != 0) - return r; + err = uv__check_before_write(stream, nbufs, NULL); + if (err < 0) + return err; - /* Remove not written bytes from write queue size */ - written = uv__count_bufs(bufs, nbufs); - if (req.bufs != NULL) - req_size = uv__write_req_size(&req); - else - req_size = 0; - written -= req_size; - stream->write_queue_size -= req_size; - - /* Unqueue request, regardless of immediateness */ - QUEUE_REMOVE(&req.queue); - uv__req_unregister(stream->loop, &req); - if (req.bufs != req.bufsml) - uv__free(req.bufs); - req.bufs = NULL; - - /* Do not poll for writable, if we wasn't before calling this */ - if (!has_pollout) { - uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); - uv__stream_osx_interrupt_select(stream); - } - - if (written == 0 && req_size != 0) - return req.error < 0 ? req.error : UV_EAGAIN; - else - return written; + return uv__try_write(stream, bufs, nbufs, send_handle); } -int uv_read_start(uv_stream_t* stream, - uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +int uv__read_start(uv_stream_t* stream, + uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); - if (stream->flags & UV_HANDLE_CLOSING) - return UV_EINVAL; - - if (!(stream->flags & UV_HANDLE_READABLE)) - return UV_ENOTCONN; - - /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just - * expresses the desired state of the user. - */ + /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it + * just expresses the desired state of the user. */ stream->flags |= UV_HANDLE_READING; + stream->flags &= ~UV_HANDLE_READ_EOF; /* TODO: try to do the read inline? */ - /* TODO: keep track of tcp state. If we've gotten a EOF then we should - * not start the IO watcher. - */ assert(uv__stream_fd(stream) >= 0); assert(alloc_cb); @@ -1593,8 +1575,7 @@ int uv_read_stop(uv_stream_t* stream) { stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); stream->read_cb = NULL; |