/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. */ #include "uv.h" #include "internal.h" #include #include #include #include #include #if defined(__MVS__) #include #endif #include #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP) # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP #endif #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP) # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP #endif union uv__sockaddr { struct sockaddr_in6 in6; struct sockaddr_in in; struct sockaddr addr; }; static void uv__udp_run_completed(uv_udp_t* handle); static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents); static void uv__udp_recvmsg(uv_udp_t* handle); static void uv__udp_sendmsg(uv_udp_t* handle); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain, unsigned int flags); #if HAVE_MMSG #define UV__MMSG_MAXWIDTH 20 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf); static void uv__udp_sendmmsg(uv_udp_t* handle); static int uv__recvmmsg_avail; static int uv__sendmmsg_avail; static uv_once_t once = UV_ONCE_INIT; static void uv__udp_mmsg_init(void) { int ret; int s; s = uv__socket(AF_INET, SOCK_DGRAM, 0); if (s < 0) return; ret = uv__sendmmsg(s, NULL, 0); if (ret == 0 || errno != ENOSYS) { uv__sendmmsg_avail = 1; uv__recvmmsg_avail = 1; } else { ret = uv__recvmmsg(s, NULL, 0); if (ret == 0 || errno != ENOSYS) uv__recvmmsg_avail = 1; } uv__close(s); } #endif void uv__udp_close(uv_udp_t* handle) { uv__io_close(handle->loop, &handle->io_watcher); uv__handle_stop(handle); if (handle->io_watcher.fd != -1) { uv__close(handle->io_watcher.fd); handle->io_watcher.fd = -1; } } void uv__udp_finish_close(uv_udp_t* handle) { uv_udp_send_t* req; QUEUE* q; assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); assert(handle->io_watcher.fd == -1); while (!QUEUE_EMPTY(&handle->write_queue)) { q = QUEUE_HEAD(&handle->write_queue); QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_udp_send_t, queue); req->status = UV_ECANCELED; QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); } uv__udp_run_completed(handle); assert(handle->send_queue_size == 0); assert(handle->send_queue_count == 0); /* Now tear down the handle. */ handle->recv_cb = NULL; handle->alloc_cb = NULL; /* but _do not_ touch close_cb */ } static void uv__udp_run_completed(uv_udp_t* handle) { uv_udp_send_t* req; QUEUE* q; assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING)); handle->flags |= UV_HANDLE_UDP_PROCESSING; while (!QUEUE_EMPTY(&handle->write_completed_queue)) { q = QUEUE_HEAD(&handle->write_completed_queue); QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_udp_send_t, queue); uv__req_unregister(handle->loop, req); handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); handle->send_queue_count--; if (req->bufs != req->bufsml) uv__free(req->bufs); req->bufs = NULL; if (req->send_cb == NULL) continue; /* req->status >= 0 == bytes written * req->status < 0 == errno */ if (req->status >= 0) req->send_cb(req, 0); else req->send_cb(req, req->status); } if (QUEUE_EMPTY(&handle->write_queue)) { /* Pending queue and completion queue empty, stop watcher. */ uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT); if (!uv__io_active(&handle->io_watcher, POLLIN)) uv__handle_stop(handle); } handle->flags &= ~UV_HANDLE_UDP_PROCESSING; } static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { uv_udp_t* handle; handle = container_of(w, uv_udp_t, io_watcher); assert(handle->type == UV_UDP); if (revents & POLLIN) uv__udp_recvmsg(handle); if (revents & POLLOUT) { uv__udp_sendmsg(handle); uv__udp_run_completed(handle); } } #if HAVE_MMSG static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) { struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH]; struct iovec iov[UV__MMSG_MAXWIDTH]; struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH]; ssize_t nread; uv_buf_t chunk_buf; size_t chunks; int flags; size_t k; #ifdef __clang_analyzer__ /* Tell clang-analyzer the array is initialized. The part we use is initialized below. */ memset(iov, 0, sizeof(iov)); #endif /* prepare structures for recvmmsg */ chunks = buf->len / UV__UDP_DGRAM_MAXSIZE; if (chunks > ARRAY_SIZE(iov)) chunks = ARRAY_SIZE(iov); for (k = 0; k < chunks; ++k) { iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE; iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE; memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr)); msgs[k].msg_hdr.msg_iov = iov + k; msgs[k].msg_hdr.msg_iovlen = 1; msgs[k].msg_hdr.msg_name = peers + k; msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]); msgs[k].msg_hdr.msg_control = NULL; msgs[k].msg_hdr.msg_controllen = 0; msgs[k].msg_hdr.msg_flags = 0; } do nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks); while (nread == -1 && errno == EINTR); if (nread < 1) { if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK) handle->recv_cb(handle, 0, buf, NULL, 0); else handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0); } else { /* pass each chunk to the application */ for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) { flags = UV_UDP_MMSG_CHUNK; if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC) flags |= UV_UDP_PARTIAL; chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len); handle->recv_cb(handle, msgs[k].msg_len, &chunk_buf, msgs[k].msg_hdr.msg_name, flags); } /* one last callback so the original buffer is freed */ if (handle->recv_cb != NULL) handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE); } return nread; } #endif static void uv__udp_recvmsg(uv_udp_t* handle) { struct sockaddr_storage peer; struct msghdr h; ssize_t nread; uv_buf_t buf; int flags; int count; assert(handle->recv_cb != NULL); assert(handle->alloc_cb != NULL); /* Prevent loop starvation when the data comes in as fast as (or faster than) * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. */ count = 32; do { buf = uv_buf_init(NULL, 0); handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf); if (buf.base == NULL || buf.len == 0) { handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0); return; } assert(buf.base != NULL); #if HAVE_MMSG if (uv_udp_using_recvmmsg(handle)) { nread = uv__udp_recvmmsg(handle, &buf); if (nread > 0) count -= nread; continue; } #endif memset(&h, 0, sizeof(h)); memset(&peer, 0, sizeof(peer)); h.msg_name = &peer; h.msg_namelen = sizeof(peer); h.msg_iov = (void*) &buf; h.msg_iovlen = 1; do { nread = recvmsg(handle->io_watcher.fd, &h, 0); } while (nread == -1 && errno == EINTR); if (nread == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) handle->recv_cb(handle, 0, &buf, NULL, 0); else handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0); } else { flags = 0; if (h.msg_flags & MSG_TRUNC) flags |= UV_UDP_PARTIAL; handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); } count--; } /* recv_cb callback may decide to pause or close the handle */ while (nread != -1 && count > 0 && handle->io_watcher.fd != -1 && handle->recv_cb != NULL); } #if HAVE_MMSG static void uv__udp_sendmmsg(uv_udp_t* handle) { uv_udp_send_t* req; struct uv__mmsghdr h[UV__MMSG_MAXWIDTH]; struct uv__mmsghdr *p; QUEUE* q; ssize_t npkts; size_t pkts; size_t i; if (QUEUE_EMPTY(&handle->write_queue)) return; write_queue_drain: for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue); pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue; ++pkts, q = QUEUE_HEAD(q)) { assert(q != NULL); req = QUEUE_DATA(q, uv_udp_send_t, queue); assert(req != NULL); p = &h[pkts]; memset(p, 0, sizeof(*p)); if (req->addr.ss_family == AF_UNSPEC) { p->msg_hdr.msg_name = NULL; p->msg_hdr.msg_namelen = 0; } else { p->msg_hdr.msg_name = &req->addr; if (req->addr.ss_family == AF_INET6) p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6); else if (req->addr.ss_family == AF_INET) p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in); else if (req->addr.ss_family == AF_UNIX) p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un); else { assert(0 && "unsupported address family"); abort(); } } h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs; h[pkts].msg_hdr.msg_iovlen = req->nbufs; } do npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts); while (npkts == -1 && errno == EINTR); if (npkts < 1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) return; for (i = 0, q = QUEUE_HEAD(&handle->write_queue); i < pkts && q != &handle->write_queue; ++i, q = QUEUE_HEAD(&handle->write_queue)) { assert(q != NULL); req = QUEUE_DATA(q, uv_udp_send_t, queue); assert(req != NULL); req->status = UV__ERR(errno); QUEUE_REMOVE(&req->queue); QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); } uv__io_feed(handle->loop, &handle->io_watcher); return; } /* Safety: npkts known to be >0 below. Hence cast from ssize_t * to size_t safe. */ for (i = 0, q = QUEUE_HEAD(&handle->write_queue); i < (size_t)npkts && q != &handle->write_queue; ++i, q = QUEUE_HEAD(&handle->write_queue)) { assert(q != NULL); req = QUEUE_DATA(q, uv_udp_send_t, queue); assert(req != NULL); req->status = req->bufs[0].len; /* Sending a datagram is an atomic operation: either all data * is written or nothing is (and EMSGSIZE is raised). That is * why we don't handle partial writes. Just pop the request * off the write queue and onto the completed queue, done. */ QUEUE_REMOVE(&req->queue); QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); } /* couldn't batch everything, continue sending (jump to avoid stack growth) */ if (!QUEUE_EMPTY(&handle->write_queue)) goto write_queue_drain; uv__io_feed(handle->loop, &handle->io_watcher); return; } #endif static void uv__udp_sendmsg(uv_udp_t* handle) { uv_udp_send_t* req; struct msghdr h; QUEUE* q; ssize_t size; #if HAVE_MMSG uv_once(&once, uv__udp_mmsg_init); if (uv__sendmmsg_avail) { uv__udp_sendmmsg(handle); return; } #endif while (!QUEUE_EMPTY(&handle->write_queue)) { q = QUEUE_HEAD(&handle->write_queue); assert(q != NULL); req = QUEUE_DATA(q, uv_udp_send_t, queue); assert(req != NULL); memset(&h, 0, sizeof h); if (req->addr.ss_family == AF_UNSPEC) { h.msg_name = NULL; h.msg_namelen = 0; } else { h.msg_name = &req->addr; if (req->addr.ss_family == AF_INET6) h.msg_namelen = sizeof(struct sockaddr_in6); else if (req->addr.ss_family == AF_INET) h.msg_namelen = sizeof(struct sockaddr_in); else if (req->addr.ss_family == AF_UNIX) h.msg_namelen = sizeof(struct sockaddr_un); else { assert(0 && "unsupported address family"); abort(); } } h.msg_iov = (struct iovec*) req->bufs; h.msg_iovlen = req->nbufs; do { size = sendmsg(handle->io_watcher.fd, &h, 0); } while (size == -1 && errno == EINTR); if (size == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) break; } req->status = (size == -1 ? UV__ERR(errno) : size); /* Sending a datagram is an atomic operation: either all data * is written or nothing is (and EMSGSIZE is raised). That is * why we don't handle partial writes. Just pop the request * off the write queue and onto the completed queue, done. */ QUEUE_REMOVE(&req->queue); QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); uv__io_feed(handle->loop, &handle->io_watcher); } } /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional * refinements for programs that use multicast. * * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that * are different from the BSDs: it _shares_ the port rather than steal it * from the current listener. While useful, it's not something we can emulate * on other platforms so we don't enable it. * * zOS does not support getsockname with SO_REUSEPORT option when using * AF_UNIX. */ static int uv__set_reuse(int fd) { int yes; yes = 1; #if defined(SO_REUSEPORT) && defined(__MVS__) struct sockaddr_in sockfd; unsigned int sockfd_len = sizeof(sockfd); if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1) return UV__ERR(errno); if (sockfd.sin_family == AF_UNIX) { if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) return UV__ERR(errno); } else { if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) return UV__ERR(errno); } #elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) return UV__ERR(errno); #else if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) return UV__ERR(errno); #endif return 0; } /* * The Linux kernel suppresses some ICMP error messages by default for UDP * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP * error reporting, hopefully resulting in faster failover to working name * servers. */ static int uv__set_recverr(int fd, sa_family_t ss_family) { #if defined(__linux__) int yes; yes = 1; if (ss_family == AF_INET) { if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes))) return UV__ERR(errno); } else if (ss_family == AF_INET6) { if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes))) return UV__ERR(errno); } #endif return 0; } int uv__udp_bind(uv_udp_t* handle, const struct sockaddr* addr, unsigned int addrlen, unsigned int flags) { int err; int yes; int fd; /* Check for bad flags. */ if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR)) return UV_EINVAL; /* Cannot set IPv6-only mode on non-IPv6 socket. */ if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6) return UV_EINVAL; fd = handle->io_watcher.fd; if (fd == -1) { err = uv__socket(addr->sa_family, SOCK_DGRAM, 0); if (err < 0) return err; fd = err; handle->io_watcher.fd = fd; } if (flags & UV_UDP_LINUX_RECVERR) { err = uv__set_recverr(fd, addr->sa_family); if (err) return err; } if (flags & UV_UDP_REUSEADDR) { err = uv__set_reuse(fd); if (err) return err; } if (flags & UV_UDP_IPV6ONLY) { #ifdef IPV6_V6ONLY yes = 1; if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) { err = UV__ERR(errno); return err; } #else err = UV_ENOTSUP; return err; #endif } if (bind(fd, addr, addrlen)) { err = UV__ERR(errno); if (errno == EAFNOSUPPORT) /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a * socket created with AF_INET to an AF_INET6 address or vice versa. */ err = UV_EINVAL; return err; } if (addr->sa_family == AF_INET6) handle->flags |= UV_HANDLE_IPV6; handle->flags |= UV_HANDLE_BOUND; return 0; } static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain, unsigned int flags) { union uv__sockaddr taddr; socklen_t addrlen; if (handle->io_watcher.fd != -1) return 0; switch (domain) { case AF_INET: { struct sockaddr_in* addr = &taddr.in; memset(addr, 0, sizeof *addr); addr->sin_family = AF_INET; addr->sin_addr.s_addr = INADDR_ANY; addrlen = sizeof *addr; break; } case AF_INET6: { struct sockaddr_in6* addr = &taddr.in6; memset(addr, 0, sizeof *addr); addr->sin6_family = AF_INET6; addr->sin6_addr = in6addr_any; addrlen = sizeof *addr; break; } default: assert(0 && "unsupported address family"); abort(); } return uv__udp_bind(handle, &taddr.addr, addrlen, flags); } int uv__udp_connect(uv_udp_t* handle, const struct sockaddr* addr, unsigned int addrlen) { int err; err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); if (err) return err; do { errno = 0; err = connect(handle->io_watcher.fd, addr, addrlen); } while (err == -1 && errno == EINTR); if (err) return UV__ERR(errno); handle->flags |= UV_HANDLE_UDP_CONNECTED; return 0; } /* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html * Any of uv supported UNIXs kernel should be standardized, but the kernel * implementation logic not same, let's use pseudocode to explain the udp * disconnect behaviors: * * Predefined stubs for pseudocode: * 1. sodisconnect: The function to perform the real udp disconnect * 2. pru_connect: The function to perform the real udp connect * 3. so: The kernel object match with socket fd * 4. addr: The sockaddr parameter from user space * * BSDs: * if(sodisconnect(so) == 0) { // udp disconnect succeed * if (addr->sa_len != so->addr->sa_len) return EINVAL; * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT; * pru_connect(so); * } * else return EISCONN; * * z/OS (same with Windows): * if(addr->sa_len < so->addr->sa_len) return EINVAL; * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); * * AIX: * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); * * Linux,Others: * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL; * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); */ int uv__udp_disconnect(uv_udp_t* handle) { int r; #if defined(__MVS__) struct sockaddr_storage addr; #else struct sockaddr addr; #endif memset(&addr, 0, sizeof(addr)); #if defined(__MVS__) addr.ss_family = AF_UNSPEC; #else addr.sa_family = AF_UNSPEC; #endif do { errno = 0; #ifdef __PASE__ /* On IBMi a connectionless transport socket can be disconnected by * either setting the addr parameter to NULL or setting the * addr_length parameter to zero, and issuing another connect(). * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm */ r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0); #else r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr)); #endif } while (r == -1 && errno == EINTR); if (r == -1) { #if defined(BSD) /* The macro BSD is from sys/param.h */ if (errno != EAFNOSUPPORT && errno != EINVAL) return UV__ERR(errno); #else return UV__ERR(errno); #endif } handle->flags &= ~UV_HANDLE_UDP_CONNECTED; return 0; } int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, const uv_buf_t bufs[], unsigned int nbufs, const struct sockaddr* addr, unsigned int addrlen, uv_udp_send_cb send_cb) { int err; int empty_queue; assert(nbufs > 0); if (addr) { err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); if (err) return err; } /* It's legal for send_queue_count > 0 even when the write_queue is empty; * it means there are error-state requests in the write_completed_queue that * will touch up send_queue_size/count later. */ empty_queue = (handle->send_queue_count == 0); uv__req_init(handle->loop, req, UV_UDP_SEND); assert(addrlen <= sizeof(req->addr)); if (addr == NULL) req->addr.ss_family = AF_UNSPEC; else memcpy(&req->addr, addr, addrlen); req->send_cb = send_cb; req->handle = handle; req->nbufs = nbufs; req->bufs = req->bufsml; if (nbufs > ARRAY_SIZE(req->bufsml)) req->bufs = uv__malloc(nbufs * sizeof(bufs[0])); if (req->bufs == NULL) { uv__req_unregister(handle->loop, req); return UV_ENOMEM; } memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs); handle->send_queue_count++; QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue); uv__handle_start(handle); if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) { uv__udp_sendmsg(handle); /* `uv__udp_sendmsg` may not be able to do non-blocking write straight * away. In such cases the `io_watcher` has to be queued for asynchronous * write. */ if (!QUEUE_EMPTY(&handle->write_queue)) uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); } else { uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); } return 0; } int uv__udp_try_send(uv_udp_t* handle, const uv_buf_t bufs[], unsigned int nbufs, const struct sockaddr* addr, unsigned int addrlen) { int err; struct msghdr h; ssize_t size; assert(nbufs > 0); /* already sending a message */ if (handle->send_queue_count != 0) return UV_EAGAIN; if (addr) { err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); if (err) return err; } else { assert(handle->flags & UV_HANDLE_UDP_CONNECTED); } memset(&h, 0, sizeof h); h.msg_name = (struct sockaddr*) addr; h.msg_namelen = addrlen; h.msg_iov = (struct iovec*) bufs; h.msg_iovlen = nbufs; do { size = sendmsg(handle->io_watcher.fd, &h, 0); } while (size == -1 && errno == EINTR); if (size == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) return UV_EAGAIN; else return UV__ERR(errno); } return size; } static int uv__udp_set_membership4(uv_udp_t* handle, const struct sockaddr_in* multicast_addr, const char* interface_addr, uv_membership membership) { struct ip_mreq mreq; int optname; int err; memset(&mreq, 0, sizeof mreq); if (interface_addr) { err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); if (err) return err; } else { mreq.imr_interface.s_addr = htonl(INADDR_ANY); } mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; switch (membership) { case UV_JOIN_GROUP: optname = IP_ADD_MEMBERSHIP; break; case UV_LEAVE_GROUP: optname = IP_DROP_MEMBERSHIP; break; default: return UV_EINVAL; } if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, optname, &mreq, sizeof(mreq))) { #if defined(__MVS__) if (errno == ENXIO) return UV_ENODEV; #endif return UV__ERR(errno); } return 0; } static int uv__udp_set_membership6(uv_udp_t* handle, const struct sockaddr_in6* multicast_addr, const char* interface_addr, uv_membership membership) { int optname; struct ipv6_mreq mreq; struct sockaddr_in6 addr6; memset(&mreq, 0, sizeof mreq); if (interface_addr) { if (uv_ip6_addr(interface_addr, 0, &addr6)) return UV_EINVAL; mreq.ipv6mr_interface = addr6.sin6_scope_id; } else { mreq.ipv6mr_interface = 0; } mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr; switch (membership) { case UV_JOIN_GROUP: optname = IPV6_ADD_MEMBERSHIP; break; case UV_LEAVE_GROUP: optname = IPV6_DROP_MEMBERSHIP; break; default: return UV_EINVAL; } if (setsockopt(handle->io_watcher.fd, IPPROTO_IPV6, optname, &mreq, sizeof(mreq))) { #if defined(__MVS__) if (errno == ENXIO) return UV_ENODEV; #endif return UV__ERR(errno); } return 0; } #if !defined(__OpenBSD__) && \ !defined(__NetBSD__) && \ !defined(__ANDROID__) && \ !defined(__DragonFly__) && \ !defined(__QNX__) && \ !defined(__GNU__) static int uv__udp_set_source_membership4(uv_udp_t* handle, const struct sockaddr_in* multicast_addr, const char* interface_addr, const struct sockaddr_in* source_addr, uv_membership membership) { struct ip_mreq_source mreq; int optname; int err; err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); if (err) return err; memset(&mreq, 0, sizeof(mreq)); if (interface_addr != NULL) { err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); if (err) return err; } else { mreq.imr_interface.s_addr = htonl(INADDR_ANY); } mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr; if (membership == UV_JOIN_GROUP) optname = IP_ADD_SOURCE_MEMBERSHIP; else if (membership == UV_LEAVE_GROUP) optname = IP_DROP_SOURCE_MEMBERSHIP; else return UV_EINVAL; if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, optname, &mreq, sizeof(mreq))) { return UV__ERR(errno); } return 0; } static int uv__udp_set_source_membership6(uv_udp_t* handle, const struct sockaddr_in6* multicast_addr, const char* interface_addr, const struct sockaddr_in6* source_addr, uv_membership membership) { struct group_source_req mreq; struct sockaddr_in6 addr6; int optname; int err; err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); if (err) return err; memset(&mreq, 0, sizeof(mreq)); if (interface_addr != NULL) { err = uv_ip6_addr(interface_addr, 0, &addr6); if (err) return err; mreq.gsr_interface = addr6.sin6_scope_id; } else { mreq.gsr_interface = 0; } STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr)); STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr)); memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr)); memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr)); if (membership == UV_JOIN_GROUP) optname = MCAST_JOIN_SOURCE_GROUP; else if (membership == UV_LEAVE_GROUP) optname = MCAST_LEAVE_SOURCE_GROUP; else return UV_EINVAL; if (setsockopt(handle->io_watcher.fd, IPPROTO_IPV6, optname, &mreq, sizeof(mreq))) { return UV__ERR(errno); } return 0; } #endif int uv__udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned flags, int domain) { int fd; fd = -1; if (domain != AF_UNSPEC) { fd = uv__socket(domain, SOCK_DGRAM, 0); if (fd < 0) return fd; } uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); handle->alloc_cb = NULL; handle->recv_cb = NULL; handle->send_queue_size = 0; handle->send_queue_count = 0; uv__io_init(&handle->io_watcher, uv__udp_io, fd); QUEUE_INIT(&handle->write_queue); QUEUE_INIT(&handle->write_completed_queue); return 0; } int uv_udp_using_recvmmsg(const uv_udp_t* handle) { #if HAVE_MMSG if (handle->flags & UV_HANDLE_UDP_RECVMMSG) { uv_once(&once, uv__udp_mmsg_init); return uv__recvmmsg_avail; } #endif return 0; } int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { int err; /* Check for already active socket. */ if (handle->io_watcher.fd != -1) return UV_EBUSY; if (uv__fd_exists(handle->loop, sock)) return UV_EEXIST; err = uv__nonblock(sock, 1); if (err) return err; err = uv__set_reuse(sock); if (err) return err; handle->io_watcher.fd = sock; if (uv__udp_is_connected(handle)) handle->flags |= UV_HANDLE_UDP_CONNECTED; return 0; } int uv_udp_set_membership(uv_udp_t* handle, const char* multicast_addr, const char* interface_addr, uv_membership membership) { int err; struct sockaddr_in addr4; struct sockaddr_in6 addr6; if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) { err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); if (err) return err; return uv__udp_set_membership4(handle, &addr4, interface_addr, membership); } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) { err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); if (err) return err; return uv__udp_set_membership6(handle, &addr6, interface_addr, membership); } else { return UV_EINVAL; } } int uv_udp_set_source_membership(uv_udp_t* handle, const char* multicast_addr, const char* interface_addr, const char* source_addr, uv_membership membership) { #if !defined(__OpenBSD__) && \ !defined(__NetBSD__) && \ !defined(__ANDROID__) && \ !defined(__DragonFly__) && \ !defined(__QNX__) && \ !defined(__GNU__) int err; union uv__sockaddr mcast_addr; union uv__sockaddr src_addr; err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in); if (err) { err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6); if (err) return err; err = uv_ip6_addr(source_addr, 0, &src_addr.in6); if (err) return err; return uv__udp_set_source_membership6(handle, &mcast_addr.in6, interface_addr, &src_addr.in6, membership); } err = uv_ip4_addr(source_addr, 0, &src_addr.in); if (err) return err; return uv__udp_set_source_membership4(handle, &mcast_addr.in, interface_addr, &src_addr.in, membership); #else return UV_ENOSYS; #endif } static int uv__setsockopt(uv_udp_t* handle, int option4, int option6, const void* val, socklen_t size) { int r; if (handle->flags & UV_HANDLE_IPV6) r = setsockopt(handle->io_watcher.fd, IPPROTO_IPV6, option6, val, size); else r = setsockopt(handle->io_watcher.fd, IPPROTO_IP, option4, val, size); if (r) return UV__ERR(errno); return 0; } static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option4, int option6, int val) { #if defined(__sun) || defined(_AIX) || defined(__MVS__) char arg = val; #elif defined(__OpenBSD__) unsigned char arg = val; #else int arg = val; #endif if (val < 0 || val > 255) return UV_EINVAL; return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg)); } int uv_udp_set_broadcast(uv_udp_t* handle, int on) { if (setsockopt(handle->io_watcher.fd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) { return UV__ERR(errno); } return 0; } int uv_udp_set_ttl(uv_udp_t* handle, int ttl) { if (ttl < 1 || ttl > 255) return UV_EINVAL; #if defined(__MVS__) if (!(handle->flags & UV_HANDLE_IPV6)) return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */ #endif /* * On Solaris and derivatives such as SmartOS, the length of socket options * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, * so hardcode the size of these options on this platform, * and use the general uv__setsockopt_maybe_char call on other platforms. */ #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ defined(__MVS__) || defined(__QNX__) return uv__setsockopt(handle, IP_TTL, IPV6_UNICAST_HOPS, &ttl, sizeof(ttl)); #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || defined(__MVS__) || defined(__QNX__)) */ return uv__setsockopt_maybe_char(handle, IP_TTL, IPV6_UNICAST_HOPS, ttl); #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || defined(__MVS__) || defined(__QNX__) */ } int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) { /* * On Solaris and derivatives such as SmartOS, the length of socket options * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case, * and use the general uv__setsockopt_maybe_char call otherwise. */ #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ defined(__MVS__) || defined(__QNX__) if (handle->flags & UV_HANDLE_IPV6) return uv__setsockopt(handle, IP_MULTICAST_TTL, IPV6_MULTICAST_HOPS, &ttl, sizeof(ttl)); #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ defined(__MVS__) || defined(__QNX__) */ return uv__setsockopt_maybe_char(handle, IP_MULTICAST_TTL, IPV6_MULTICAST_HOPS, ttl); } int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) { /* * On Solaris and derivatives such as SmartOS, the length of socket options * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case, * and use the general uv__setsockopt_maybe_char call otherwise. */ #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ defined(__MVS__) || defined(__QNX__) if (handle->flags & UV_HANDLE_IPV6) return uv__setsockopt(handle, IP_MULTICAST_LOOP, IPV6_MULTICAST_LOOP, &on, sizeof(on)); #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) || defined(__MVS__) || defined(__QNX__) */ return uv__setsockopt_maybe_char(handle, IP_MULTICAST_LOOP, IPV6_MULTICAST_LOOP, on); } int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) { struct sockaddr_storage addr_st; struct sockaddr_in* addr4; struct sockaddr_in6* addr6; addr4 = (struct sockaddr_in*) &addr_st; addr6 = (struct sockaddr_in6*) &addr_st; if (!interface_addr) { memset(&addr_st, 0, sizeof addr_st); if (handle->flags & UV_HANDLE_IPV6) { addr_st.ss_family = AF_INET6; addr6->sin6_scope_id = 0; } else { addr_st.ss_family = AF_INET; addr4->sin_addr.s_addr = htonl(INADDR_ANY); } } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) { /* nothing, address was parsed */ } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) { /* nothing, address was parsed */ } else { return UV_EINVAL; } if (addr_st.ss_family == AF_INET) { if (setsockopt(handle->io_watcher.fd, IPPROTO_IP, IP_MULTICAST_IF, (void*) &addr4->sin_addr, sizeof(addr4->sin_addr)) == -1) { return UV__ERR(errno); } } else if (addr_st.ss_family == AF_INET6) { if (setsockopt(handle->io_watcher.fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &addr6->sin6_scope_id, sizeof(addr6->sin6_scope_id)) == -1) { return UV__ERR(errno); } } else { assert(0 && "unexpected address family"); abort(); } return 0; } int uv_udp_getpeername(const uv_udp_t* handle, struct sockaddr* name, int* namelen) { return uv__getsockpeername((const uv_handle_t*) handle, getpeername, name, namelen); } int uv_udp_getsockname(const uv_udp_t* handle, struct sockaddr* name, int* namelen) { return uv__getsockpeername((const uv_handle_t*) handle, getsockname, name, namelen); } int uv__udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, uv_udp_recv_cb recv_cb) { int err; if (alloc_cb == NULL || recv_cb == NULL) return UV_EINVAL; if (uv__io_active(&handle->io_watcher, POLLIN)) return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */ err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0); if (err) return err; handle->alloc_cb = alloc_cb; handle->recv_cb = recv_cb; uv__io_start(handle->loop, &handle->io_watcher, POLLIN); uv__handle_start(handle); return 0; } int uv__udp_recv_stop(uv_udp_t* handle) { uv__io_stop(handle->loop, &handle->io_watcher, POLLIN); if (!uv__io_active(&handle->io_watcher, POLLOUT)) uv__handle_stop(handle); handle->alloc_cb = NULL; handle->recv_cb = NULL; return 0; }