summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/unix/udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/unix/udp.c')
-rw-r--r--Utilities/cmlibuv/src/unix/udp.c1335
1 files changed, 1335 insertions, 0 deletions
diff --git a/Utilities/cmlibuv/src/unix/udp.c b/Utilities/cmlibuv/src/unix/udp.c
new file mode 100644
index 0000000..f2fcae1
--- /dev/null
+++ b/Utilities/cmlibuv/src/unix/udp.c
@@ -0,0 +1,1335 @@
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "internal.h"
+
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#if defined(__MVS__)
+#include <xti.h>
+#endif
+#include <sys/un.h>
+
+#define UV__UDP_DGRAM_MAXSIZE (64 * 1024)
+
+#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
+# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
+#endif
+
+#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
+# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
+#endif
+
+
+static void uv__udp_run_completed(uv_udp_t* handle);
+static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
+static void uv__udp_recvmsg(uv_udp_t* handle);
+static void uv__udp_sendmsg(uv_udp_t* handle);
+static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
+ int domain,
+ unsigned int flags);
+
+#if HAVE_MMSG
+
+#define UV__MMSG_MAXWIDTH 20
+
+static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
+static void uv__udp_sendmmsg(uv_udp_t* handle);
+
+static int uv__recvmmsg_avail;
+static int uv__sendmmsg_avail;
+static uv_once_t once = UV_ONCE_INIT;
+
+static void uv__udp_mmsg_init(void) {
+ int ret;
+ int s;
+ s = uv__socket(AF_INET, SOCK_DGRAM, 0);
+ if (s < 0)
+ return;
+ ret = uv__sendmmsg(s, NULL, 0, 0);
+ if (ret == 0 || errno != ENOSYS) {
+ uv__sendmmsg_avail = 1;
+ uv__recvmmsg_avail = 1;
+ } else {
+ ret = uv__recvmmsg(s, NULL, 0, 0, NULL);
+ if (ret == 0 || errno != ENOSYS)
+ uv__recvmmsg_avail = 1;
+ }
+ uv__close(s);
+}
+
+#endif
+
+void uv__udp_close(uv_udp_t* handle) {
+ uv__io_close(handle->loop, &handle->io_watcher);
+ uv__handle_stop(handle);
+
+ if (handle->io_watcher.fd != -1) {
+ uv__close(handle->io_watcher.fd);
+ handle->io_watcher.fd = -1;
+ }
+}
+
+
+void uv__udp_finish_close(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ QUEUE* q;
+
+ assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
+ assert(handle->io_watcher.fd == -1);
+
+ while (!QUEUE_EMPTY(&handle->write_queue)) {
+ q = QUEUE_HEAD(&handle->write_queue);
+ QUEUE_REMOVE(q);
+
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ req->status = UV_ECANCELED;
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ }
+
+ uv__udp_run_completed(handle);
+
+ assert(handle->send_queue_size == 0);
+ assert(handle->send_queue_count == 0);
+
+ /* Now tear down the handle. */
+ handle->recv_cb = NULL;
+ handle->alloc_cb = NULL;
+ /* but _do not_ touch close_cb */
+}
+
+
+static void uv__udp_run_completed(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ QUEUE* q;
+
+ assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
+ handle->flags |= UV_HANDLE_UDP_PROCESSING;
+
+ while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
+ q = QUEUE_HEAD(&handle->write_completed_queue);
+ QUEUE_REMOVE(q);
+
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ uv__req_unregister(handle->loop, req);
+
+ handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
+ handle->send_queue_count--;
+
+ if (req->bufs != req->bufsml)
+ uv__free(req->bufs);
+ req->bufs = NULL;
+
+ if (req->send_cb == NULL)
+ continue;
+
+ /* req->status >= 0 == bytes written
+ * req->status < 0 == errno
+ */
+ if (req->status >= 0)
+ req->send_cb(req, 0);
+ else
+ req->send_cb(req, req->status);
+ }
+
+ if (QUEUE_EMPTY(&handle->write_queue)) {
+ /* Pending queue and completion queue empty, stop watcher. */
+ uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
+ if (!uv__io_active(&handle->io_watcher, POLLIN))
+ uv__handle_stop(handle);
+ }
+
+ handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
+}
+
+
+static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
+ uv_udp_t* handle;
+
+ handle = container_of(w, uv_udp_t, io_watcher);
+ assert(handle->type == UV_UDP);
+
+ if (revents & POLLIN)
+ uv__udp_recvmsg(handle);
+
+ if (revents & POLLOUT) {
+ uv__udp_sendmsg(handle);
+ uv__udp_run_completed(handle);
+ }
+}
+
+#if HAVE_MMSG
+static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
+ struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
+ struct iovec iov[UV__MMSG_MAXWIDTH];
+ struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
+ ssize_t nread;
+ uv_buf_t chunk_buf;
+ size_t chunks;
+ int flags;
+ size_t k;
+
+ /* prepare structures for recvmmsg */
+ chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
+ if (chunks > ARRAY_SIZE(iov))
+ chunks = ARRAY_SIZE(iov);
+ for (k = 0; k < chunks; ++k) {
+ iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
+ iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
+ msgs[k].msg_hdr.msg_iov = iov + k;
+ msgs[k].msg_hdr.msg_iovlen = 1;
+ msgs[k].msg_hdr.msg_name = peers + k;
+ msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
+ }
+
+ do
+ nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
+ while (nread == -1 && errno == EINTR);
+
+ if (nread < 1) {
+ if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
+ handle->recv_cb(handle, 0, buf, NULL, 0);
+ else
+ handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
+ } else {
+ /* pass each chunk to the application */
+ for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
+ flags = UV_UDP_MMSG_CHUNK;
+ if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
+ flags |= UV_UDP_PARTIAL;
+
+ chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
+ handle->recv_cb(handle,
+ msgs[k].msg_len,
+ &chunk_buf,
+ msgs[k].msg_hdr.msg_name,
+ flags);
+ }
+
+ /* one last callback so the original buffer is freed */
+ if (handle->recv_cb != NULL)
+ handle->recv_cb(handle, 0, buf, NULL, 0);
+ }
+ return nread;
+}
+#endif
+
+static void uv__udp_recvmsg(uv_udp_t* handle) {
+ struct sockaddr_storage peer;
+ struct msghdr h;
+ ssize_t nread;
+ uv_buf_t buf;
+ int flags;
+ int count;
+
+ assert(handle->recv_cb != NULL);
+ assert(handle->alloc_cb != NULL);
+
+ /* Prevent loop starvation when the data comes in as fast as (or faster than)
+ * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
+ */
+ count = 32;
+
+ do {
+ buf = uv_buf_init(NULL, 0);
+ handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
+ if (buf.base == NULL || buf.len == 0) {
+ handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
+ return;
+ }
+ assert(buf.base != NULL);
+
+#if HAVE_MMSG
+ uv_once(&once, uv__udp_mmsg_init);
+ if (uv__recvmmsg_avail) {
+ /* Returned space for more than 1 datagram, use it to receive
+ * multiple datagrams. */
+ if (buf.len >= 2 * UV__UDP_DGRAM_MAXSIZE) {
+ nread = uv__udp_recvmmsg(handle, &buf);
+ if (nread > 0)
+ count -= nread;
+ continue;
+ }
+ }
+#endif
+
+ memset(&h, 0, sizeof(h));
+ memset(&peer, 0, sizeof(peer));
+ h.msg_name = &peer;
+ h.msg_namelen = sizeof(peer);
+ h.msg_iov = (void*) &buf;
+ h.msg_iovlen = 1;
+
+ do {
+ nread = recvmsg(handle->io_watcher.fd, &h, 0);
+ }
+ while (nread == -1 && errno == EINTR);
+
+ if (nread == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ handle->recv_cb(handle, 0, &buf, NULL, 0);
+ else
+ handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
+ }
+ else {
+ flags = 0;
+ if (h.msg_flags & MSG_TRUNC)
+ flags |= UV_UDP_PARTIAL;
+
+ handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
+ }
+ count--;
+ }
+ /* recv_cb callback may decide to pause or close the handle */
+ while (nread != -1
+ && count > 0
+ && handle->io_watcher.fd != -1
+ && handle->recv_cb != NULL);
+}
+
+#if HAVE_MMSG
+static void uv__udp_sendmmsg(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
+ struct uv__mmsghdr *p;
+ QUEUE* q;
+ ssize_t npkts;
+ size_t pkts;
+ size_t i;
+
+ if (QUEUE_EMPTY(&handle->write_queue))
+ return;
+
+write_queue_drain:
+ for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
+ pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
+ ++pkts, q = QUEUE_HEAD(q)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ p = &h[pkts];
+ memset(p, 0, sizeof(*p));
+ if (req->addr.ss_family == AF_UNSPEC) {
+ p->msg_hdr.msg_name = NULL;
+ p->msg_hdr.msg_namelen = 0;
+ } else {
+ p->msg_hdr.msg_name = &req->addr;
+ if (req->addr.ss_family == AF_INET6)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
+ else if (req->addr.ss_family == AF_INET)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
+ else if (req->addr.ss_family == AF_UNIX)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
+ else {
+ assert(0 && "unsupported address family");
+ abort();
+ }
+ }
+ h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
+ h[pkts].msg_hdr.msg_iovlen = req->nbufs;
+ }
+
+ do
+ npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0);
+ while (npkts == -1 && errno == EINTR);
+
+ if (npkts < 1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ return;
+ for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
+ i < pkts && q != &handle->write_queue;
+ ++i, q = QUEUE_HEAD(q)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ req->status = UV__ERR(errno);
+ QUEUE_REMOVE(&req->queue);
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ }
+ uv__io_feed(handle->loop, &handle->io_watcher);
+ return;
+ }
+
+ for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
+ i < pkts && q != &handle->write_queue;
+ ++i, q = QUEUE_HEAD(&handle->write_queue)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ req->status = req->bufs[0].len;
+
+ /* Sending a datagram is an atomic operation: either all data
+ * is written or nothing is (and EMSGSIZE is raised). That is
+ * why we don't handle partial writes. Just pop the request
+ * off the write queue and onto the completed queue, done.
+ */
+ QUEUE_REMOVE(&req->queue);
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ }
+
+ /* couldn't batch everything, continue sending (jump to avoid stack growth) */
+ if (!QUEUE_EMPTY(&handle->write_queue))
+ goto write_queue_drain;
+ uv__io_feed(handle->loop, &handle->io_watcher);
+ return;
+}
+#endif
+
+static void uv__udp_sendmsg(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ struct msghdr h;
+ QUEUE* q;
+ ssize_t size;
+
+#if HAVE_MMSG
+ uv_once(&once, uv__udp_mmsg_init);
+ if (uv__sendmmsg_avail) {
+ uv__udp_sendmmsg(handle);
+ return;
+ }
+#endif
+
+ while (!QUEUE_EMPTY(&handle->write_queue)) {
+ q = QUEUE_HEAD(&handle->write_queue);
+ assert(q != NULL);
+
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ memset(&h, 0, sizeof h);
+ if (req->addr.ss_family == AF_UNSPEC) {
+ h.msg_name = NULL;
+ h.msg_namelen = 0;
+ } else {
+ h.msg_name = &req->addr;
+ if (req->addr.ss_family == AF_INET6)
+ h.msg_namelen = sizeof(struct sockaddr_in6);
+ else if (req->addr.ss_family == AF_INET)
+ h.msg_namelen = sizeof(struct sockaddr_in);
+ else if (req->addr.ss_family == AF_UNIX)
+ h.msg_namelen = sizeof(struct sockaddr_un);
+ else {
+ assert(0 && "unsupported address family");
+ abort();
+ }
+ }
+ h.msg_iov = (struct iovec*) req->bufs;
+ h.msg_iovlen = req->nbufs;
+
+ do {
+ size = sendmsg(handle->io_watcher.fd, &h, 0);
+ } while (size == -1 && errno == EINTR);
+
+ if (size == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ break;
+ }
+
+ req->status = (size == -1 ? UV__ERR(errno) : size);
+
+ /* Sending a datagram is an atomic operation: either all data
+ * is written or nothing is (and EMSGSIZE is raised). That is
+ * why we don't handle partial writes. Just pop the request
+ * off the write queue and onto the completed queue, done.
+ */
+ QUEUE_REMOVE(&req->queue);
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ uv__io_feed(handle->loop, &handle->io_watcher);
+ }
+}
+
+/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
+ * refinements for programs that use multicast.
+ *
+ * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
+ * are different from the BSDs: it _shares_ the port rather than steal it
+ * from the current listener. While useful, it's not something we can emulate
+ * on other platforms so we don't enable it.
+ *
+ * zOS does not support getsockname with SO_REUSEPORT option when using
+ * AF_UNIX.
+ */
+static int uv__set_reuse(int fd) {
+ int yes;
+ yes = 1;
+
+#if defined(SO_REUSEPORT) && defined(__MVS__)
+ struct sockaddr_in sockfd;
+ unsigned int sockfd_len = sizeof(sockfd);
+ if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
+ return UV__ERR(errno);
+ if (sockfd.sin_family == AF_UNIX) {
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
+ return UV__ERR(errno);
+ } else {
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
+ return UV__ERR(errno);
+ }
+#elif defined(SO_REUSEPORT) && !defined(__linux__)
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
+ return UV__ERR(errno);
+#else
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
+ return UV__ERR(errno);
+#endif
+
+ return 0;
+}
+
+
+int uv__udp_bind(uv_udp_t* handle,
+ const struct sockaddr* addr,
+ unsigned int addrlen,
+ unsigned int flags) {
+ int err;
+ int yes;
+ int fd;
+
+ /* Check for bad flags. */
+ if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
+ return UV_EINVAL;
+
+ /* Cannot set IPv6-only mode on non-IPv6 socket. */
+ if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
+ return UV_EINVAL;
+
+ fd = handle->io_watcher.fd;
+ if (fd == -1) {
+ err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
+ if (err < 0)
+ return err;
+ fd = err;
+ handle->io_watcher.fd = fd;
+ }
+
+ if (flags & UV_UDP_REUSEADDR) {
+ err = uv__set_reuse(fd);
+ if (err)
+ return err;
+ }
+
+ if (flags & UV_UDP_IPV6ONLY) {
+#ifdef IPV6_V6ONLY
+ yes = 1;
+ if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
+ err = UV__ERR(errno);
+ return err;
+ }
+#else
+ err = UV_ENOTSUP;
+ return err;
+#endif
+ }
+
+ if (bind(fd, addr, addrlen)) {
+ err = UV__ERR(errno);
+ if (errno == EAFNOSUPPORT)
+ /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
+ * socket created with AF_INET to an AF_INET6 address or vice versa. */
+ err = UV_EINVAL;
+ return err;
+ }
+
+ if (addr->sa_family == AF_INET6)
+ handle->flags |= UV_HANDLE_IPV6;
+
+ handle->flags |= UV_HANDLE_BOUND;
+ return 0;
+}
+
+
+static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
+ int domain,
+ unsigned int flags) {
+ union {
+ struct sockaddr_in6 in6;
+ struct sockaddr_in in;
+ struct sockaddr addr;
+ } taddr;
+ socklen_t addrlen;
+
+ if (handle->io_watcher.fd != -1)
+ return 0;
+
+ switch (domain) {
+ case AF_INET:
+ {
+ struct sockaddr_in* addr = &taddr.in;
+ memset(addr, 0, sizeof *addr);
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = INADDR_ANY;
+ addrlen = sizeof *addr;
+ break;
+ }
+ case AF_INET6:
+ {
+ struct sockaddr_in6* addr = &taddr.in6;
+ memset(addr, 0, sizeof *addr);
+ addr->sin6_family = AF_INET6;
+ addr->sin6_addr = in6addr_any;
+ addrlen = sizeof *addr;
+ break;
+ }
+ default:
+ assert(0 && "unsupported address family");
+ abort();
+ }
+
+ return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
+}
+
+
+int uv__udp_connect(uv_udp_t* handle,
+ const struct sockaddr* addr,
+ unsigned int addrlen) {
+ int err;
+
+ err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
+ if (err)
+ return err;
+
+ do {
+ errno = 0;
+ err = connect(handle->io_watcher.fd, addr, addrlen);
+ } while (err == -1 && errno == EINTR);
+
+ if (err)
+ return UV__ERR(errno);
+
+ handle->flags |= UV_HANDLE_UDP_CONNECTED;
+
+ return 0;
+}
+
+
+int uv__udp_disconnect(uv_udp_t* handle) {
+ int r;
+ struct sockaddr addr;
+
+ memset(&addr, 0, sizeof(addr));
+
+ addr.sa_family = AF_UNSPEC;
+
+ do {
+ errno = 0;
+ r = connect(handle->io_watcher.fd, &addr, sizeof(addr));
+ } while (r == -1 && errno == EINTR);
+
+ if (r == -1 && errno != EAFNOSUPPORT)
+ return UV__ERR(errno);
+
+ handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
+ return 0;
+}
+
+
+int uv__udp_send(uv_udp_send_t* req,
+ uv_udp_t* handle,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ const struct sockaddr* addr,
+ unsigned int addrlen,
+ uv_udp_send_cb send_cb) {
+ int err;
+ int empty_queue;
+
+ assert(nbufs > 0);
+
+ if (addr) {
+ err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
+ if (err)
+ return err;
+ }
+
+ /* It's legal for send_queue_count > 0 even when the write_queue is empty;
+ * it means there are error-state requests in the write_completed_queue that
+ * will touch up send_queue_size/count later.
+ */
+ empty_queue = (handle->send_queue_count == 0);
+
+ uv__req_init(handle->loop, req, UV_UDP_SEND);
+ assert(addrlen <= sizeof(req->addr));
+ if (addr == NULL)
+ req->addr.ss_family = AF_UNSPEC;
+ else
+ memcpy(&req->addr, addr, addrlen);
+ req->send_cb = send_cb;
+ req->handle = handle;
+ req->nbufs = nbufs;
+
+ req->bufs = req->bufsml;
+ if (nbufs > ARRAY_SIZE(req->bufsml))
+ req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
+
+ if (req->bufs == NULL) {
+ uv__req_unregister(handle->loop, req);
+ return UV_ENOMEM;
+ }
+
+ memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
+ handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
+ handle->send_queue_count++;
+ QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
+ uv__handle_start(handle);
+
+ if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
+ uv__udp_sendmsg(handle);
+
+ /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
+ * away. In such cases the `io_watcher` has to be queued for asynchronous
+ * write.
+ */
+ if (!QUEUE_EMPTY(&handle->write_queue))
+ uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
+ } else {
+ uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
+ }
+
+ return 0;
+}
+
+
+int uv__udp_try_send(uv_udp_t* handle,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ const struct sockaddr* addr,
+ unsigned int addrlen) {
+ int err;
+ struct msghdr h;
+ ssize_t size;
+
+ assert(nbufs > 0);
+
+ /* already sending a message */
+ if (handle->send_queue_count != 0)
+ return UV_EAGAIN;
+
+ if (addr) {
+ err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
+ if (err)
+ return err;
+ } else {
+ assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
+ }
+
+ memset(&h, 0, sizeof h);
+ h.msg_name = (struct sockaddr*) addr;
+ h.msg_namelen = addrlen;
+ h.msg_iov = (struct iovec*) bufs;
+ h.msg_iovlen = nbufs;
+
+ do {
+ size = sendmsg(handle->io_watcher.fd, &h, 0);
+ } while (size == -1 && errno == EINTR);
+
+ if (size == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ return UV_EAGAIN;
+ else
+ return UV__ERR(errno);
+ }
+
+ return size;
+}
+
+
+static int uv__udp_set_membership4(uv_udp_t* handle,
+ const struct sockaddr_in* multicast_addr,
+ const char* interface_addr,
+ uv_membership membership) {
+ struct ip_mreq mreq;
+ int optname;
+ int err;
+
+ memset(&mreq, 0, sizeof mreq);
+
+ if (interface_addr) {
+ err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
+ if (err)
+ return err;
+ } else {
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+ }
+
+ mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
+
+ switch (membership) {
+ case UV_JOIN_GROUP:
+ optname = IP_ADD_MEMBERSHIP;
+ break;
+ case UV_LEAVE_GROUP:
+ optname = IP_DROP_MEMBERSHIP;
+ break;
+ default:
+ return UV_EINVAL;
+ }
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IP,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+#if defined(__MVS__)
+ if (errno == ENXIO)
+ return UV_ENODEV;
+#endif
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+
+
+static int uv__udp_set_membership6(uv_udp_t* handle,
+ const struct sockaddr_in6* multicast_addr,
+ const char* interface_addr,
+ uv_membership membership) {
+ int optname;
+ struct ipv6_mreq mreq;
+ struct sockaddr_in6 addr6;
+
+ memset(&mreq, 0, sizeof mreq);
+
+ if (interface_addr) {
+ if (uv_ip6_addr(interface_addr, 0, &addr6))
+ return UV_EINVAL;
+ mreq.ipv6mr_interface = addr6.sin6_scope_id;
+ } else {
+ mreq.ipv6mr_interface = 0;
+ }
+
+ mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
+
+ switch (membership) {
+ case UV_JOIN_GROUP:
+ optname = IPV6_ADD_MEMBERSHIP;
+ break;
+ case UV_LEAVE_GROUP:
+ optname = IPV6_DROP_MEMBERSHIP;
+ break;
+ default:
+ return UV_EINVAL;
+ }
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IPV6,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+#if defined(__MVS__)
+ if (errno == ENXIO)
+ return UV_ENODEV;
+#endif
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+
+
+#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
+static int uv__udp_set_source_membership4(uv_udp_t* handle,
+ const struct sockaddr_in* multicast_addr,
+ const char* interface_addr,
+ const struct sockaddr_in* source_addr,
+ uv_membership membership) {
+ struct ip_mreq_source mreq;
+ int optname;
+ int err;
+
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+
+ memset(&mreq, 0, sizeof(mreq));
+
+ if (interface_addr != NULL) {
+ err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
+ if (err)
+ return err;
+ } else {
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+ }
+
+ mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
+ mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
+
+ if (membership == UV_JOIN_GROUP)
+ optname = IP_ADD_SOURCE_MEMBERSHIP;
+ else if (membership == UV_LEAVE_GROUP)
+ optname = IP_DROP_SOURCE_MEMBERSHIP;
+ else
+ return UV_EINVAL;
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IP,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+
+
+static int uv__udp_set_source_membership6(uv_udp_t* handle,
+ const struct sockaddr_in6* multicast_addr,
+ const char* interface_addr,
+ const struct sockaddr_in6* source_addr,
+ uv_membership membership) {
+ struct group_source_req mreq;
+ struct sockaddr_in6 addr6;
+ int optname;
+ int err;
+
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+
+ memset(&mreq, 0, sizeof(mreq));
+
+ if (interface_addr != NULL) {
+ err = uv_ip6_addr(interface_addr, 0, &addr6);
+ if (err)
+ return err;
+ mreq.gsr_interface = addr6.sin6_scope_id;
+ } else {
+ mreq.gsr_interface = 0;
+ }
+
+ memcpy(&mreq.gsr_group, multicast_addr, sizeof(mreq.gsr_group));
+ memcpy(&mreq.gsr_source, source_addr, sizeof(mreq.gsr_source));
+
+ if (membership == UV_JOIN_GROUP)
+ optname = MCAST_JOIN_SOURCE_GROUP;
+ else if (membership == UV_LEAVE_GROUP)
+ optname = MCAST_LEAVE_SOURCE_GROUP;
+ else
+ return UV_EINVAL;
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IPV6,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+#endif
+
+
+int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) {
+ int domain;
+ int err;
+ int fd;
+
+ /* Use the lower 8 bits for the domain */
+ domain = flags & 0xFF;
+ if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
+ return UV_EINVAL;
+
+ if (flags & ~0xFF)
+ return UV_EINVAL;
+
+ if (domain != AF_UNSPEC) {
+ err = uv__socket(domain, SOCK_DGRAM, 0);
+ if (err < 0)
+ return err;
+ fd = err;
+ } else {
+ fd = -1;
+ }
+
+ uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
+ handle->alloc_cb = NULL;
+ handle->recv_cb = NULL;
+ handle->send_queue_size = 0;
+ handle->send_queue_count = 0;
+ uv__io_init(&handle->io_watcher, uv__udp_io, fd);
+ QUEUE_INIT(&handle->write_queue);
+ QUEUE_INIT(&handle->write_completed_queue);
+
+ return 0;
+}
+
+
+int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
+ return uv_udp_init_ex(loop, handle, AF_UNSPEC);
+}
+
+
+int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
+ int err;
+
+ /* Check for already active socket. */
+ if (handle->io_watcher.fd != -1)
+ return UV_EBUSY;
+
+ if (uv__fd_exists(handle->loop, sock))
+ return UV_EEXIST;
+
+ err = uv__nonblock(sock, 1);
+ if (err)
+ return err;
+
+ err = uv__set_reuse(sock);
+ if (err)
+ return err;
+
+ handle->io_watcher.fd = sock;
+ if (uv__udp_is_connected(handle))
+ handle->flags |= UV_HANDLE_UDP_CONNECTED;
+
+ return 0;
+}
+
+
+int uv_udp_set_membership(uv_udp_t* handle,
+ const char* multicast_addr,
+ const char* interface_addr,
+ uv_membership membership) {
+ int err;
+ struct sockaddr_in addr4;
+ struct sockaddr_in6 addr6;
+
+ if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+ return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
+ } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+ return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
+ } else {
+ return UV_EINVAL;
+ }
+}
+
+
+int uv_udp_set_source_membership(uv_udp_t* handle,
+ const char* multicast_addr,
+ const char* interface_addr,
+ const char* source_addr,
+ uv_membership membership) {
+#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
+ int err;
+ struct sockaddr_storage mcast_addr;
+ struct sockaddr_in* mcast_addr4;
+ struct sockaddr_in6* mcast_addr6;
+ struct sockaddr_storage src_addr;
+ struct sockaddr_in* src_addr4;
+ struct sockaddr_in6* src_addr6;
+
+ mcast_addr4 = (struct sockaddr_in*)&mcast_addr;
+ mcast_addr6 = (struct sockaddr_in6*)&mcast_addr;
+ src_addr4 = (struct sockaddr_in*)&src_addr;
+ src_addr6 = (struct sockaddr_in6*)&src_addr;
+
+ err = uv_ip4_addr(multicast_addr, 0, mcast_addr4);
+ if (err) {
+ err = uv_ip6_addr(multicast_addr, 0, mcast_addr6);
+ if (err)
+ return err;
+ err = uv_ip6_addr(source_addr, 0, src_addr6);
+ if (err)
+ return err;
+ return uv__udp_set_source_membership6(handle,
+ mcast_addr6,
+ interface_addr,
+ src_addr6,
+ membership);
+ }
+
+ err = uv_ip4_addr(source_addr, 0, src_addr4);
+ if (err)
+ return err;
+ return uv__udp_set_source_membership4(handle,
+ mcast_addr4,
+ interface_addr,
+ src_addr4,
+ membership);
+#else
+ return UV_ENOSYS;
+#endif
+}
+
+
+static int uv__setsockopt(uv_udp_t* handle,
+ int option4,
+ int option6,
+ const void* val,
+ socklen_t size) {
+ int r;
+
+ if (handle->flags & UV_HANDLE_IPV6)
+ r = setsockopt(handle->io_watcher.fd,
+ IPPROTO_IPV6,
+ option6,
+ val,
+ size);
+ else
+ r = setsockopt(handle->io_watcher.fd,
+ IPPROTO_IP,
+ option4,
+ val,
+ size);
+ if (r)
+ return UV__ERR(errno);
+
+ return 0;
+}
+
+static int uv__setsockopt_maybe_char(uv_udp_t* handle,
+ int option4,
+ int option6,
+ int val) {
+#if defined(__sun) || defined(_AIX) || defined(__MVS__)
+ char arg = val;
+#elif defined(__OpenBSD__)
+ unsigned char arg = val;
+#else
+ int arg = val;
+#endif
+
+ if (val < 0 || val > 255)
+ return UV_EINVAL;
+
+ return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
+}
+
+
+int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
+ if (setsockopt(handle->io_watcher.fd,
+ SOL_SOCKET,
+ SO_BROADCAST,
+ &on,
+ sizeof(on))) {
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+
+
+int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
+ if (ttl < 1 || ttl > 255)
+ return UV_EINVAL;
+
+#if defined(__MVS__)
+ if (!(handle->flags & UV_HANDLE_IPV6))
+ return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */
+#endif
+
+/*
+ * On Solaris and derivatives such as SmartOS, the length of socket options
+ * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
+ * so hardcode the size of these options on this platform,
+ * and use the general uv__setsockopt_maybe_char call on other platforms.
+ */
+#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
+ defined(__MVS__)
+
+ return uv__setsockopt(handle,
+ IP_TTL,
+ IPV6_UNICAST_HOPS,
+ &ttl,
+ sizeof(ttl));
+
+#else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
+ defined(__MVS__)) */
+
+ return uv__setsockopt_maybe_char(handle,
+ IP_TTL,
+ IPV6_UNICAST_HOPS,
+ ttl);
+
+#endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
+ defined(__MVS__) */
+}
+
+
+int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
+/*
+ * On Solaris and derivatives such as SmartOS, the length of socket options
+ * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
+ * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
+ * and use the general uv__setsockopt_maybe_char call otherwise.
+ */
+#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
+ defined(__MVS__)
+ if (handle->flags & UV_HANDLE_IPV6)
+ return uv__setsockopt(handle,
+ IP_MULTICAST_TTL,
+ IPV6_MULTICAST_HOPS,
+ &ttl,
+ sizeof(ttl));
+#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
+ defined(__MVS__) */
+
+ return uv__setsockopt_maybe_char(handle,
+ IP_MULTICAST_TTL,
+ IPV6_MULTICAST_HOPS,
+ ttl);
+}
+
+
+int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
+/*
+ * On Solaris and derivatives such as SmartOS, the length of socket options
+ * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
+ * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
+ * and use the general uv__setsockopt_maybe_char call otherwise.
+ */
+#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
+ defined(__MVS__)
+ if (handle->flags & UV_HANDLE_IPV6)
+ return uv__setsockopt(handle,
+ IP_MULTICAST_LOOP,
+ IPV6_MULTICAST_LOOP,
+ &on,
+ sizeof(on));
+#endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
+ defined(__MVS__) */
+
+ return uv__setsockopt_maybe_char(handle,
+ IP_MULTICAST_LOOP,
+ IPV6_MULTICAST_LOOP,
+ on);
+}
+
+int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
+ struct sockaddr_storage addr_st;
+ struct sockaddr_in* addr4;
+ struct sockaddr_in6* addr6;
+
+ addr4 = (struct sockaddr_in*) &addr_st;
+ addr6 = (struct sockaddr_in6*) &addr_st;
+
+ if (!interface_addr) {
+ memset(&addr_st, 0, sizeof addr_st);
+ if (handle->flags & UV_HANDLE_IPV6) {
+ addr_st.ss_family = AF_INET6;
+ addr6->sin6_scope_id = 0;
+ } else {
+ addr_st.ss_family = AF_INET;
+ addr4->sin_addr.s_addr = htonl(INADDR_ANY);
+ }
+ } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
+ /* nothing, address was parsed */
+ } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
+ /* nothing, address was parsed */
+ } else {
+ return UV_EINVAL;
+ }
+
+ if (addr_st.ss_family == AF_INET) {
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IP,
+ IP_MULTICAST_IF,
+ (void*) &addr4->sin_addr,
+ sizeof(addr4->sin_addr)) == -1) {
+ return UV__ERR(errno);
+ }
+ } else if (addr_st.ss_family == AF_INET6) {
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IPV6,
+ IPV6_MULTICAST_IF,
+ &addr6->sin6_scope_id,
+ sizeof(addr6->sin6_scope_id)) == -1) {
+ return UV__ERR(errno);
+ }
+ } else {
+ assert(0 && "unexpected address family");
+ abort();
+ }
+
+ return 0;
+}
+
+int uv_udp_getpeername(const uv_udp_t* handle,
+ struct sockaddr* name,
+ int* namelen) {
+
+ return uv__getsockpeername((const uv_handle_t*) handle,
+ getpeername,
+ name,
+ namelen);
+}
+
+int uv_udp_getsockname(const uv_udp_t* handle,
+ struct sockaddr* name,
+ int* namelen) {
+
+ return uv__getsockpeername((const uv_handle_t*) handle,
+ getsockname,
+ name,
+ namelen);
+}
+
+
+int uv__udp_recv_start(uv_udp_t* handle,
+ uv_alloc_cb alloc_cb,
+ uv_udp_recv_cb recv_cb) {
+ int err;
+
+ if (alloc_cb == NULL || recv_cb == NULL)
+ return UV_EINVAL;
+
+ if (uv__io_active(&handle->io_watcher, POLLIN))
+ return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */
+
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
+ if (err)
+ return err;
+
+ handle->alloc_cb = alloc_cb;
+ handle->recv_cb = recv_cb;
+
+ uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
+ uv__handle_start(handle);
+
+ return 0;
+}
+
+
+int uv__udp_recv_stop(uv_udp_t* handle) {
+ uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
+
+ if (!uv__io_active(&handle->io_watcher, POLLOUT))
+ uv__handle_stop(handle);
+
+ handle->alloc_cb = NULL;
+ handle->recv_cb = NULL;
+
+ return 0;
+}