diff options
Diffstat (limited to 'src/unix/stream.c')
-rw-r--r-- | src/unix/stream.c | 262 |
1 files changed, 124 insertions, 138 deletions
diff --git a/src/unix/stream.c b/src/unix/stream.c index 5ec6bf4..a75ba15 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -58,11 +58,19 @@ struct uv__stream_select_s { fd_set* swrite; size_t swrite_sz; }; -# define WRITE_RETRY_ON_ERROR(send_handle) \ + +/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite", + * EPROTOTYPE can be returned while trying to write to a socket that is + * shutting down. If we retry the write, we should get the expected EPIPE + * instead. + */ +# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE) +# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \ - (errno == EMSGSIZE && send_handle)) + (errno == EMSGSIZE && send_handle != NULL)) #else -# define WRITE_RETRY_ON_ERROR(send_handle) \ +# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR) +# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) #endif /* defined(__APPLE__) */ @@ -220,7 +228,7 @@ static void uv__stream_osx_select(void* arg) { uv_sem_wait(&s->async_sem); /* Should be processed at this stage */ - assert((s->events == 0) || (stream->flags & UV_CLOSING)); + assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING)); } } } @@ -248,7 +256,7 @@ static void uv__stream_osx_select_cb(uv_async_t* handle) { if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT)) uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT); - if (stream->flags & UV_CLOSING) + if (stream->flags & UV_HANDLE_CLOSING) return; /* NOTE: It is important to do it here, otherwise `select()` might be called @@ -342,7 +350,7 @@ int uv__stream_try_select(uv_stream_t* stream, int* fd) { if (err) goto failed_async_init; - s->async.flags |= UV__HANDLE_INTERNAL; + s->async.flags |= UV_HANDLE_INTERNAL; uv__handle_unref(&s->async); err = uv_sem_init(&s->close_sem, 0); @@ -407,12 +415,14 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { stream->flags |= flags; if (stream->type == UV_TCP) { - if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) + if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) return UV__ERR(errno); /* TODO Use delay the user passed in. */ - if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60)) + if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && + uv__tcp_keepalive(fd, 1, 60)) { return UV__ERR(errno); + } } #if defined(__APPLE__) @@ -447,7 +457,7 @@ void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { void uv__stream_destroy(uv_stream_t* stream) { assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); - assert(stream->flags & UV_CLOSED); + assert(stream->flags & UV_HANDLE_CLOSED); if (stream->connect_req) { uv__req_unregister(stream->loop, stream->connect_req); @@ -522,7 +532,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { stream = container_of(w, uv_stream_t, io_watcher); assert(events & POLLIN); assert(stream->accepted_fd == -1); - assert(!(stream->flags & UV_CLOSING)); + assert(!(stream->flags & UV_HANDLE_CLOSING)); uv__io_start(stream->loop, &stream->io_watcher, POLLIN); @@ -565,7 +575,8 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { return; } - if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) { + if (stream->type == UV_TCP && + (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) { /* Give other processes a chance to accept connections. */ struct timespec timeout = { 0, 1 }; nanosleep(&timeout, NULL); @@ -590,7 +601,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { case UV_TCP: err = uv__stream_open(client, server->accepted_fd, - UV_STREAM_READABLE | UV_STREAM_WRITABLE); + UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); if (err) { /* TODO handle error */ uv__close(server->accepted_fd); @@ -674,14 +685,14 @@ static void uv__drain(uv_stream_t* stream) { uv__stream_osx_interrupt_select(stream); /* Shutdown? */ - if ((stream->flags & UV_STREAM_SHUTTING) && - !(stream->flags & UV_CLOSING) && - !(stream->flags & UV_STREAM_SHUT)) { + if ((stream->flags & UV_HANDLE_SHUTTING) && + !(stream->flags & UV_HANDLE_CLOSING) && + !(stream->flags & UV_HANDLE_SHUT)) { assert(stream->shutdown_req); req = stream->shutdown_req; stream->shutdown_req = NULL; - stream->flags &= ~UV_STREAM_SHUTTING; + stream->flags &= ~UV_HANDLE_SHUTTING; uv__req_unregister(stream->loop, req); err = 0; @@ -689,7 +700,7 @@ static void uv__drain(uv_stream_t* stream) { err = UV__ERR(errno); if (err == 0) - stream->flags |= UV_STREAM_SHUT; + stream->flags |= UV_HANDLE_SHUT; if (req->cb != NULL) req->cb(req, err); @@ -697,6 +708,14 @@ static void uv__drain(uv_stream_t* stream) { } +static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) { + if (n == 1) + return write(fd, vec->iov_base, vec->iov_len); + else + return writev(fd, vec, n); +} + + static size_t uv__write_req_size(uv_write_t* req) { size_t size; @@ -709,6 +728,37 @@ static size_t uv__write_req_size(uv_write_t* req) { } +/* Returns 1 if all write request data has been written, or 0 if there is still + * more data to write. + * + * Note: the return value only says something about the *current* request. + * There may still be other write requests sitting in the queue. + */ +static int uv__write_req_update(uv_stream_t* stream, + uv_write_t* req, + size_t n) { + uv_buf_t* buf; + size_t len; + + assert(n <= stream->write_queue_size); + stream->write_queue_size -= n; + + buf = req->bufs + req->write_index; + + while (n > 0) { + len = n < buf->len ? n : buf->len; + buf->base += len; + buf->len -= len; + buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */ + n -= len; + } + + req->write_index = buf - req->bufs; + + return req->write_index == req->nbufs; +} + + static void uv__write_req_finish(uv_write_t* req) { uv_stream_t* stream = req->handle; @@ -829,102 +879,32 @@ start: *pi = fd_to_send; } - do { + do n = sendmsg(uv__stream_fd(stream), &msg, 0); - } -#if defined(__APPLE__) - /* - * Due to a possible kernel bug at least in OS X 10.10 "Yosemite", - * EPROTOTYPE can be returned while trying to write to a socket that is - * shutting down. If we retry the write, we should get the expected EPIPE - * instead. - */ - while (n == -1 && (errno == EINTR || errno == EPROTOTYPE)); -#else - while (n == -1 && errno == EINTR); -#endif - } else { - do { - if (iovcnt == 1) { - n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len); - } else { - n = writev(uv__stream_fd(stream), iov, iovcnt); - } - } -#if defined(__APPLE__) - /* - * Due to a possible kernel bug at least in OS X 10.10 "Yosemite", - * EPROTOTYPE can be returned while trying to write to a socket that is - * shutting down. If we retry the write, we should get the expected EPIPE - * instead. - */ - while (n == -1 && (errno == EINTR || errno == EPROTOTYPE)); -#else - while (n == -1 && errno == EINTR); -#endif - } + while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); - if (n < 0) { - if (!WRITE_RETRY_ON_ERROR(req->send_handle)) { - err = UV__ERR(errno); - goto error; - } else if (stream->flags & UV_STREAM_BLOCKING) { - /* If this is a blocking stream, try again. */ - goto start; - } + /* Ensure the handle isn't sent again in case this is a partial write. */ + if (n >= 0) + req->send_handle = NULL; } else { - /* Successful write */ - - while (n >= 0) { - uv_buf_t* buf = &(req->bufs[req->write_index]); - size_t len = buf->len; - - assert(req->write_index < req->nbufs); - - if ((size_t)n < len) { - buf->base += n; - buf->len -= n; - stream->write_queue_size -= n; - n = 0; - - /* There is more to write. */ - if (stream->flags & UV_STREAM_BLOCKING) { - /* - * If we're blocking then we should not be enabling the write - * watcher - instead we need to try again. - */ - goto start; - } else { - /* Break loop and ensure the watcher is pending. */ - break; - } - - } else { - /* Finished writing the buf at index req->write_index. */ - req->write_index++; - - assert((size_t)n >= len); - n -= len; - - assert(stream->write_queue_size >= len); - stream->write_queue_size -= len; + do + n = uv__writev(uv__stream_fd(stream), iov, iovcnt); + while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); + } - if (req->write_index == req->nbufs) { - /* Then we're done! */ - assert(n == 0); - uv__write_req_finish(req); - /* TODO: start trying to write the next request. */ - return; - } - } - } + if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) { + err = UV__ERR(errno); + goto error; } - /* Either we've counted n down to zero or we've got EAGAIN. */ - assert(n == 0 || n == -1); + if (n > 0 && uv__write_req_update(stream, req, n)) { + uv__write_req_finish(req); + return; /* TODO(bnoordhuis) Start trying to write the next request. */ + } - /* Only non-blocking streams should use the write_watcher. */ - assert(!(stream->flags & UV_STREAM_BLOCKING)); + /* If this is a blocking stream, try again. */ + if (stream->flags & UV_HANDLE_BLOCKING_WRITES) + goto start; /* We're not done. */ uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); @@ -947,10 +927,16 @@ error: static void uv__write_callbacks(uv_stream_t* stream) { uv_write_t* req; QUEUE* q; + QUEUE pq; + + if (QUEUE_EMPTY(&stream->write_completed_queue)) + return; + + QUEUE_MOVE(&stream->write_completed_queue, &pq); - while (!QUEUE_EMPTY(&stream->write_completed_queue)) { + while (!QUEUE_EMPTY(&pq)) { /* Pop a req off write_completed_queue. */ - q = QUEUE_HEAD(&stream->write_completed_queue); + q = QUEUE_HEAD(&pq); req = QUEUE_DATA(q, uv_write_t, queue); QUEUE_REMOVE(q); uv__req_unregister(stream->loop, req); @@ -966,8 +952,6 @@ static void uv__write_callbacks(uv_stream_t* stream) { if (req->cb) req->cb(req, req->error); } - - assert(QUEUE_EMPTY(&stream->write_completed_queue)); } @@ -1015,13 +999,13 @@ uv_handle_type uv__handle_type(int fd) { static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { - stream->flags |= UV_STREAM_READ_EOF; + stream->flags |= UV_HANDLE_READ_EOF; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); if (!uv__io_active(&stream->io_watcher, POLLOUT)) uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); stream->read_cb(stream, UV_EOF, buf); - stream->flags &= ~UV_STREAM_READING; + stream->flags &= ~UV_HANDLE_READING; } @@ -1121,6 +1105,7 @@ static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { #ifdef __clang__ # pragma clang diagnostic push # pragma clang diagnostic ignored "-Wgnu-folding-constant" +# pragma clang diagnostic ignored "-Wvla-extension" #endif static void uv__read(uv_stream_t* stream) { @@ -1132,7 +1117,7 @@ static void uv__read(uv_stream_t* stream) { int err; int is_ipc; - stream->flags &= ~UV_STREAM_READ_PARTIAL; + stream->flags &= ~UV_HANDLE_READ_PARTIAL; /* Prevent loop starvation when the data comes in as fast as (or faster than) * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. @@ -1141,11 +1126,11 @@ static void uv__read(uv_stream_t* stream) { is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; - /* XXX: Maybe instead of having UV_STREAM_READING we just test if + /* XXX: Maybe instead of having UV_HANDLE_READING we just test if * tcp->read_cb is NULL or not? */ while (stream->read_cb - && (stream->flags & UV_STREAM_READING) + && (stream->flags & UV_HANDLE_READING) && (count-- > 0)) { assert(stream->alloc_cb != NULL); @@ -1186,7 +1171,7 @@ static void uv__read(uv_stream_t* stream) { /* Error */ if (errno == EAGAIN || errno == EWOULDBLOCK) { /* Wait for the next one. */ - if (stream->flags & UV_STREAM_READING) { + if (stream->flags & UV_HANDLE_READING) { uv__io_start(stream->loop, &stream->io_watcher, POLLIN); uv__stream_osx_interrupt_select(stream); } @@ -1199,8 +1184,8 @@ static void uv__read(uv_stream_t* stream) { } else { /* Error. User should call uv_close(). */ stream->read_cb(stream, UV__ERR(errno), &buf); - if (stream->flags & UV_STREAM_READING) { - stream->flags &= ~UV_STREAM_READING; + if (stream->flags & UV_HANDLE_READING) { + stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); if (!uv__io_active(&stream->io_watcher, POLLOUT)) uv__handle_stop(stream); @@ -1250,7 +1235,7 @@ static void uv__read(uv_stream_t* stream) { /* Return if we didn't fill the buffer, there is no more data to read. */ if (nread < buflen) { - stream->flags |= UV_STREAM_READ_PARTIAL; + stream->flags |= UV_HANDLE_READ_PARTIAL; return; } } @@ -1271,9 +1256,9 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { stream->type == UV_TTY || stream->type == UV_NAMED_PIPE); - if (!(stream->flags & UV_STREAM_WRITABLE) || - stream->flags & UV_STREAM_SHUT || - stream->flags & UV_STREAM_SHUTTING || + if (!(stream->flags & UV_HANDLE_WRITABLE) || + stream->flags & UV_HANDLE_SHUT || + stream->flags & UV_HANDLE_SHUTTING || uv__is_closing(stream)) { return UV_ENOTCONN; } @@ -1285,7 +1270,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { req->handle = stream; req->cb = cb; stream->shutdown_req = req; - stream->flags |= UV_STREAM_SHUTTING; + stream->flags |= UV_HANDLE_SHUTTING; uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); @@ -1302,7 +1287,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); - assert(!(stream->flags & UV_CLOSING)); + assert(!(stream->flags & UV_HANDLE_CLOSING)); if (stream->connect_req) { uv__stream_connect(stream); @@ -1311,7 +1296,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { assert(uv__stream_fd(stream) >= 0); - /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */ + /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */ if (events & (POLLIN | POLLERR | POLLHUP)) uv__read(stream); @@ -1325,9 +1310,9 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { * report the EOF yet because there is still data to read. */ if ((events & POLLHUP) && - (stream->flags & UV_STREAM_READING) && - (stream->flags & UV_STREAM_READ_PARTIAL) && - !(stream->flags & UV_STREAM_READ_EOF)) { + (stream->flags & UV_HANDLE_READING) && + (stream->flags & UV_HANDLE_READ_PARTIAL) && + !(stream->flags & UV_HANDLE_READ_EOF)) { uv_buf_t buf = { NULL, 0 }; uv__stream_eof(stream, &buf); } @@ -1417,7 +1402,7 @@ int uv_write2(uv_write_t* req, if (uv__stream_fd(stream) < 0) return UV_EBADF; - if (!(stream->flags & UV_STREAM_WRITABLE)) + if (!(stream->flags & UV_HANDLE_WRITABLE)) return -EPIPE; if (send_handle) { @@ -1487,7 +1472,7 @@ int uv_write2(uv_write_t* req, * if this assert fires then somehow the blocking stream isn't being * sufficiently flushed in uv__write. */ - assert(!(stream->flags & UV_STREAM_BLOCKING)); + assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); } @@ -1568,16 +1553,16 @@ int uv_read_start(uv_stream_t* stream, assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); - if (stream->flags & UV_CLOSING) + if (stream->flags & UV_HANDLE_CLOSING) return UV_EINVAL; - if (!(stream->flags & UV_STREAM_READABLE)) + if (!(stream->flags & UV_HANDLE_READABLE)) return -ENOTCONN; - /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just + /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ - stream->flags |= UV_STREAM_READING; + stream->flags |= UV_HANDLE_READING; /* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should @@ -1598,10 +1583,10 @@ int uv_read_start(uv_stream_t* stream, int uv_read_stop(uv_stream_t* stream) { - if (!(stream->flags & UV_STREAM_READING)) + if (!(stream->flags & UV_HANDLE_READING)) return 0; - stream->flags &= ~UV_STREAM_READING; + stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); if (!uv__io_active(&stream->io_watcher, POLLOUT)) uv__handle_stop(stream); @@ -1614,12 +1599,12 @@ int uv_read_stop(uv_stream_t* stream) { int uv_is_readable(const uv_stream_t* stream) { - return !!(stream->flags & UV_STREAM_READABLE); + return !!(stream->flags & UV_HANDLE_READABLE); } int uv_is_writable(const uv_stream_t* stream) { - return !!(stream->flags & UV_STREAM_WRITABLE); + return !!(stream->flags & UV_HANDLE_WRITABLE); } @@ -1668,6 +1653,7 @@ void uv__stream_close(uv_stream_t* handle) { uv__io_close(handle->loop, &handle->io_watcher); uv_read_stop(handle); uv__handle_stop(handle); + handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); if (handle->io_watcher.fd != -1) { /* Don't close stdio file descriptors. Nothing good comes from it. */ |