summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/unix/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/unix/stream.c')
-rw-r--r--Utilities/cmlibuv/src/unix/stream.c259
1 files changed, 120 insertions, 139 deletions
diff --git a/Utilities/cmlibuv/src/unix/stream.c b/Utilities/cmlibuv/src/unix/stream.c
index 3b6da8d..52e2b9a 100644
--- a/Utilities/cmlibuv/src/unix/stream.c
+++ b/Utilities/cmlibuv/src/unix/stream.c
@@ -58,20 +58,6 @@ struct uv__stream_select_s {
fd_set* swrite;
size_t swrite_sz;
};
-
-/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
- * EPROTOTYPE can be returned while trying to write to a socket that is
- * shutting down. If we retry the write, we should get the expected EPIPE
- * instead.
- */
-# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
-# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
- (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
- (errno == EMSGSIZE && send_handle != NULL))
-#else
-# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
-# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
- (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
static void uv__stream_connect(uv_stream_t*);
@@ -164,7 +150,7 @@ static void uv__stream_osx_select(void* arg) {
else
max_fd = s->int_fd;
- while (1) {
+ for (;;) {
/* Terminate on semaphore */
if (uv_sem_trywait(&s->close_sem) == 0)
break;
@@ -195,7 +181,7 @@ static void uv__stream_osx_select(void* arg) {
/* Empty socketpair's buffer in case of interruption */
if (FD_ISSET(s->int_fd, s->sread))
- while (1) {
+ for (;;) {
r = read(s->int_fd, buf, sizeof(buf));
if (r == sizeof(buf))
@@ -799,33 +785,21 @@ static int uv__handle_fd(uv_handle_t* handle) {
}
}
-static void uv__write(uv_stream_t* stream) {
+static int uv__try_write(uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
struct iovec* iov;
- QUEUE* q;
- uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
- int err;
-
-start:
-
- assert(uv__stream_fd(stream) >= 0);
-
- if (QUEUE_EMPTY(&stream->write_queue))
- return;
-
- q = QUEUE_HEAD(&stream->write_queue);
- req = QUEUE_DATA(q, uv_write_t, queue);
- assert(req->handle == stream);
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
- assert(sizeof(uv_buf_t) == sizeof(struct iovec));
- iov = (struct iovec*) &(req->bufs[req->write_index]);
- iovcnt = req->nbufs - req->write_index;
+ iov = (struct iovec*) bufs;
+ iovcnt = nbufs;
iovmax = uv__getiovmax();
@@ -837,8 +811,7 @@ start:
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
-
- if (req->send_handle) {
+ if (send_handle != NULL) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -847,12 +820,10 @@ start:
struct cmsghdr alias;
} scratch;
- if (uv__is_closing(req->send_handle)) {
- err = UV_EBADF;
- goto error;
- }
+ if (uv__is_closing(send_handle))
+ return UV_EBADF;
- fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
+ fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
memset(&scratch, 0, sizeof(scratch));
@@ -881,45 +852,83 @@ start:
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
- while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
-
- /* Ensure the handle isn't sent again in case this is a partial write. */
- if (n >= 0)
- req->send_handle = NULL;
+ while (n == -1 && errno == EINTR);
} else {
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
- while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
+ while (n == -1 && errno == EINTR);
}
- if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
- err = UV__ERR(errno);
- goto error;
- }
+ if (n >= 0)
+ return n;
- if (n >= 0 && uv__write_req_update(stream, req, n)) {
- uv__write_req_finish(req);
- return; /* TODO(bnoordhuis) Start trying to write the next request. */
- }
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ return UV_EAGAIN;
- /* If this is a blocking stream, try again. */
- if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
- goto start;
+#ifdef __APPLE__
+ /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too -
+ * have a bug where a race condition causes the kernel to return EPROTOTYPE
+ * because the socket isn't fully constructed. It's probably the result of
+ * the peer closing the connection and that is why libuv translates it to
+ * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
+ * away but some VPN software causes the same behavior except the error is
+ * permanent, not transient, turning the retry mechanism into an infinite
+ * loop. See https://github.com/libuv/libuv/pull/482.
+ */
+ if (errno == EPROTOTYPE)
+ return UV_ECONNRESET;
+#endif /* __APPLE__ */
- /* We're not done. */
- uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
+ return UV__ERR(errno);
+}
- /* Notify select() thread about state change */
- uv__stream_osx_interrupt_select(stream);
+static void uv__write(uv_stream_t* stream) {
+ QUEUE* q;
+ uv_write_t* req;
+ ssize_t n;
+
+ assert(uv__stream_fd(stream) >= 0);
+
+ for (;;) {
+ if (QUEUE_EMPTY(&stream->write_queue))
+ return;
- return;
+ q = QUEUE_HEAD(&stream->write_queue);
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ assert(req->handle == stream);
+
+ n = uv__try_write(stream,
+ &(req->bufs[req->write_index]),
+ req->nbufs - req->write_index,
+ req->send_handle);
+
+ /* Ensure the handle isn't sent again in case this is a partial write. */
+ if (n >= 0) {
+ req->send_handle = NULL;
+ if (uv__write_req_update(stream, req, n)) {
+ uv__write_req_finish(req);
+ return; /* TODO(bnoordhuis) Start trying to write the next request. */
+ }
+ } else if (n != UV_EAGAIN)
+ break;
+
+ /* If this is a blocking stream, try again. */
+ if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
+ continue;
+
+ /* We're not done. */
+ uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
-error:
- req->error = err;
+ /* Notify select() thread about state change */
+ uv__stream_osx_interrupt_select(stream);
+
+ return;
+ }
+
+ req->error = n;
+ /* XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events */
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
- if (!uv__io_active(&stream->io_watcher, POLLIN))
- uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
@@ -1002,8 +1011,7 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
stream->flags |= UV_HANDLE_READ_EOF;
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb(stream, UV_EOF, buf);
}
@@ -1188,12 +1196,12 @@ static void uv__read(uv_stream_t* stream) {
#endif
} else {
/* Error. User should call uv_close(). */
+ stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
@@ -1276,6 +1284,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
req->cb = cb;
stream->shutdown_req = req;
stream->flags |= UV_HANDLE_SHUTTING;
+ stream->flags &= ~UV_HANDLE_WRITABLE;
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
@@ -1390,14 +1399,9 @@ static void uv__stream_connect(uv_stream_t* stream) {
}
-int uv_write2(uv_write_t* req,
- uv_stream_t* stream,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- int empty_queue;
-
+static int uv__check_before_write(uv_stream_t* stream,
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
assert(nbufs > 0);
assert((stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
@@ -1410,7 +1414,7 @@ int uv_write2(uv_write_t* req,
if (!(stream->flags & UV_HANDLE_WRITABLE))
return UV_EPIPE;
- if (send_handle) {
+ if (send_handle != NULL) {
if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
return UV_EINVAL;
@@ -1430,6 +1434,22 @@ int uv_write2(uv_write_t* req,
#endif
}
+ return 0;
+}
+
+int uv_write2(uv_write_t* req,
+ uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ int empty_queue;
+ int err;
+
+ err = uv__check_before_write(stream, nbufs, send_handle);
+ if (err < 0)
+ return err;
+
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up write_queue_size later, see also uv__write_req_finish().
@@ -1498,81 +1518,43 @@ int uv_write(uv_write_t* req,
}
-void uv_try_write_cb(uv_write_t* req, int status) {
- /* Should not be called */
- abort();
-}
-
-
int uv_try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs) {
- int r;
- int has_pollout;
- size_t written;
- size_t req_size;
- uv_write_t req;
+ return uv_try_write2(stream, bufs, nbufs, NULL);
+}
+
+
+int uv_try_write2(uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
+ int err;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
return UV_EAGAIN;
- has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
-
- r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
- if (r != 0)
- return r;
+ err = uv__check_before_write(stream, nbufs, NULL);
+ if (err < 0)
+ return err;
- /* Remove not written bytes from write queue size */
- written = uv__count_bufs(bufs, nbufs);
- if (req.bufs != NULL)
- req_size = uv__write_req_size(&req);
- else
- req_size = 0;
- written -= req_size;
- stream->write_queue_size -= req_size;
-
- /* Unqueue request, regardless of immediateness */
- QUEUE_REMOVE(&req.queue);
- uv__req_unregister(stream->loop, &req);
- if (req.bufs != req.bufsml)
- uv__free(req.bufs);
- req.bufs = NULL;
-
- /* Do not poll for writable, if we wasn't before calling this */
- if (!has_pollout) {
- uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
- uv__stream_osx_interrupt_select(stream);
- }
-
- if (written == 0 && req_size != 0)
- return req.error < 0 ? req.error : UV_EAGAIN;
- else
- return written;
+ return uv__try_write(stream, bufs, nbufs, send_handle);
}
-int uv_read_start(uv_stream_t* stream,
- uv_alloc_cb alloc_cb,
- uv_read_cb read_cb) {
+int uv__read_start(uv_stream_t* stream,
+ uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
- if (stream->flags & UV_HANDLE_CLOSING)
- return UV_EINVAL;
-
- if (!(stream->flags & UV_HANDLE_READABLE))
- return UV_ENOTCONN;
-
- /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
- * expresses the desired state of the user.
- */
+ /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
+ * just expresses the desired state of the user. */
stream->flags |= UV_HANDLE_READING;
+ stream->flags &= ~UV_HANDLE_READ_EOF;
/* TODO: try to do the read inline? */
- /* TODO: keep track of tcp state. If we've gotten a EOF then we should
- * not start the IO watcher.
- */
assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);
@@ -1593,8 +1575,7 @@ int uv_read_stop(uv_stream_t* stream) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb = NULL;