summaryrefslogtreecommitdiffstats
path: root/Utilities/cmlibuv/src/unix/udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmlibuv/src/unix/udp.c')
-rw-r--r--Utilities/cmlibuv/src/unix/udp.c368
1 files changed, 352 insertions, 16 deletions
diff --git a/Utilities/cmlibuv/src/unix/udp.c b/Utilities/cmlibuv/src/unix/udp.c
index b578e7b..f2fcae1 100644
--- a/Utilities/cmlibuv/src/unix/udp.c
+++ b/Utilities/cmlibuv/src/unix/udp.c
@@ -32,6 +32,8 @@
#endif
#include <sys/un.h>
+#define UV__UDP_DGRAM_MAXSIZE (64 * 1024)
+
#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
@@ -49,6 +51,36 @@ static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
int domain,
unsigned int flags);
+#if HAVE_MMSG
+
+#define UV__MMSG_MAXWIDTH 20
+
+static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
+static void uv__udp_sendmmsg(uv_udp_t* handle);
+
+static int uv__recvmmsg_avail;
+static int uv__sendmmsg_avail;
+static uv_once_t once = UV_ONCE_INIT;
+
+static void uv__udp_mmsg_init(void) {
+ int ret;
+ int s;
+ s = uv__socket(AF_INET, SOCK_DGRAM, 0);
+ if (s < 0)
+ return;
+ ret = uv__sendmmsg(s, NULL, 0, 0);
+ if (ret == 0 || errno != ENOSYS) {
+ uv__sendmmsg_avail = 1;
+ uv__recvmmsg_avail = 1;
+ } else {
+ ret = uv__recvmmsg(s, NULL, 0, 0, NULL);
+ if (ret == 0 || errno != ENOSYS)
+ uv__recvmmsg_avail = 1;
+ }
+ uv__close(s);
+}
+
+#endif
void uv__udp_close(uv_udp_t* handle) {
uv__io_close(handle->loop, &handle->io_watcher);
@@ -148,6 +180,61 @@ static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
}
}
+#if HAVE_MMSG
+static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
+ struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
+ struct iovec iov[UV__MMSG_MAXWIDTH];
+ struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
+ ssize_t nread;
+ uv_buf_t chunk_buf;
+ size_t chunks;
+ int flags;
+ size_t k;
+
+ /* prepare structures for recvmmsg */
+ chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
+ if (chunks > ARRAY_SIZE(iov))
+ chunks = ARRAY_SIZE(iov);
+ for (k = 0; k < chunks; ++k) {
+ iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
+ iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
+ msgs[k].msg_hdr.msg_iov = iov + k;
+ msgs[k].msg_hdr.msg_iovlen = 1;
+ msgs[k].msg_hdr.msg_name = peers + k;
+ msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
+ }
+
+ do
+ nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
+ while (nread == -1 && errno == EINTR);
+
+ if (nread < 1) {
+ if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
+ handle->recv_cb(handle, 0, buf, NULL, 0);
+ else
+ handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
+ } else {
+ /* pass each chunk to the application */
+ for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
+ flags = UV_UDP_MMSG_CHUNK;
+ if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
+ flags |= UV_UDP_PARTIAL;
+
+ chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
+ handle->recv_cb(handle,
+ msgs[k].msg_len,
+ &chunk_buf,
+ msgs[k].msg_hdr.msg_name,
+ flags);
+ }
+
+ /* one last callback so the original buffer is freed */
+ if (handle->recv_cb != NULL)
+ handle->recv_cb(handle, 0, buf, NULL, 0);
+ }
+ return nread;
+}
+#endif
static void uv__udp_recvmsg(uv_udp_t* handle) {
struct sockaddr_storage peer;
@@ -165,18 +252,32 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
*/
count = 32;
- memset(&h, 0, sizeof(h));
- h.msg_name = &peer;
-
do {
buf = uv_buf_init(NULL, 0);
- handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf);
+ handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
return;
}
assert(buf.base != NULL);
+#if HAVE_MMSG
+ uv_once(&once, uv__udp_mmsg_init);
+ if (uv__recvmmsg_avail) {
+ /* Returned space for more than 1 datagram, use it to receive
+ * multiple datagrams. */
+ if (buf.len >= 2 * UV__UDP_DGRAM_MAXSIZE) {
+ nread = uv__udp_recvmmsg(handle, &buf);
+ if (nread > 0)
+ count -= nread;
+ continue;
+ }
+ }
+#endif
+
+ memset(&h, 0, sizeof(h));
+ memset(&peer, 0, sizeof(peer));
+ h.msg_name = &peer;
h.msg_namelen = sizeof(peer);
h.msg_iov = (void*) &buf;
h.msg_iovlen = 1;
@@ -193,33 +294,126 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
}
else {
- const struct sockaddr *addr;
- if (h.msg_namelen == 0)
- addr = NULL;
- else
- addr = (const struct sockaddr*) &peer;
-
flags = 0;
if (h.msg_flags & MSG_TRUNC)
flags |= UV_UDP_PARTIAL;
- handle->recv_cb(handle, nread, &buf, addr, flags);
+ handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
}
+ count--;
}
/* recv_cb callback may decide to pause or close the handle */
while (nread != -1
- && count-- > 0
+ && count > 0
&& handle->io_watcher.fd != -1
&& handle->recv_cb != NULL);
}
+#if HAVE_MMSG
+static void uv__udp_sendmmsg(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
+ struct uv__mmsghdr *p;
+ QUEUE* q;
+ ssize_t npkts;
+ size_t pkts;
+ size_t i;
+
+ if (QUEUE_EMPTY(&handle->write_queue))
+ return;
+
+write_queue_drain:
+ for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
+ pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
+ ++pkts, q = QUEUE_HEAD(q)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ p = &h[pkts];
+ memset(p, 0, sizeof(*p));
+ if (req->addr.ss_family == AF_UNSPEC) {
+ p->msg_hdr.msg_name = NULL;
+ p->msg_hdr.msg_namelen = 0;
+ } else {
+ p->msg_hdr.msg_name = &req->addr;
+ if (req->addr.ss_family == AF_INET6)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
+ else if (req->addr.ss_family == AF_INET)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
+ else if (req->addr.ss_family == AF_UNIX)
+ p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
+ else {
+ assert(0 && "unsupported address family");
+ abort();
+ }
+ }
+ h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
+ h[pkts].msg_hdr.msg_iovlen = req->nbufs;
+ }
+
+ do
+ npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0);
+ while (npkts == -1 && errno == EINTR);
+
+ if (npkts < 1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
+ return;
+ for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
+ i < pkts && q != &handle->write_queue;
+ ++i, q = QUEUE_HEAD(q)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ req->status = UV__ERR(errno);
+ QUEUE_REMOVE(&req->queue);
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ }
+ uv__io_feed(handle->loop, &handle->io_watcher);
+ return;
+ }
+
+ for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
+ i < pkts && q != &handle->write_queue;
+ ++i, q = QUEUE_HEAD(&handle->write_queue)) {
+ assert(q != NULL);
+ req = QUEUE_DATA(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ req->status = req->bufs[0].len;
+
+ /* Sending a datagram is an atomic operation: either all data
+ * is written or nothing is (and EMSGSIZE is raised). That is
+ * why we don't handle partial writes. Just pop the request
+ * off the write queue and onto the completed queue, done.
+ */
+ QUEUE_REMOVE(&req->queue);
+ QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
+ }
+
+ /* couldn't batch everything, continue sending (jump to avoid stack growth) */
+ if (!QUEUE_EMPTY(&handle->write_queue))
+ goto write_queue_drain;
+ uv__io_feed(handle->loop, &handle->io_watcher);
+ return;
+}
+#endif
static void uv__udp_sendmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
- QUEUE* q;
struct msghdr h;
+ QUEUE* q;
ssize_t size;
+#if HAVE_MMSG
+ uv_once(&once, uv__udp_mmsg_init);
+ if (uv__sendmmsg_avail) {
+ uv__udp_sendmmsg(handle);
+ return;
+ }
+#endif
+
while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
assert(q != NULL);
@@ -269,7 +463,6 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
}
}
-
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
* refinements for programs that use multicast.
*
@@ -659,6 +852,100 @@ static int uv__udp_set_membership6(uv_udp_t* handle,
}
+#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
+static int uv__udp_set_source_membership4(uv_udp_t* handle,
+ const struct sockaddr_in* multicast_addr,
+ const char* interface_addr,
+ const struct sockaddr_in* source_addr,
+ uv_membership membership) {
+ struct ip_mreq_source mreq;
+ int optname;
+ int err;
+
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+
+ memset(&mreq, 0, sizeof(mreq));
+
+ if (interface_addr != NULL) {
+ err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
+ if (err)
+ return err;
+ } else {
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+ }
+
+ mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
+ mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
+
+ if (membership == UV_JOIN_GROUP)
+ optname = IP_ADD_SOURCE_MEMBERSHIP;
+ else if (membership == UV_LEAVE_GROUP)
+ optname = IP_DROP_SOURCE_MEMBERSHIP;
+ else
+ return UV_EINVAL;
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IP,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+
+
+static int uv__udp_set_source_membership6(uv_udp_t* handle,
+ const struct sockaddr_in6* multicast_addr,
+ const char* interface_addr,
+ const struct sockaddr_in6* source_addr,
+ uv_membership membership) {
+ struct group_source_req mreq;
+ struct sockaddr_in6 addr6;
+ int optname;
+ int err;
+
+ err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
+ if (err)
+ return err;
+
+ memset(&mreq, 0, sizeof(mreq));
+
+ if (interface_addr != NULL) {
+ err = uv_ip6_addr(interface_addr, 0, &addr6);
+ if (err)
+ return err;
+ mreq.gsr_interface = addr6.sin6_scope_id;
+ } else {
+ mreq.gsr_interface = 0;
+ }
+
+ memcpy(&mreq.gsr_group, multicast_addr, sizeof(mreq.gsr_group));
+ memcpy(&mreq.gsr_source, source_addr, sizeof(mreq.gsr_source));
+
+ if (membership == UV_JOIN_GROUP)
+ optname = MCAST_JOIN_SOURCE_GROUP;
+ else if (membership == UV_LEAVE_GROUP)
+ optname = MCAST_LEAVE_SOURCE_GROUP;
+ else
+ return UV_EINVAL;
+
+ if (setsockopt(handle->io_watcher.fd,
+ IPPROTO_IPV6,
+ optname,
+ &mreq,
+ sizeof(mreq))) {
+ return UV__ERR(errno);
+ }
+
+ return 0;
+}
+#endif
+
+
int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) {
int domain;
int err;
@@ -748,11 +1035,60 @@ int uv_udp_set_membership(uv_udp_t* handle,
}
}
+
+int uv_udp_set_source_membership(uv_udp_t* handle,
+ const char* multicast_addr,
+ const char* interface_addr,
+ const char* source_addr,
+ uv_membership membership) {
+#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
+ int err;
+ struct sockaddr_storage mcast_addr;
+ struct sockaddr_in* mcast_addr4;
+ struct sockaddr_in6* mcast_addr6;
+ struct sockaddr_storage src_addr;
+ struct sockaddr_in* src_addr4;
+ struct sockaddr_in6* src_addr6;
+
+ mcast_addr4 = (struct sockaddr_in*)&mcast_addr;
+ mcast_addr6 = (struct sockaddr_in6*)&mcast_addr;
+ src_addr4 = (struct sockaddr_in*)&src_addr;
+ src_addr6 = (struct sockaddr_in6*)&src_addr;
+
+ err = uv_ip4_addr(multicast_addr, 0, mcast_addr4);
+ if (err) {
+ err = uv_ip6_addr(multicast_addr, 0, mcast_addr6);
+ if (err)
+ return err;
+ err = uv_ip6_addr(source_addr, 0, src_addr6);
+ if (err)
+ return err;
+ return uv__udp_set_source_membership6(handle,
+ mcast_addr6,
+ interface_addr,
+ src_addr6,
+ membership);
+ }
+
+ err = uv_ip4_addr(source_addr, 0, src_addr4);
+ if (err)
+ return err;
+ return uv__udp_set_source_membership4(handle,
+ mcast_addr4,
+ interface_addr,
+ src_addr4,
+ membership);
+#else
+ return UV_ENOSYS;
+#endif
+}
+
+
static int uv__setsockopt(uv_udp_t* handle,
int option4,
int option6,
const void* val,
- size_t size) {
+ socklen_t size) {
int r;
if (handle->flags & UV_HANDLE_IPV6)
@@ -875,7 +1211,7 @@ int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
* and use the general uv__setsockopt_maybe_char call otherwise.
*/
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
- defined(__MVS__)
+ defined(__MVS__)
if (handle->flags & UV_HANDLE_IPV6)
return uv__setsockopt(handle,
IP_MULTICAST_LOOP,