diff options
Diffstat (limited to 'Utilities/cmcurl/lib/multi.c')
-rw-r--r-- | Utilities/cmcurl/lib/multi.c | 626 |
1 files changed, 460 insertions, 166 deletions
diff --git a/Utilities/cmcurl/lib/multi.c b/Utilities/cmcurl/lib/multi.c index c7c46ee..cef2805 100644 --- a/Utilities/cmcurl/lib/multi.c +++ b/Utilities/cmcurl/lib/multi.c @@ -5,7 +5,7 @@ * | (__| |_| | _ <| |___ * \___|\___/|_| \_\_____| * - * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al. + * Copyright (C) 1998 - 2020, Daniel Stenberg, <daniel@haxx.se>, et al. * * This software is licensed as described in the file COPYING, which * you should have received as part of this distribution. The terms @@ -46,6 +46,8 @@ #include "connect.h" #include "http_proxy.h" #include "http2.h" +#include "socketpair.h" +#include "socks.h" /* The last 3 #include files should be in this order */ #include "curl_printf.h" #include "curl_memory.h" @@ -71,8 +73,6 @@ static CURLMcode singlesocket(struct Curl_multi *multi, struct Curl_easy *data); -static int update_timer(struct Curl_multi *multi); - static CURLMcode add_next_timeout(struct curltime now, struct Curl_multi *multi, struct Curl_easy *d); @@ -189,7 +189,7 @@ static void mstate(struct Curl_easy *data, CURLMstate state */ struct Curl_sh_entry { - struct curl_llist list; /* list of easy handles using this socket */ + struct curl_hash transfers; /* hash of transfers using this socket */ unsigned int action; /* what combined action READ/WRITE this socket waits for */ void *socketp; /* settable by users with curl_multi_assign() */ @@ -206,12 +206,36 @@ struct Curl_sh_entry { static struct Curl_sh_entry *sh_getentry(struct curl_hash *sh, curl_socket_t s) { - if(s != CURL_SOCKET_BAD) + if(s != CURL_SOCKET_BAD) { /* only look for proper sockets */ return Curl_hash_pick(sh, (char *)&s, sizeof(curl_socket_t)); + } return NULL; } +#define TRHASH_SIZE 13 +static size_t trhash(void *key, size_t key_length, size_t slots_num) +{ + size_t keyval = (size_t)*(struct Curl_easy **)key; + (void) key_length; + + return (keyval % slots_num); +} + +static size_t trhash_compare(void *k1, size_t k1_len, void *k2, size_t k2_len) +{ + (void)k1_len; + (void)k2_len; + + return *(struct Curl_easy **)k1 == *(struct Curl_easy **)k2; +} + +static void trhash_dtor(void *nada) +{ + (void)nada; +} + + /* make sure this socket is present in the hash for this handle */ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, curl_socket_t s) @@ -219,19 +243,25 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, struct Curl_sh_entry *there = sh_getentry(sh, s); struct Curl_sh_entry *check; - if(there) + if(there) { /* it is present, return fine */ return there; + } /* not present, add it */ check = calloc(1, sizeof(struct Curl_sh_entry)); if(!check) return NULL; /* major failure */ - Curl_llist_init(&check->list, NULL); + if(Curl_hash_init(&check->transfers, TRHASH_SIZE, trhash, + trhash_compare, trhash_dtor)) { + free(check); + return NULL; + } /* make/add new hash entry */ if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) { + Curl_hash_destroy(&check->transfers); free(check); return NULL; /* major failure */ } @@ -241,8 +271,11 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, /* delete the given socket + handle from the hash */ -static void sh_delentry(struct curl_hash *sh, curl_socket_t s) +static void sh_delentry(struct Curl_sh_entry *entry, + struct curl_hash *sh, curl_socket_t s) { + Curl_hash_destroy(&entry->transfers); + /* We remove the hash entry. This will end up in a call to sh_freeentry(). */ Curl_hash_delete(sh, (char *)&s, sizeof(curl_socket_t)); @@ -311,17 +344,6 @@ static CURLMcode multi_addmsg(struct Curl_multi *multi, return CURLM_OK; } -/* - * multi_freeamsg() - * - * Callback used by the llist system when a single list entry is destroyed. - */ -static void multi_freeamsg(void *a, void *b) -{ - (void)a; - (void)b; -} - struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */ int chashsize) /* connection hash */ { @@ -341,11 +363,30 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */ if(Curl_conncache_init(&multi->conn_cache, chashsize)) goto error; - Curl_llist_init(&multi->msglist, multi_freeamsg); - Curl_llist_init(&multi->pending, multi_freeamsg); + Curl_llist_init(&multi->msglist, NULL); + Curl_llist_init(&multi->pending, NULL); + + multi->multiplexing = TRUE; /* -1 means it not set by user, use the default value */ multi->maxconnects = -1; + multi->max_concurrent_streams = 100; + multi->ipv6_works = Curl_ipv6works(NULL); + +#ifdef ENABLE_WAKEUP + if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) { + multi->wakeup_pair[0] = CURL_SOCKET_BAD; + multi->wakeup_pair[1] = CURL_SOCKET_BAD; + } + else if(curlx_nonblock(multi->wakeup_pair[0], TRUE) < 0 || + curlx_nonblock(multi->wakeup_pair[1], TRUE) < 0) { + sclose(multi->wakeup_pair[0]); + sclose(multi->wakeup_pair[1]); + multi->wakeup_pair[0] = CURL_SOCKET_BAD; + multi->wakeup_pair[1] = CURL_SOCKET_BAD; + } +#endif + return multi; error: @@ -453,16 +494,16 @@ CURLMcode curl_multi_add_handle(struct Curl_multi *multi, /* increase the alive-counter */ multi->num_alive++; - /* A somewhat crude work-around for a little glitch in update_timer() that - happens if the lastcall time is set to the same time when the handle is - removed as when the next handle is added, as then the check in - update_timer() that prevents calling the application multiple times with - the same timer info will not trigger and then the new handle's timeout - will not be notified to the app. + /* A somewhat crude work-around for a little glitch in Curl_update_timer() + that happens if the lastcall time is set to the same time when the handle + is removed as when the next handle is added, as then the check in + Curl_update_timer() that prevents calling the application multiple times + with the same timer info will not trigger and then the new handle's + timeout will not be notified to the app. The work-around is thus simply to clear the 'lastcall' variable to force - update_timer() to always trigger a callback to the app when a new easy - handle is added */ + Curl_update_timer() to always trigger a callback to the app when a new + easy handle is added */ memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall)); /* The closure handle only ever has default timeouts set. To improve the @@ -475,7 +516,7 @@ CURLMcode curl_multi_add_handle(struct Curl_multi *multi, data->state.conn_cache->closure_handle->set.no_signal = data->set.no_signal; - update_timer(multi); + Curl_update_timer(multi); return CURLM_OK; } @@ -510,6 +551,8 @@ static CURLcode multi_done(struct Curl_easy *data, /* Stop if multi_done() has already been called */ return CURLE_OK; + conn->data = data; /* ensure the connection uses this transfer now */ + /* Stop the resolver and free its own resources (but not dns_entry yet). */ Curl_resolver_kill(conn); @@ -546,15 +589,20 @@ static CURLcode multi_done(struct Curl_easy *data, process_pending_handles(data->multi); /* connection / multiplex */ + CONN_LOCK(data); detach_connnection(data); if(CONN_INUSE(conn)) { /* Stop if still used. */ + /* conn->data must not remain pointing to this transfer since it is going + away! Find another to own it! */ + conn->data = conn->easyq.head->ptr; + CONN_UNLOCK(data); DEBUGF(infof(data, "Connection still in use %zu, " "no more multi_done now!\n", conn->easyq.size)); return CURLE_OK; } - + conn->data = NULL; /* the connection now has no owner */ data->state.done = TRUE; /* called just now! */ if(conn->dns_entry) { @@ -597,7 +645,10 @@ static CURLcode multi_done(struct Curl_easy *data, #endif ) || conn->bits.close || (premature && !(conn->handler->flags & PROTOPT_STREAM))) { - CURLcode res2 = Curl_disconnect(data, conn, premature); + CURLcode res2; + connclose(conn, "disconnecting"); + CONN_UNLOCK(data); + res2 = Curl_disconnect(data, conn, premature); /* If we had an error already, make sure we return that one. But if we got a new error, return that. */ @@ -614,9 +665,9 @@ static CURLcode multi_done(struct Curl_easy *data, conn->bits.httpproxy ? conn->http_proxy.host.dispname : conn->bits.conn_to_host ? conn->conn_to_host.dispname : conn->host.dispname); - /* the connection is no longer in use by this transfer */ - if(Curl_conncache_return_conn(conn)) { + CONN_UNLOCK(data); + if(Curl_conncache_return_conn(data, conn)) { /* remember the most recently used connection */ data->state.lastconnect = conn; infof(data, "%s\n", buffer); @@ -674,19 +725,14 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, easy_owns_conn = TRUE; } - /* The timer must be shut down before data->multi is set to NULL, - else the timenode will remain in the splay tree after - curl_easy_cleanup is called. */ - Curl_expire_clear(data); - if(data->conn) { /* we must call multi_done() here (if we still own the connection) so that we don't leave a half-baked one around */ if(easy_owns_conn) { - /* multi_done() clears the conn->data field to lose the association - between the easy handle and the connection + /* multi_done() clears the association between the easy handle and the + connection. Note that this ignores the return code simply because there's nothing really useful to do with it anyway! */ @@ -694,6 +740,11 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, } } + /* The timer must be shut down before data->multi is set to NULL, else the + timenode will remain in the splay tree after curl_easy_cleanup is + called. Do it after multi_done() in case that sets another time! */ + Curl_expire_clear(data); + if(data->connect_queue.ptr) /* the handle was in the pending list waiting for an available connection, so go ahead and remove it */ @@ -723,10 +774,8 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, vanish with this handle */ /* Remove the association between the connection and the handle */ - if(data->conn) { - data->conn->data = NULL; + if(data->conn) detach_connnection(data); - } #ifdef USE_LIBPSL /* Remove the PSL association. */ @@ -765,7 +814,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, We do not touch the easy handle here! */ multi->num_easy--; /* one less to care about now */ - update_timer(multi); + Curl_update_timer(multi); return CURLM_OK; } @@ -797,25 +846,30 @@ void Curl_attach_connnection(struct Curl_easy *data, } static int waitconnect_getsock(struct connectdata *conn, - curl_socket_t *sock, - int numsocks) + curl_socket_t *sock) { int i; int s = 0; int rc = 0; - if(!numsocks) - return GETSOCK_BLANK; - #ifdef USE_SSL if(CONNECT_FIRSTSOCKET_PROXY_SSL()) - return Curl_ssl_getsock(conn, sock, numsocks); + return Curl_ssl_getsock(conn, sock); #endif + if(SOCKS_STATE(conn->cnnct.state)) + return Curl_SOCKS_getsock(conn, sock, FIRSTSOCKET); + for(i = 0; i<2; i++) { if(conn->tempsock[i] != CURL_SOCKET_BAD) { sock[s] = conn->tempsock[i]; - rc |= GETSOCK_WRITESOCK(s++); + rc |= GETSOCK_WRITESOCK(s); +#ifdef ENABLE_QUIC + if(conn->transport == TRNSPRT_QUIC) + /* when connecting QUIC, we want to read the socket too */ + rc |= GETSOCK_READSOCK(s); +#endif + s++; } } @@ -823,12 +877,8 @@ static int waitconnect_getsock(struct connectdata *conn, } static int waitproxyconnect_getsock(struct connectdata *conn, - curl_socket_t *sock, - int numsocks) + curl_socket_t *sock) { - if(!numsocks) - return GETSOCK_BLANK; - sock[0] = conn->sock[FIRSTSOCKET]; /* when we've sent a CONNECT to a proxy, we should rather wait for the @@ -840,19 +890,37 @@ static int waitproxyconnect_getsock(struct connectdata *conn, } static int domore_getsock(struct connectdata *conn, - curl_socket_t *socks, - int numsocks) + curl_socket_t *socks) { if(conn && conn->handler->domore_getsock) - return conn->handler->domore_getsock(conn, socks, numsocks); + return conn->handler->domore_getsock(conn, socks); return GETSOCK_BLANK; } -/* returns bitmapped flags for this handle and its sockets */ +static int doing_getsock(struct connectdata *conn, + curl_socket_t *socks) +{ + if(conn && conn->handler->doing_getsock) + return conn->handler->doing_getsock(conn, socks); + return GETSOCK_BLANK; +} + +static int protocol_getsock(struct connectdata *conn, + curl_socket_t *socks) +{ + if(conn->handler->proto_getsock) + return conn->handler->proto_getsock(conn, socks); + /* Backup getsock logic. Since there is a live socket in use, we must wait + for it or it will be removed from watching when the multi_socket API is + used. */ + socks[0] = conn->sock[FIRSTSOCKET]; + return GETSOCK_READSOCK(0) | GETSOCK_WRITESOCK(0); +} + +/* returns bitmapped flags for this handle and its sockets. The 'socks[]' + array contains MAX_SOCKSPEREASYHANDLE entries. */ static int multi_getsock(struct Curl_easy *data, - curl_socket_t *socks, /* points to numsocks number - of sockets */ - int numsocks) + curl_socket_t *socks) { /* The no connection case can happen when this is called from curl_multi_remove_handle() => singlesocket() => multi_getsock(). @@ -884,30 +952,30 @@ static int multi_getsock(struct Curl_easy *data, return 0; case CURLM_STATE_WAITRESOLVE: - return Curl_resolv_getsock(data->conn, socks, numsocks); + return Curl_resolv_getsock(data->conn, socks); case CURLM_STATE_PROTOCONNECT: case CURLM_STATE_SENDPROTOCONNECT: - return Curl_protocol_getsock(data->conn, socks, numsocks); + return protocol_getsock(data->conn, socks); case CURLM_STATE_DO: case CURLM_STATE_DOING: - return Curl_doing_getsock(data->conn, socks, numsocks); + return doing_getsock(data->conn, socks); case CURLM_STATE_WAITPROXYCONNECT: - return waitproxyconnect_getsock(data->conn, socks, numsocks); + return waitproxyconnect_getsock(data->conn, socks); case CURLM_STATE_WAITCONNECT: - return waitconnect_getsock(data->conn, socks, numsocks); + return waitconnect_getsock(data->conn, socks); case CURLM_STATE_DO_MORE: - return domore_getsock(data->conn, socks, numsocks); + return domore_getsock(data->conn, socks); case CURLM_STATE_DO_DONE: /* since is set after DO is completed, we switch to waiting for the same as the *PERFORM states */ case CURLM_STATE_PERFORM: - return Curl_single_getsock(data->conn, socks, numsocks); + return Curl_single_getsock(data->conn, socks); } } @@ -933,7 +1001,7 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, data = multi->easyp; while(data) { - int bitmap = multi_getsock(data, sockbunch, MAX_SOCKSPEREASYHANDLE); + int bitmap = multi_getsock(data, sockbunch); for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) { curl_socket_t s = CURL_SOCKET_BAD; @@ -963,12 +1031,13 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, #define NUM_POLLS_ON_STACK 10 -CURLMcode Curl_multi_wait(struct Curl_multi *multi, - struct curl_waitfd extra_fds[], - unsigned int extra_nfds, - int timeout_ms, - int *ret, - bool *gotsocket) /* if any socket was checked */ +static CURLMcode Curl_multi_wait(struct Curl_multi *multi, + struct curl_waitfd extra_fds[], + unsigned int extra_nfds, + int timeout_ms, + int *ret, + bool extrawait, /* when no socket, wait */ + bool use_wakeup) { struct Curl_easy *data; curl_socket_t sockbunch[MAX_SOCKSPEREASYHANDLE]; @@ -982,19 +1051,19 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; struct pollfd *ufds = &a_few_on_stack[0]; - if(gotsocket) - *gotsocket = FALSE; - if(!GOOD_MULTI_HANDLE(multi)) return CURLM_BAD_HANDLE; if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; + if(timeout_ms < 0) + return CURLM_BAD_FUNCTION_ARGUMENT; + /* Count up how many fds we have from the multi handle */ data = multi->easyp; while(data) { - bitmap = multi_getsock(data, sockbunch, MAX_SOCKSPEREASYHANDLE); + bitmap = multi_getsock(data, sockbunch); for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) { curl_socket_t s = CURL_SOCKET_BAD; @@ -1025,6 +1094,12 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, curlfds = nfds; /* number of internal file descriptors */ nfds += extra_nfds; /* add the externally provided ones */ +#ifdef ENABLE_WAKEUP + if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { + ++nfds; + } +#endif + if(nfds > NUM_POLLS_ON_STACK) { /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes big, so at 2^29 sockets this value might wrap. When a process gets @@ -1044,7 +1119,7 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, /* Add the curl handles to our pollfds first */ data = multi->easyp; while(data) { - bitmap = multi_getsock(data, sockbunch, MAX_SOCKSPEREASYHANDLE); + bitmap = multi_getsock(data, sockbunch); for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) { curl_socket_t s = CURL_SOCKET_BAD; @@ -1083,6 +1158,14 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, ++nfds; } +#ifdef ENABLE_WAKEUP + if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { + ufds[nfds].fd = multi->wakeup_pair[0]; + ufds[nfds].events = POLLIN; + ++nfds; + } +#endif + if(nfds) { int pollrc; /* wait... */ @@ -1106,6 +1189,29 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, extra_fds[i].revents = mask; } + +#ifdef ENABLE_WAKEUP + if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { + if(ufds[curlfds + extra_nfds].revents & POLLIN) { + char buf[64]; + while(1) { + /* the reading socket is non-blocking, try to read + data from it until it receives an error (except EINTR). + In normal cases it will get EAGAIN or EWOULDBLOCK + when there is no more data, breaking the loop. */ + if(sread(multi->wakeup_pair[0], buf, sizeof(buf)) <= 0) { +#ifndef USE_WINSOCK + if(EINTR == SOCKERRNO) + continue; +#endif + break; + } + } + /* do not count the wakeup socket into the returned value */ + retcode--; + } + } +#endif } } @@ -1113,9 +1219,23 @@ CURLMcode Curl_multi_wait(struct Curl_multi *multi, free(ufds); if(ret) *ret = retcode; - if(gotsocket && (extra_fds || curlfds)) + if(!extrawait || nfds) /* if any socket was checked */ - *gotsocket = TRUE; + ; + else { + long sleep_ms = 0; + + /* Avoid busy-looping when there's nothing particular to wait for */ + if(!curl_multi_timeout(multi, &sleep_ms) && sleep_ms) { + if(sleep_ms > timeout_ms) + sleep_ms = timeout_ms; + /* when there are no easy handles in the multi, this holds a -1 + timeout */ + else if((sleep_ms < 0) && extrawait) + sleep_ms = timeout_ms; + Curl_wait_ms((int)sleep_ms); + } + } return CURLM_OK; } @@ -1126,7 +1246,65 @@ CURLMcode curl_multi_wait(struct Curl_multi *multi, int timeout_ms, int *ret) { - return Curl_multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, NULL); + return Curl_multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, FALSE, + FALSE); +} + +CURLMcode curl_multi_poll(struct Curl_multi *multi, + struct curl_waitfd extra_fds[], + unsigned int extra_nfds, + int timeout_ms, + int *ret) +{ + return Curl_multi_wait(multi, extra_fds, extra_nfds, timeout_ms, ret, TRUE, + TRUE); +} + +CURLMcode curl_multi_wakeup(struct Curl_multi *multi) +{ + /* this function is usually called from another thread, + it has to be careful only to access parts of the + Curl_multi struct that are constant */ + + /* GOOD_MULTI_HANDLE can be safely called */ + if(!GOOD_MULTI_HANDLE(multi)) + return CURLM_BAD_HANDLE; + +#ifdef ENABLE_WAKEUP + /* the wakeup_pair variable is only written during init and cleanup, + making it safe to access from another thread after the init part + and before cleanup */ + if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) { + char buf[1]; + buf[0] = 1; + while(1) { + /* swrite() is not thread-safe in general, because concurrent calls + can have their messages interleaved, but in this case the content + of the messages does not matter, which makes it ok to call. + + The write socket is set to non-blocking, this way this function + cannot block, making it safe to call even from the same thread + that will call Curl_multi_wait(). If swrite() returns that it + would block, it's considered successful because it means that + previous calls to this function will wake up the poll(). */ + if(swrite(multi->wakeup_pair[1], buf, sizeof(buf)) < 0) { + int err = SOCKERRNO; + int return_success; +#ifdef USE_WINSOCK + return_success = WSAEWOULDBLOCK == err; +#else + if(EINTR == err) + continue; + return_success = EWOULDBLOCK == err || EAGAIN == err; +#endif + if(!return_success) + return CURLM_WAKEUP_FAILURE; + } + return CURLM_OK; + } + } +#endif + return CURLM_WAKEUP_FAILURE; } /* @@ -1189,6 +1367,7 @@ static CURLcode multi_do(struct Curl_easy *data, bool *done) DEBUGASSERT(conn); DEBUGASSERT(conn->handler); + DEBUGASSERT(conn->data == data); if(conn->handler->do_it) { /* generic protocol-specific function pointer set in curl_connect() */ @@ -1226,6 +1405,109 @@ static CURLcode multi_do_more(struct connectdata *conn, int *complete) return result; } +/* + * We are doing protocol-specific connecting and this is being called over and + * over from the multi interface until the connection phase is done on + * protocol layer. + */ + +static CURLcode protocol_connecting(struct connectdata *conn, + bool *done) +{ + CURLcode result = CURLE_OK; + + if(conn && conn->handler->connecting) { + *done = FALSE; + result = conn->handler->connecting(conn, done); + } + else + *done = TRUE; + + return result; +} + +/* + * We are DOING this is being called over and over from the multi interface + * until the DOING phase is done on protocol layer. + */ + +static CURLcode protocol_doing(struct connectdata *conn, bool *done) +{ + CURLcode result = CURLE_OK; + + if(conn && conn->handler->doing) { + *done = FALSE; + result = conn->handler->doing(conn, done); + } + else + *done = TRUE; + + return result; +} + +/* + * We have discovered that the TCP connection has been successful, we can now + * proceed with some action. + * + */ +static CURLcode protocol_connect(struct connectdata *conn, + bool *protocol_done) +{ + CURLcode result = CURLE_OK; + + DEBUGASSERT(conn); + DEBUGASSERT(protocol_done); + + *protocol_done = FALSE; + + if(conn->bits.tcpconnect[FIRSTSOCKET] && conn->bits.protoconnstart) { + /* We already are connected, get back. This may happen when the connect + worked fine in the first call, like when we connect to a local server + or proxy. Note that we don't know if the protocol is actually done. + + Unless this protocol doesn't have any protocol-connect callback, as + then we know we're done. */ + if(!conn->handler->connecting) + *protocol_done = TRUE; + + return CURLE_OK; + } + + if(!conn->bits.protoconnstart) { + + result = Curl_proxy_connect(conn, FIRSTSOCKET); + if(result) + return result; + + if(CONNECT_FIRSTSOCKET_PROXY_SSL()) + /* wait for HTTPS proxy SSL initialization to complete */ + return CURLE_OK; + + if(conn->bits.tunnel_proxy && conn->bits.httpproxy && + Curl_connect_ongoing(conn)) + /* when using an HTTP tunnel proxy, await complete tunnel establishment + before proceeding further. Return CURLE_OK so we'll be called again */ + return CURLE_OK; + + if(conn->handler->connect_it) { + /* is there a protocol-specific connect() procedure? */ + + /* Call the protocol-specific connect function */ + result = conn->handler->connect_it(conn, protocol_done); + } + else + *protocol_done = TRUE; + + /* it has started, possibly even completed but that knowledge isn't stored + in this bit! */ + if(!result) + conn->bits.protoconnstart = TRUE; + } + + return result; /* pass back status */ +} + + static CURLMcode multi_runsingle(struct Curl_multi *multi, struct curltime now, struct Curl_easy *data) @@ -1233,7 +1515,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, struct Curl_message *msg = NULL; bool connected; bool async; - bool protocol_connect = FALSE; + bool protocol_connected = FALSE; bool dophase_done = FALSE; bool done = FALSE; CURLMcode rc; @@ -1252,6 +1534,9 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, bool stream_error = FALSE; rc = CURLM_OK; + DEBUGASSERT((data->mstate <= CURLM_STATE_CONNECT) || + (data->mstate >= CURLM_STATE_DONE) || + data->conn); if(!data->conn && data->mstate > CURLM_STATE_CONNECT && data->mstate < CURLM_STATE_DONE) { @@ -1349,7 +1634,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(data->set.connecttimeout) Curl_expire(data, data->set.connecttimeout, EXPIRE_CONNECTTIMEOUT); - result = Curl_connect(data, &async, &protocol_connect); + result = Curl_connect(data, &async, &protocol_connected); if(CURLE_NO_CONNECTION_AVAILABLE == result) { /* There was no connection available. We will go to the pending state and wait for an available connection. */ @@ -1377,7 +1662,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, WAITDO or DO! */ rc = CURLM_CALL_MULTI_PERFORM; - if(protocol_connect) + if(protocol_connected) multistate(data, CURLM_STATE_DO); else { #ifndef CURL_DISABLE_HTTP @@ -1432,7 +1717,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(dns) { /* Perform the next step in the connection phase, and then move on to the WAITCONNECT state */ - result = Curl_once_resolved(data->conn, &protocol_connect); + result = Curl_once_resolved(data->conn, &protocol_connected); if(result) /* if Curl_once_resolved() returns failure, the connection struct @@ -1441,7 +1726,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, else { /* call again please so that we get the next socket setup */ rc = CURLM_CALL_MULTI_PERFORM; - if(protocol_connect) + if(protocol_connected) multistate(data, CURLM_STATE_DO); else { #ifndef CURL_DISABLE_HTTP @@ -1466,7 +1751,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, case CURLM_STATE_WAITPROXYCONNECT: /* this is HTTP-specific, but sending CONNECT to a proxy is HTTP... */ DEBUGASSERT(data->conn); - result = Curl_http_connect(data->conn, &protocol_connect); + result = Curl_http_connect(data->conn, &protocol_connected); if(data->conn->bits.proxy_connect_closed) { rc = CURLM_CALL_MULTI_PERFORM; @@ -1517,8 +1802,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, break; case CURLM_STATE_SENDPROTOCONNECT: - result = Curl_protocol_connect(data->conn, &protocol_connect); - if(!result && !protocol_connect) + result = protocol_connect(data->conn, &protocol_connected); + if(!result && !protocol_connected) /* switch to waiting state */ multistate(data, CURLM_STATE_PROTOCONNECT); else if(!result) { @@ -1536,8 +1821,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, case CURLM_STATE_PROTOCONNECT: /* protocol-specific connect phase */ - result = Curl_protocol_connecting(data->conn, &protocol_connect); - if(!result && protocol_connect) { + result = protocol_connecting(data->conn, &protocol_connected); + if(!result && protocol_connected) { /* after the connect has completed, go WAITDO or DO */ multistate(data, CURLM_STATE_DO); rc = CURLM_CALL_MULTI_PERFORM; @@ -1659,8 +1944,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, case CURLM_STATE_DOING: /* we continue DOING until the DO phase is complete */ DEBUGASSERT(data->conn); - result = Curl_protocol_doing(data->conn, - &dophase_done); + result = protocol_doing(data->conn, &dophase_done); if(!result) { if(dophase_done) { /* after DO, go DO_DONE or DO_MORE */ @@ -1911,8 +2195,13 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, } } } - else if(comeback) - rc = CURLM_CALL_MULTI_PERFORM; + else if(comeback) { + /* This avoids CURLM_CALL_MULTI_PERFORM so that a very fast transfer + won't get stuck on this transfer at the expense of other concurrent + transfers */ + Curl_expire(data, 0, EXPIRE_RUN_NOW); + rc = CURLM_OK; + } break; } @@ -1988,13 +2277,15 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(stream_error) { /* Don't attempt to send data over a connection that timed out */ bool dead_connection = result == CURLE_OPERATION_TIMEDOUT; - /* disconnect properly */ - Curl_disconnect(data, data->conn, dead_connection); + struct connectdata *conn = data->conn; /* This is where we make sure that the conn pointer is reset. We don't have to do this in every case block above where a failure is detected */ detach_connnection(data); + + /* disconnect properly */ + Curl_disconnect(data, conn, dead_connection); } } else if(data->mstate == CURLM_STATE_CONNECT) { @@ -2093,7 +2384,7 @@ CURLMcode curl_multi_perform(struct Curl_multi *multi, int *running_handles) *running_handles = multi->num_alive; if(CURLM_OK >= returncode) - update_timer(multi); + Curl_update_timer(multi); return returncode; } @@ -2145,6 +2436,11 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi) Curl_hash_destroy(&multi->hostcache); Curl_psl_destroy(&multi->psl); + +#ifdef ENABLE_WAKEUP + sclose(multi->wakeup_pair[0]); + sclose(multi->wakeup_pair[1]); +#endif free(multi); return CURLM_OK; @@ -2210,7 +2506,7 @@ static CURLMcode singlesocket(struct Curl_multi *multi, /* Fill in the 'current' struct with the state as it is now: what sockets to supervise and for what actions */ - curraction = multi_getsock(data, socks, MAX_SOCKSPEREASYHANDLE); + curraction = multi_getsock(data, socks); /* We have 0 .. N sockets already and we get to know about the 0 .. M sockets we should have from now on. Detect the differences, remove no @@ -2238,14 +2534,14 @@ static CURLMcode singlesocket(struct Curl_multi *multi, actions[i] = action; if(entry) { /* check if new for this transfer */ - for(i = 0; i< data->numsocks; i++) { - if(s == data->sockets[i]) { - prevaction = data->actions[i]; + int j; + for(j = 0; j< data->numsocks; j++) { + if(s == data->sockets[j]) { + prevaction = data->actions[j]; sincebefore = TRUE; break; } } - } else { /* this is a socket we didn't have before, add it to the hash! */ @@ -2273,29 +2569,22 @@ static CURLMcode singlesocket(struct Curl_multi *multi, if(action & CURL_POLL_OUT) entry->writers++; - /* add 'data' to the list of handles using this socket! */ - Curl_llist_insert_next(&entry->list, entry->list.tail, - data, &data->sh_queue); + /* add 'data' to the transfer hash on this socket! */ + if(!Curl_hash_add(&entry->transfers, (char *)&data, /* hash key */ + sizeof(struct Curl_easy *), data)) + return CURLM_OUT_OF_MEMORY; } comboaction = (entry->writers? CURL_POLL_OUT : 0) | (entry->readers ? CURL_POLL_IN : 0); -#if 0 - infof(data, "--- Comboaction: %u readers %u writers\n", - entry->readers, entry->writers); -#endif - /* check if it has the same action set */ - if(entry->action == comboaction) + /* socket existed before and has the same action set as before */ + if(sincebefore && (entry->action == comboaction)) /* same, continue */ continue; - /* we know (entry != NULL) at this point, see the logic above */ if(multi->socket_cb) - multi->socket_cb(data, - s, - comboaction, - multi->socket_userp, + multi->socket_cb(data, s, comboaction, multi->socket_userp, entry->socketp); entry->action = comboaction; /* store the current action state */ @@ -2335,11 +2624,14 @@ static CURLMcode singlesocket(struct Curl_multi *multi, multi->socket_cb(data, s, CURL_POLL_REMOVE, multi->socket_userp, entry->socketp); - sh_delentry(&multi->sockhash, s); + sh_delentry(entry, &multi->sockhash, s); } else { - /* remove this transfer as a user of this socket */ - Curl_llist_remove(&entry->list, &data->sh_queue, NULL); + /* still users, but remove this handle as a user of this socket */ + if(Curl_hash_delete(&entry->transfers, (char *)&data, + sizeof(struct Curl_easy *))) { + DEBUGASSERT(NULL); + } } } } /* for loop over numsocks */ @@ -2383,7 +2675,7 @@ void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s) entry->socketp); /* now remove it from the socket hash */ - sh_delentry(&multi->sockhash, s); + sh_delentry(entry, &multi->sockhash, s); } } } @@ -2474,7 +2766,6 @@ static CURLMcode multi_socket(struct Curl_multi *multi, return result; } if(s != CURL_SOCKET_TIMEOUT) { - struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s); if(!entry) @@ -2485,37 +2776,22 @@ static CURLMcode multi_socket(struct Curl_multi *multi, and just move on. */ ; else { - struct curl_llist *list = &entry->list; - struct curl_llist_element *e; - SIGPIPE_VARIABLE(pipe_st); + struct curl_hash_iterator iter; + struct curl_hash_element *he; /* the socket can be shared by many transfers, iterate */ - for(e = list->head; e; e = e->next) { - data = (struct Curl_easy *)e->ptr; - - if(data->magic != CURLEASY_MAGIC_NUMBER) - /* bad bad bad bad bad bad bad */ - return CURLM_INTERNAL_ERROR; + Curl_hash_start_iterate(&entry->transfers, &iter); + for(he = Curl_hash_next_element(&iter); he; + he = Curl_hash_next_element(&iter)) { + data = (struct Curl_easy *)he->ptr; + DEBUGASSERT(data); + DEBUGASSERT(data->magic == CURLEASY_MAGIC_NUMBER); if(data->conn && !(data->conn->handler->flags & PROTOPT_DIRLOCK)) /* set socket event bitmask if they're not locked */ data->conn->cselect_bits = ev_bitmask; - sigpipe_ignore(data, &pipe_st); - result = multi_runsingle(multi, now, data); - sigpipe_restore(&pipe_st); - - if(data->conn && !(data->conn->handler->flags & PROTOPT_DIRLOCK)) - /* clear the bitmask only if not locked */ - data->conn->cselect_bits = 0; - - if(CURLM_OK >= result) { - /* get the socket(s) and check if the state has been changed since - last */ - result = singlesocket(multi, data); - if(result) - return result; - } + Curl_expire(data, 0, EXPIRE_RUN_NOW); } /* Now we fall-through and do the timer-based stuff, since we don't want @@ -2530,9 +2806,10 @@ static CURLMcode multi_socket(struct Curl_multi *multi, } else { /* Asked to run due to time-out. Clear the 'lastcall' variable to force - update_timer() to trigger a callback to the app again even if the same - timeout is still the one to run after this call. That handles the case - when the application asks libcurl to run the timeout prematurely. */ + Curl_update_timer() to trigger a callback to the app again even if the + same timeout is still the one to run after this call. That handles the + case when the application asks libcurl to run the timeout + prematurely. */ memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall)); } @@ -2631,6 +2908,16 @@ CURLMcode curl_multi_setopt(struct Curl_multi *multi, break; case CURLMOPT_PIPELINING_SERVER_BL: break; + case CURLMOPT_MAX_CONCURRENT_STREAMS: + { + long streams = va_arg(param, long); + if(streams < 1) + streams = 100; + multi->max_concurrent_streams = + (streams > (long)INITIAL_MAX_CONCURRENT_STREAMS)? + INITIAL_MAX_CONCURRENT_STREAMS : (unsigned int)streams; + } + break; default: res = CURLM_UNKNOWN_OPTION; break; @@ -2650,7 +2937,7 @@ CURLMcode curl_multi_socket(struct Curl_multi *multi, curl_socket_t s, return CURLM_RECURSIVE_API_CALL; result = multi_socket(multi, FALSE, s, 0, running_handles); if(CURLM_OK >= result) - update_timer(multi); + Curl_update_timer(multi); return result; } @@ -2662,7 +2949,7 @@ CURLMcode curl_multi_socket_action(struct Curl_multi *multi, curl_socket_t s, return CURLM_RECURSIVE_API_CALL; result = multi_socket(multi, FALSE, s, ev_bitmask, running_handles); if(CURLM_OK >= result) - update_timer(multi); + Curl_update_timer(multi); return result; } @@ -2674,7 +2961,7 @@ CURLMcode curl_multi_socket_all(struct Curl_multi *multi, int *running_handles) return CURLM_RECURSIVE_API_CALL; result = multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles); if(CURLM_OK >= result) - update_timer(multi); + Curl_update_timer(multi); return result; } @@ -2734,14 +3021,14 @@ CURLMcode curl_multi_timeout(struct Curl_multi *multi, * Tell the application it should update its timers, if it subscribes to the * update timer callback. */ -static int update_timer(struct Curl_multi *multi) +void Curl_update_timer(struct Curl_multi *multi) { long timeout_ms; if(!multi->timer_cb) - return 0; + return; if(multi_timeout(multi, &timeout_ms)) { - return -1; + return; } if(timeout_ms < 0) { static const struct curltime none = {0, 0}; @@ -2749,9 +3036,10 @@ static int update_timer(struct Curl_multi *multi) multi->timer_lastcall = none; /* there's no timeout now but there was one previously, tell the app to disable it */ - return multi->timer_cb(multi, -1, multi->timer_userp); + multi->timer_cb(multi, -1, multi->timer_userp); + return; } - return 0; + return; } /* When multi_timeout() is done, multi->timetree points to the node with the @@ -2759,11 +3047,11 @@ static int update_timer(struct Curl_multi *multi) * if this is the same (fixed) time as we got in a previous call and then * avoid calling the callback again. */ if(Curl_splaycomparekeys(multi->timetree->key, multi->timer_lastcall) == 0) - return 0; + return; multi->timer_lastcall = multi->timetree->key; - return multi->timer_cb(multi, timeout_ms, multi->timer_userp); + multi->timer_cb(multi, timeout_ms, multi->timer_userp); } /* @@ -2840,7 +3128,7 @@ multi_addtimeout(struct Curl_easy *data, * * Expire replaces a former timeout using the same id if already set. */ -void Curl_expire(struct Curl_easy *data, time_t milli, expire_id id) +void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) { struct Curl_multi *multi = data->multi; struct curltime *nowp = &data->state.expiretime; @@ -2854,7 +3142,7 @@ void Curl_expire(struct Curl_easy *data, time_t milli, expire_id id) DEBUGASSERT(id < EXPIRE_LAST); set = Curl_now(); - set.tv_sec += milli/1000; + set.tv_sec += (time_t)(milli/1000); /* might be a 64 to 32 bit conversion */ set.tv_usec += (unsigned int)(milli%1000)*1000; if(set.tv_usec >= 1000000) { @@ -3068,3 +3356,9 @@ void Curl_multi_dump(struct Curl_multi *multi) } } #endif + +unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi) +{ + DEBUGASSERT(multi); + return multi->max_concurrent_streams; +} |