summaryrefslogtreecommitdiffstats
path: root/src/unix/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/unix/stream.c')
-rw-r--r--src/unix/stream.c262
1 files changed, 124 insertions, 138 deletions
diff --git a/src/unix/stream.c b/src/unix/stream.c
index 5ec6bf4..a75ba15 100644
--- a/src/unix/stream.c
+++ b/src/unix/stream.c
@@ -58,11 +58,19 @@ struct uv__stream_select_s {
fd_set* swrite;
size_t swrite_sz;
};
-# define WRITE_RETRY_ON_ERROR(send_handle) \
+
+/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
+ * EPROTOTYPE can be returned while trying to write to a socket that is
+ * shutting down. If we retry the write, we should get the expected EPIPE
+ * instead.
+ */
+# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
+# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
(errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
- (errno == EMSGSIZE && send_handle))
+ (errno == EMSGSIZE && send_handle != NULL))
#else
-# define WRITE_RETRY_ON_ERROR(send_handle) \
+# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
+# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
(errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
@@ -220,7 +228,7 @@ static void uv__stream_osx_select(void* arg) {
uv_sem_wait(&s->async_sem);
/* Should be processed at this stage */
- assert((s->events == 0) || (stream->flags & UV_CLOSING));
+ assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
}
}
}
@@ -248,7 +256,7 @@ static void uv__stream_osx_select_cb(uv_async_t* handle) {
if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
- if (stream->flags & UV_CLOSING)
+ if (stream->flags & UV_HANDLE_CLOSING)
return;
/* NOTE: It is important to do it here, otherwise `select()` might be called
@@ -342,7 +350,7 @@ int uv__stream_try_select(uv_stream_t* stream, int* fd) {
if (err)
goto failed_async_init;
- s->async.flags |= UV__HANDLE_INTERNAL;
+ s->async.flags |= UV_HANDLE_INTERNAL;
uv__handle_unref(&s->async);
err = uv_sem_init(&s->close_sem, 0);
@@ -407,12 +415,14 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
stream->flags |= flags;
if (stream->type == UV_TCP) {
- if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
+ if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
return UV__ERR(errno);
/* TODO Use delay the user passed in. */
- if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
+ if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
+ uv__tcp_keepalive(fd, 1, 60)) {
return UV__ERR(errno);
+ }
}
#if defined(__APPLE__)
@@ -447,7 +457,7 @@ void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
void uv__stream_destroy(uv_stream_t* stream) {
assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
- assert(stream->flags & UV_CLOSED);
+ assert(stream->flags & UV_HANDLE_CLOSED);
if (stream->connect_req) {
uv__req_unregister(stream->loop, stream->connect_req);
@@ -522,7 +532,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
stream = container_of(w, uv_stream_t, io_watcher);
assert(events & POLLIN);
assert(stream->accepted_fd == -1);
- assert(!(stream->flags & UV_CLOSING));
+ assert(!(stream->flags & UV_HANDLE_CLOSING));
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
@@ -565,7 +575,8 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
return;
}
- if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
+ if (stream->type == UV_TCP &&
+ (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
/* Give other processes a chance to accept connections. */
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
@@ -590,7 +601,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
case UV_TCP:
err = uv__stream_open(client,
server->accepted_fd,
- UV_STREAM_READABLE | UV_STREAM_WRITABLE);
+ UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err) {
/* TODO handle error */
uv__close(server->accepted_fd);
@@ -674,14 +685,14 @@ static void uv__drain(uv_stream_t* stream) {
uv__stream_osx_interrupt_select(stream);
/* Shutdown? */
- if ((stream->flags & UV_STREAM_SHUTTING) &&
- !(stream->flags & UV_CLOSING) &&
- !(stream->flags & UV_STREAM_SHUT)) {
+ if ((stream->flags & UV_HANDLE_SHUTTING) &&
+ !(stream->flags & UV_HANDLE_CLOSING) &&
+ !(stream->flags & UV_HANDLE_SHUT)) {
assert(stream->shutdown_req);
req = stream->shutdown_req;
stream->shutdown_req = NULL;
- stream->flags &= ~UV_STREAM_SHUTTING;
+ stream->flags &= ~UV_HANDLE_SHUTTING;
uv__req_unregister(stream->loop, req);
err = 0;
@@ -689,7 +700,7 @@ static void uv__drain(uv_stream_t* stream) {
err = UV__ERR(errno);
if (err == 0)
- stream->flags |= UV_STREAM_SHUT;
+ stream->flags |= UV_HANDLE_SHUT;
if (req->cb != NULL)
req->cb(req, err);
@@ -697,6 +708,14 @@ static void uv__drain(uv_stream_t* stream) {
}
+static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
+ if (n == 1)
+ return write(fd, vec->iov_base, vec->iov_len);
+ else
+ return writev(fd, vec, n);
+}
+
+
static size_t uv__write_req_size(uv_write_t* req) {
size_t size;
@@ -709,6 +728,37 @@ static size_t uv__write_req_size(uv_write_t* req) {
}
+/* Returns 1 if all write request data has been written, or 0 if there is still
+ * more data to write.
+ *
+ * Note: the return value only says something about the *current* request.
+ * There may still be other write requests sitting in the queue.
+ */
+static int uv__write_req_update(uv_stream_t* stream,
+ uv_write_t* req,
+ size_t n) {
+ uv_buf_t* buf;
+ size_t len;
+
+ assert(n <= stream->write_queue_size);
+ stream->write_queue_size -= n;
+
+ buf = req->bufs + req->write_index;
+
+ while (n > 0) {
+ len = n < buf->len ? n : buf->len;
+ buf->base += len;
+ buf->len -= len;
+ buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
+ n -= len;
+ }
+
+ req->write_index = buf - req->bufs;
+
+ return req->write_index == req->nbufs;
+}
+
+
static void uv__write_req_finish(uv_write_t* req) {
uv_stream_t* stream = req->handle;
@@ -829,102 +879,32 @@ start:
*pi = fd_to_send;
}
- do {
+ do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
- }
-#if defined(__APPLE__)
- /*
- * Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
- * EPROTOTYPE can be returned while trying to write to a socket that is
- * shutting down. If we retry the write, we should get the expected EPIPE
- * instead.
- */
- while (n == -1 && (errno == EINTR || errno == EPROTOTYPE));
-#else
- while (n == -1 && errno == EINTR);
-#endif
- } else {
- do {
- if (iovcnt == 1) {
- n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
- } else {
- n = writev(uv__stream_fd(stream), iov, iovcnt);
- }
- }
-#if defined(__APPLE__)
- /*
- * Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
- * EPROTOTYPE can be returned while trying to write to a socket that is
- * shutting down. If we retry the write, we should get the expected EPIPE
- * instead.
- */
- while (n == -1 && (errno == EINTR || errno == EPROTOTYPE));
-#else
- while (n == -1 && errno == EINTR);
-#endif
- }
+ while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
- if (n < 0) {
- if (!WRITE_RETRY_ON_ERROR(req->send_handle)) {
- err = UV__ERR(errno);
- goto error;
- } else if (stream->flags & UV_STREAM_BLOCKING) {
- /* If this is a blocking stream, try again. */
- goto start;
- }
+ /* Ensure the handle isn't sent again in case this is a partial write. */
+ if (n >= 0)
+ req->send_handle = NULL;
} else {
- /* Successful write */
-
- while (n >= 0) {
- uv_buf_t* buf = &(req->bufs[req->write_index]);
- size_t len = buf->len;
-
- assert(req->write_index < req->nbufs);
-
- if ((size_t)n < len) {
- buf->base += n;
- buf->len -= n;
- stream->write_queue_size -= n;
- n = 0;
-
- /* There is more to write. */
- if (stream->flags & UV_STREAM_BLOCKING) {
- /*
- * If we're blocking then we should not be enabling the write
- * watcher - instead we need to try again.
- */
- goto start;
- } else {
- /* Break loop and ensure the watcher is pending. */
- break;
- }
-
- } else {
- /* Finished writing the buf at index req->write_index. */
- req->write_index++;
-
- assert((size_t)n >= len);
- n -= len;
-
- assert(stream->write_queue_size >= len);
- stream->write_queue_size -= len;
+ do
+ n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
+ while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
+ }
- if (req->write_index == req->nbufs) {
- /* Then we're done! */
- assert(n == 0);
- uv__write_req_finish(req);
- /* TODO: start trying to write the next request. */
- return;
- }
- }
- }
+ if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
+ err = UV__ERR(errno);
+ goto error;
}
- /* Either we've counted n down to zero or we've got EAGAIN. */
- assert(n == 0 || n == -1);
+ if (n > 0 && uv__write_req_update(stream, req, n)) {
+ uv__write_req_finish(req);
+ return; /* TODO(bnoordhuis) Start trying to write the next request. */
+ }
- /* Only non-blocking streams should use the write_watcher. */
- assert(!(stream->flags & UV_STREAM_BLOCKING));
+ /* If this is a blocking stream, try again. */
+ if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
+ goto start;
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
@@ -947,10 +927,16 @@ error:
static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
+ QUEUE pq;
+
+ if (QUEUE_EMPTY(&stream->write_completed_queue))
+ return;
+
+ QUEUE_MOVE(&stream->write_completed_queue, &pq);
- while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
+ while (!QUEUE_EMPTY(&pq)) {
/* Pop a req off write_completed_queue. */
- q = QUEUE_HEAD(&stream->write_completed_queue);
+ q = QUEUE_HEAD(&pq);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
uv__req_unregister(stream->loop, req);
@@ -966,8 +952,6 @@ static void uv__write_callbacks(uv_stream_t* stream) {
if (req->cb)
req->cb(req, req->error);
}
-
- assert(QUEUE_EMPTY(&stream->write_completed_queue));
}
@@ -1015,13 +999,13 @@ uv_handle_type uv__handle_type(int fd) {
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
- stream->flags |= UV_STREAM_READ_EOF;
+ stream->flags |= UV_HANDLE_READ_EOF;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb(stream, UV_EOF, buf);
- stream->flags &= ~UV_STREAM_READING;
+ stream->flags &= ~UV_HANDLE_READING;
}
@@ -1121,6 +1105,7 @@ static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
+# pragma clang diagnostic ignored "-Wvla-extension"
#endif
static void uv__read(uv_stream_t* stream) {
@@ -1132,7 +1117,7 @@ static void uv__read(uv_stream_t* stream) {
int err;
int is_ipc;
- stream->flags &= ~UV_STREAM_READ_PARTIAL;
+ stream->flags &= ~UV_HANDLE_READ_PARTIAL;
/* Prevent loop starvation when the data comes in as fast as (or faster than)
* we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
@@ -1141,11 +1126,11 @@ static void uv__read(uv_stream_t* stream) {
is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
- /* XXX: Maybe instead of having UV_STREAM_READING we just test if
+ /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
* tcp->read_cb is NULL or not?
*/
while (stream->read_cb
- && (stream->flags & UV_STREAM_READING)
+ && (stream->flags & UV_HANDLE_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);
@@ -1186,7 +1171,7 @@ static void uv__read(uv_stream_t* stream) {
/* Error */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Wait for the next one. */
- if (stream->flags & UV_STREAM_READING) {
+ if (stream->flags & UV_HANDLE_READING) {
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__stream_osx_interrupt_select(stream);
}
@@ -1199,8 +1184,8 @@ static void uv__read(uv_stream_t* stream) {
} else {
/* Error. User should call uv_close(). */
stream->read_cb(stream, UV__ERR(errno), &buf);
- if (stream->flags & UV_STREAM_READING) {
- stream->flags &= ~UV_STREAM_READING;
+ if (stream->flags & UV_HANDLE_READING) {
+ stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
@@ -1250,7 +1235,7 @@ static void uv__read(uv_stream_t* stream) {
/* Return if we didn't fill the buffer, there is no more data to read. */
if (nread < buflen) {
- stream->flags |= UV_STREAM_READ_PARTIAL;
+ stream->flags |= UV_HANDLE_READ_PARTIAL;
return;
}
}
@@ -1271,9 +1256,9 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
stream->type == UV_TTY ||
stream->type == UV_NAMED_PIPE);
- if (!(stream->flags & UV_STREAM_WRITABLE) ||
- stream->flags & UV_STREAM_SHUT ||
- stream->flags & UV_STREAM_SHUTTING ||
+ if (!(stream->flags & UV_HANDLE_WRITABLE) ||
+ stream->flags & UV_HANDLE_SHUT ||
+ stream->flags & UV_HANDLE_SHUTTING ||
uv__is_closing(stream)) {
return UV_ENOTCONN;
}
@@ -1285,7 +1270,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
req->handle = stream;
req->cb = cb;
stream->shutdown_req = req;
- stream->flags |= UV_STREAM_SHUTTING;
+ stream->flags |= UV_HANDLE_SHUTTING;
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
@@ -1302,7 +1287,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
- assert(!(stream->flags & UV_CLOSING));
+ assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
@@ -1311,7 +1296,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(uv__stream_fd(stream) >= 0);
- /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
+ /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
@@ -1325,9 +1310,9 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
* report the EOF yet because there is still data to read.
*/
if ((events & POLLHUP) &&
- (stream->flags & UV_STREAM_READING) &&
- (stream->flags & UV_STREAM_READ_PARTIAL) &&
- !(stream->flags & UV_STREAM_READ_EOF)) {
+ (stream->flags & UV_HANDLE_READING) &&
+ (stream->flags & UV_HANDLE_READ_PARTIAL) &&
+ !(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
@@ -1417,7 +1402,7 @@ int uv_write2(uv_write_t* req,
if (uv__stream_fd(stream) < 0)
return UV_EBADF;
- if (!(stream->flags & UV_STREAM_WRITABLE))
+ if (!(stream->flags & UV_HANDLE_WRITABLE))
return -EPIPE;
if (send_handle) {
@@ -1487,7 +1472,7 @@ int uv_write2(uv_write_t* req,
* if this assert fires then somehow the blocking stream isn't being
* sufficiently flushed in uv__write.
*/
- assert(!(stream->flags & UV_STREAM_BLOCKING));
+ assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
@@ -1568,16 +1553,16 @@ int uv_read_start(uv_stream_t* stream,
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
- if (stream->flags & UV_CLOSING)
+ if (stream->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
- if (!(stream->flags & UV_STREAM_READABLE))
+ if (!(stream->flags & UV_HANDLE_READABLE))
return -ENOTCONN;
- /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
+ /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
- stream->flags |= UV_STREAM_READING;
+ stream->flags |= UV_HANDLE_READING;
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
@@ -1598,10 +1583,10 @@ int uv_read_start(uv_stream_t* stream,
int uv_read_stop(uv_stream_t* stream) {
- if (!(stream->flags & UV_STREAM_READING))
+ if (!(stream->flags & UV_HANDLE_READING))
return 0;
- stream->flags &= ~UV_STREAM_READING;
+ stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
@@ -1614,12 +1599,12 @@ int uv_read_stop(uv_stream_t* stream) {
int uv_is_readable(const uv_stream_t* stream) {
- return !!(stream->flags & UV_STREAM_READABLE);
+ return !!(stream->flags & UV_HANDLE_READABLE);
}
int uv_is_writable(const uv_stream_t* stream) {
- return !!(stream->flags & UV_STREAM_WRITABLE);
+ return !!(stream->flags & UV_HANDLE_WRITABLE);
}
@@ -1668,6 +1653,7 @@ void uv__stream_close(uv_stream_t* handle) {
uv__io_close(handle->loop, &handle->io_watcher);
uv_read_stop(handle);
uv__handle_stop(handle);
+ handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (handle->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */