diff options
Diffstat (limited to 'lib/cf-h2-proxy.c')
-rw-r--r-- | lib/cf-h2-proxy.c | 531 |
1 files changed, 329 insertions, 202 deletions
diff --git a/lib/cf-h2-proxy.c b/lib/cf-h2-proxy.c index 8e76ff8..f6acfc5 100644 --- a/lib/cf-h2-proxy.c +++ b/lib/cf-h2-proxy.c @@ -44,26 +44,25 @@ #include "curl_memory.h" #include "memdebug.h" -#define H2_NW_CHUNK_SIZE (128*1024) -#define H2_NW_RECV_CHUNKS 1 -#define H2_NW_SEND_CHUNKS 1 +#define H2_CHUNK_SIZE (16*1024) -#define HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */ +#define PROXY_HTTP2_HUGE_WINDOW_SIZE (100 * 1024 * 1024) +#define H2_TUNNEL_WINDOW_SIZE (10 * 1024 * 1024) + +#define PROXY_H2_NW_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE) +#define PROXY_H2_NW_SEND_CHUNKS 1 + +#define H2_TUNNEL_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE) +#define H2_TUNNEL_SEND_CHUNKS ((128 * 1024) / H2_CHUNK_SIZE) -#define H2_TUNNEL_WINDOW_SIZE (1024 * 1024) -#define H2_TUNNEL_CHUNK_SIZE (32 * 1024) -#define H2_TUNNEL_RECV_CHUNKS \ - (H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE) -#define H2_TUNNEL_SEND_CHUNKS \ - (H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE) typedef enum { - TUNNEL_INIT, /* init/default/no tunnel state */ - TUNNEL_CONNECT, /* CONNECT request is being send */ - TUNNEL_RESPONSE, /* CONNECT response received completely */ - TUNNEL_ESTABLISHED, - TUNNEL_FAILED -} tunnel_state; + H2_TUNNEL_INIT, /* init/default/no tunnel state */ + H2_TUNNEL_CONNECT, /* CONNECT request is being send */ + H2_TUNNEL_RESPONSE, /* CONNECT response received completely */ + H2_TUNNEL_ESTABLISHED, + H2_TUNNEL_FAILED +} h2_tunnel_state; struct tunnel_stream { struct http_resp *resp; @@ -72,10 +71,11 @@ struct tunnel_stream { char *authority; int32_t stream_id; uint32_t error; - tunnel_state state; - bool has_final_response; - bool closed; - bool reset; + size_t upload_blocked_len; + h2_tunnel_state state; + BIT(has_final_response); + BIT(closed); + BIT(reset); }; static CURLcode tunnel_stream_init(struct Curl_cfilter *cf, @@ -85,11 +85,11 @@ static CURLcode tunnel_stream_init(struct Curl_cfilter *cf, int port; bool ipv6_ip = cf->conn->bits.ipv6_ip; - ts->state = TUNNEL_INIT; + ts->state = H2_TUNNEL_INIT; ts->stream_id = -1; - Curl_bufq_init2(&ts->recvbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS, + Curl_bufq_init2(&ts->recvbuf, H2_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT); - Curl_bufq_init(&ts->sendbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS); + Curl_bufq_init(&ts->sendbuf, H2_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS); if(cf->conn->bits.conn_to_host) hostname = cf->conn->conn_to_host.name; @@ -123,13 +123,13 @@ static void tunnel_stream_clear(struct tunnel_stream *ts) Curl_bufq_free(&ts->sendbuf); Curl_safefree(ts->authority); memset(ts, 0, sizeof(*ts)); - ts->state = TUNNEL_INIT; + ts->state = H2_TUNNEL_INIT; } -static void tunnel_go_state(struct Curl_cfilter *cf, - struct tunnel_stream *ts, - tunnel_state new_state, - struct Curl_easy *data) +static void h2_tunnel_go_state(struct Curl_cfilter *cf, + struct tunnel_stream *ts, + h2_tunnel_state new_state, + struct Curl_easy *data) { (void)cf; @@ -137,7 +137,7 @@ static void tunnel_go_state(struct Curl_cfilter *cf, return; /* leaving this one */ switch(ts->state) { - case TUNNEL_CONNECT: + case H2_TUNNEL_CONNECT: data->req.ignorebody = FALSE; break; default: @@ -145,29 +145,29 @@ static void tunnel_go_state(struct Curl_cfilter *cf, } /* entering this one */ switch(new_state) { - case TUNNEL_INIT: + case H2_TUNNEL_INIT: DEBUGF(LOG_CF(data, cf, "new tunnel state 'init'")); tunnel_stream_clear(ts); break; - case TUNNEL_CONNECT: + case H2_TUNNEL_CONNECT: DEBUGF(LOG_CF(data, cf, "new tunnel state 'connect'")); - ts->state = TUNNEL_CONNECT; + ts->state = H2_TUNNEL_CONNECT; break; - case TUNNEL_RESPONSE: + case H2_TUNNEL_RESPONSE: DEBUGF(LOG_CF(data, cf, "new tunnel state 'response'")); - ts->state = TUNNEL_RESPONSE; + ts->state = H2_TUNNEL_RESPONSE; break; - case TUNNEL_ESTABLISHED: + case H2_TUNNEL_ESTABLISHED: DEBUGF(LOG_CF(data, cf, "new tunnel state 'established'")); infof(data, "CONNECT phase completed"); data->state.authproxy.done = TRUE; data->state.authproxy.multipass = FALSE; /* FALLTHROUGH */ - case TUNNEL_FAILED: - if(new_state == TUNNEL_FAILED) + case H2_TUNNEL_FAILED: + if(new_state == H2_TUNNEL_FAILED) DEBUGF(LOG_CF(data, cf, "new tunnel state 'failed'")); ts->state = new_state; /* If a proxy-authorization header was used for the proxy, then we should @@ -191,9 +191,11 @@ struct cf_h2_proxy_ctx { int32_t last_stream_id; BIT(conn_closed); BIT(goaway); + BIT(nw_out_blocked); }; /* How to access `call_data` from a cf_h2 filter */ +#undef CF_CTX_CALL_DATA #define CF_CTX_CALL_DATA(cf) \ ((struct cf_h2_proxy_ctx *)(cf)->ctx)->call_data @@ -219,35 +221,54 @@ static void cf_h2_proxy_ctx_free(struct cf_h2_proxy_ctx *ctx) } } -static ssize_t nw_in_reader(void *reader_ctx, - unsigned char *buf, size_t buflen, - CURLcode *err) +static void drain_tunnel(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct tunnel_stream *tunnel) +{ + unsigned char bits; + + (void)cf; + bits = CURL_CSELECT_IN; + if(!tunnel->closed && !tunnel->reset && tunnel->upload_blocked_len) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + DEBUGF(LOG_CF(data, cf, "[h2sid=%d] DRAIN dselect_bits=%x", + tunnel->stream_id, bits)); + data->state.dselect_bits = bits; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } +} + +static ssize_t proxy_nw_in_reader(void *reader_ctx, + unsigned char *buf, size_t buflen, + CURLcode *err) { struct Curl_cfilter *cf = reader_ctx; struct Curl_easy *data = CF_DATA_CURRENT(cf); ssize_t nread; nread = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err); - DEBUGF(LOG_CF(data, cf, "nw_in recv(len=%zu) -> %zd, %d", + DEBUGF(LOG_CF(data, cf, "nw_in_reader(len=%zu) -> %zd, %d", buflen, nread, *err)); return nread; } -static ssize_t nw_out_writer(void *writer_ctx, - const unsigned char *buf, size_t buflen, - CURLcode *err) +static ssize_t proxy_h2_nw_out_writer(void *writer_ctx, + const unsigned char *buf, size_t buflen, + CURLcode *err) { struct Curl_cfilter *cf = writer_ctx; struct Curl_easy *data = CF_DATA_CURRENT(cf); ssize_t nwritten; nwritten = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen, err); - DEBUGF(LOG_CF(data, cf, "nw_out send(len=%zu) -> %zd", buflen, nwritten)); + DEBUGF(LOG_CF(data, cf, "nw_out_writer(len=%zu) -> %zd, %d", + buflen, nwritten, *err)); return nwritten; } -static int h2_client_new(struct Curl_cfilter *cf, - nghttp2_session_callbacks *cbs) +static int proxy_h2_client_new(struct Curl_cfilter *cf, + nghttp2_session_callbacks *cbs) { struct cf_h2_proxy_ctx *ctx = cf->ctx; nghttp2_option *o; @@ -271,15 +292,18 @@ static int h2_client_new(struct Curl_cfilter *cf, static ssize_t on_session_send(nghttp2_session *h2, const uint8_t *buf, size_t blen, int flags, void *userp); -static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, - void *userp); -static int on_stream_close(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *userp); -static int on_header(nghttp2_session *session, const nghttp2_frame *frame, - const uint8_t *name, size_t namelen, - const uint8_t *value, size_t valuelen, - uint8_t flags, - void *userp); +static int proxy_h2_on_frame_recv(nghttp2_session *session, + const nghttp2_frame *frame, + void *userp); +static int proxy_h2_on_stream_close(nghttp2_session *session, + int32_t stream_id, + uint32_t error_code, void *userp); +static int proxy_h2_on_header(nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, + void *userp); static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *mem, size_t len, void *userp); @@ -298,8 +322,8 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, DEBUGASSERT(!ctx->h2); memset(&ctx->tunnel, 0, sizeof(ctx->tunnel)); - Curl_bufq_init(&ctx->inbufq, H2_NW_CHUNK_SIZE, H2_NW_RECV_CHUNKS); - Curl_bufq_init(&ctx->outbufq, H2_NW_CHUNK_SIZE, H2_NW_SEND_CHUNKS); + Curl_bufq_init(&ctx->inbufq, H2_CHUNK_SIZE, PROXY_H2_NW_RECV_CHUNKS); + Curl_bufq_init(&ctx->outbufq, H2_CHUNK_SIZE, PROXY_H2_NW_SEND_CHUNKS); if(tunnel_stream_init(cf, &ctx->tunnel)) goto out; @@ -311,14 +335,16 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, } nghttp2_session_callbacks_set_send_callback(cbs, on_session_send); - nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv); + nghttp2_session_callbacks_set_on_frame_recv_callback( + cbs, proxy_h2_on_frame_recv); nghttp2_session_callbacks_set_on_data_chunk_recv_callback( cbs, tunnel_recv_callback); - nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close); - nghttp2_session_callbacks_set_on_header_callback(cbs, on_header); + nghttp2_session_callbacks_set_on_stream_close_callback( + cbs, proxy_h2_on_stream_close); + nghttp2_session_callbacks_set_on_header_callback(cbs, proxy_h2_on_header); /* The nghttp2 session is not yet setup, do it */ - rc = h2_client_new(cf, cbs); + rc = proxy_h2_client_new(cf, cbs); if(rc) { failf(data, "Couldn't initialize nghttp2"); goto out; @@ -343,7 +369,7 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, } rc = nghttp2_session_set_local_window_size(ctx->h2, NGHTTP2_FLAG_NONE, 0, - HTTP2_HUGE_WINDOW_SIZE); + PROXY_HTTP2_HUGE_WINDOW_SIZE); if(rc) { failf(data, "nghttp2_session_set_local_window_size() failed: %s(%d)", nghttp2_strerror(rc), rc); @@ -362,27 +388,35 @@ out: return result; } -static CURLcode nw_out_flush(struct Curl_cfilter *cf, - struct Curl_easy *data) +static int should_close_session(struct cf_h2_proxy_ctx *ctx) +{ + return !nghttp2_session_want_read(ctx->h2) && + !nghttp2_session_want_write(ctx->h2); +} + +static CURLcode proxy_h2_nw_out_flush(struct Curl_cfilter *cf, + struct Curl_easy *data) { struct cf_h2_proxy_ctx *ctx = cf->ctx; - size_t buflen = Curl_bufq_len(&ctx->outbufq); ssize_t nwritten; CURLcode result; (void)data; - if(!buflen) + if(Curl_bufq_is_empty(&ctx->outbufq)) return CURLE_OK; - DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen)); - nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result); + nwritten = Curl_bufq_pass(&ctx->outbufq, proxy_h2_nw_out_writer, cf, + &result); if(nwritten < 0) { + if(result == CURLE_AGAIN) { + DEBUGF(LOG_CF(data, cf, "flush nw send buffer(%zu) -> EAGAIN", + Curl_bufq_len(&ctx->outbufq))); + ctx->nw_out_blocked = 1; + } return result; } - if((size_t)nwritten < buflen) { - return CURLE_AGAIN; - } - return CURLE_OK; + DEBUGF(LOG_CF(data, cf, "nw send buffer flushed")); + return Curl_bufq_is_empty(&ctx->outbufq)? CURLE_OK: CURLE_AGAIN; } /* @@ -390,9 +424,9 @@ static CURLcode nw_out_flush(struct Curl_cfilter *cf, * This function returns 0 if it succeeds, or -1 and error code will * be assigned to *err. */ -static int h2_process_pending_input(struct Curl_cfilter *cf, - struct Curl_easy *data, - CURLcode *err) +static int proxy_h2_process_pending_input(struct Curl_cfilter *cf, + struct Curl_easy *data, + CURLcode *err) { struct cf_h2_proxy_ctx *ctx = cf->ctx; const unsigned char *buf; @@ -422,19 +456,11 @@ static int h2_process_pending_input(struct Curl_cfilter *cf, } } - if(nghttp2_session_check_request_allowed(ctx->h2) == 0) { - /* No more requests are allowed in the current session, so - the connection may not be reused. This is set when a - GOAWAY frame has been received or when the limit of stream - identifiers has been reached. */ - connclose(cf->conn, "http/2: No new requests allowed"); - } - return 0; } -static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, - struct Curl_easy *data) +static CURLcode proxy_h2_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data) { struct cf_h2_proxy_ctx *ctx = cf->ctx; CURLcode result = CURLE_OK; @@ -442,9 +468,9 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, /* Process network input buffer fist */ if(!Curl_bufq_is_empty(&ctx->inbufq)) { - DEBUGF(LOG_CF(data, cf, "Process %zd bytes in connection buffer", + DEBUGF(LOG_CF(data, cf, "Process %zu bytes in connection buffer", Curl_bufq_len(&ctx->inbufq))); - if(h2_process_pending_input(cf, data, &result) < 0) + if(proxy_h2_process_pending_input(cf, data, &result) < 0) return result; } @@ -455,8 +481,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */ !Curl_bufq_is_full(&ctx->tunnel.recvbuf)) { - nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result); - DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d", + nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result); + DEBUGF(LOG_CF(data, cf, "read %zu bytes nw data -> %zd, %d", Curl_bufq_len(&ctx->inbufq), nread, result)); if(nread < 0) { if(result != CURLE_AGAIN) { @@ -470,7 +496,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, break; } - if(h2_process_pending_input(cf, data, &result)) + if(proxy_h2_process_pending_input(cf, data, &result)) return result; } @@ -481,25 +507,22 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, return CURLE_OK; } -/* - * Check if there's been an update in the priority / - * dependency settings and if so it submits a PRIORITY frame with the updated - * info. - * Flush any out data pending in the network buffer. - */ -static CURLcode h2_progress_egress(struct Curl_cfilter *cf, - struct Curl_easy *data) +static CURLcode proxy_h2_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data) { struct cf_h2_proxy_ctx *ctx = cf->ctx; int rv = 0; - rv = nghttp2_session_send(ctx->h2); + ctx->nw_out_blocked = 0; + while(!rv && !ctx->nw_out_blocked && nghttp2_session_want_write(ctx->h2)) + rv = nghttp2_session_send(ctx->h2); + if(nghttp2_is_fatal(rv)) { DEBUGF(LOG_CF(data, cf, "nghttp2_session_send error (%s)%d", nghttp2_strerror(rv), rv)); return CURLE_SEND_ERROR; } - return nw_out_flush(cf, data); + return proxy_h2_nw_out_flush(cf, data); } static ssize_t on_session_send(nghttp2_session *h2, @@ -517,7 +540,7 @@ static ssize_t on_session_send(nghttp2_session *h2, DEBUGASSERT(data); nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen, - nw_out_writer, cf, &result); + proxy_h2_nw_out_writer, cf, &result); if(nwritten < 0) { if(result == CURLE_AGAIN) { return NGHTTP2_ERR_WOULDBLOCK; @@ -532,8 +555,9 @@ static ssize_t on_session_send(nghttp2_session *h2, return nwritten; } -static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, - void *userp) +static int proxy_h2_on_frame_recv(nghttp2_session *session, + const nghttp2_frame *frame, + void *userp) { struct Curl_cfilter *cf = userp; struct cf_h2_proxy_ctx *ctx = cf->ctx; @@ -616,11 +640,12 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, return 0; } -static int on_header(nghttp2_session *session, const nghttp2_frame *frame, - const uint8_t *name, size_t namelen, - const uint8_t *value, size_t valuelen, - uint8_t flags, - void *userp) +static int proxy_h2_on_header(nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, + void *userp) { struct Curl_cfilter *cf = userp; struct cf_h2_proxy_ctx *ctx = cf->ctx; @@ -752,8 +777,9 @@ static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags, return 0; } -static int on_stream_close(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *userp) +static int proxy_h2_on_stream_close(nghttp2_session *session, + int32_t stream_id, + uint32_t error_code, void *userp) { struct Curl_cfilter *cf = userp; struct cf_h2_proxy_ctx *ctx = cf->ctx; @@ -765,7 +791,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, if(stream_id != ctx->tunnel.stream_id) return 0; - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] on_stream_close, %s (err %d)", + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] proxy_h2_on_stream_close, %s (err %d)", stream_id, nghttp2_http2_strerror(error_code), error_code)); ctx->tunnel.closed = TRUE; ctx->tunnel.error = error_code; @@ -773,15 +799,15 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, return 0; } -static CURLcode h2_submit(int32_t *pstream_id, - struct Curl_cfilter *cf, - struct Curl_easy *data, - nghttp2_session *h2, - struct httpreq *req, - const nghttp2_priority_spec *pri_spec, - void *stream_user_data, - nghttp2_data_source_read_callback read_callback, - void *read_ctx) +static CURLcode proxy_h2_submit(int32_t *pstream_id, + struct Curl_cfilter *cf, + struct Curl_easy *data, + nghttp2_session *h2, + struct httpreq *req, + const nghttp2_priority_spec *pri_spec, + void *stream_user_data, + nghttp2_data_source_read_callback read_callback, + void *read_ctx) { struct dynhds h2_headers; nghttp2_nv *nva = NULL; @@ -881,8 +907,8 @@ static CURLcode submit_CONNECT(struct Curl_cfilter *cf, if(result) goto out; - result = h2_submit(&ts->stream_id, cf, data, ctx->h2, req, - NULL, ts, tunnel_send_callback, cf); + result = proxy_h2_submit(&ts->stream_id, cf, data, ctx->h2, req, + NULL, ts, tunnel_send_callback, cf); if(result) { DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u", nghttp2_strerror(ts->stream_id), ts->stream_id)); @@ -907,7 +933,7 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, DEBUGASSERT(ts->resp); if(ts->resp->status/100 == 2) { infof(data, "CONNECT tunnel established, response %d", ts->resp->status); - tunnel_go_state(cf, ts, TUNNEL_ESTABLISHED, data); + h2_tunnel_go_state(cf, ts, H2_TUNNEL_ESTABLISHED, data); return CURLE_OK; } @@ -928,7 +954,7 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, if(data->req.newurl) { /* Inidicator that we should try again */ Curl_safefree(data->req.newurl); - tunnel_go_state(cf, ts, TUNNEL_INIT, data); + h2_tunnel_go_state(cf, ts, H2_TUNNEL_INIT, data); return CURLE_OK; } } @@ -937,9 +963,9 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, return CURLE_RECV_ERROR; } -static CURLcode CONNECT(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct tunnel_stream *ts) +static CURLcode H2_CONNECT(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct tunnel_stream *ts) { struct cf_h2_proxy_ctx *ctx = cf->ctx; CURLcode result = CURLE_OK; @@ -948,27 +974,27 @@ static CURLcode CONNECT(struct Curl_cfilter *cf, DEBUGASSERT(ts->authority); do { switch(ts->state) { - case TUNNEL_INIT: + case H2_TUNNEL_INIT: /* Prepare the CONNECT request and make a first attempt to send. */ DEBUGF(LOG_CF(data, cf, "CONNECT start for %s", ts->authority)); result = submit_CONNECT(cf, data, ts); if(result) goto out; - tunnel_go_state(cf, ts, TUNNEL_CONNECT, data); + h2_tunnel_go_state(cf, ts, H2_TUNNEL_CONNECT, data); /* FALLTHROUGH */ - case TUNNEL_CONNECT: + case H2_TUNNEL_CONNECT: /* see that the request is completely sent */ - result = h2_progress_ingress(cf, data); + result = proxy_h2_progress_ingress(cf, data); if(!result) - result = h2_progress_egress(cf, data); - if(result) { - tunnel_go_state(cf, ts, TUNNEL_FAILED, data); + result = proxy_h2_progress_egress(cf, data); + if(result && result != CURLE_AGAIN) { + h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data); break; } if(ts->has_final_response) { - tunnel_go_state(cf, ts, TUNNEL_RESPONSE, data); + h2_tunnel_go_state(cf, ts, H2_TUNNEL_RESPONSE, data); } else { result = CURLE_OK; @@ -976,28 +1002,28 @@ static CURLcode CONNECT(struct Curl_cfilter *cf, } /* FALLTHROUGH */ - case TUNNEL_RESPONSE: + case H2_TUNNEL_RESPONSE: DEBUGASSERT(ts->has_final_response); result = inspect_response(cf, data, ts); if(result) goto out; break; - case TUNNEL_ESTABLISHED: + case H2_TUNNEL_ESTABLISHED: return CURLE_OK; - case TUNNEL_FAILED: + case H2_TUNNEL_FAILED: return CURLE_RECV_ERROR; default: break; } - } while(ts->state == TUNNEL_INIT); + } while(ts->state == H2_TUNNEL_INIT); out: if(result || ctx->tunnel.closed) - tunnel_go_state(cf, ts, TUNNEL_FAILED, data); + h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data); return result; } @@ -1043,10 +1069,10 @@ static CURLcode cf_h2_proxy_connect(struct Curl_cfilter *cf, /* for the secondary socket (FTP), use the "connect to host" * but ignore the "connect to port" (use the secondary port) */ - result = CONNECT(cf, data, ts); + result = H2_CONNECT(cf, data, ts); out: - *done = (result == CURLE_OK) && (ts->state == TUNNEL_ESTABLISHED); + *done = (result == CURLE_OK) && (ts->state == H2_TUNNEL_ESTABLISHED); cf->connected = *done; CF_DATA_RESTORE(cf, save); return result; @@ -1082,7 +1108,7 @@ static bool cf_h2_proxy_data_pending(struct Curl_cfilter *cf, { struct cf_h2_proxy_ctx *ctx = cf->ctx; if((ctx && !Curl_bufq_is_empty(&ctx->inbufq)) || - (ctx && ctx->tunnel.state == TUNNEL_ESTABLISHED && + (ctx && ctx->tunnel.state == H2_TUNNEL_ESTABLISHED && !Curl_bufq_is_empty(&ctx->tunnel.recvbuf))) return TRUE; return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE; @@ -1188,14 +1214,14 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf, struct cf_call_data save; CURLcode result; - if(ctx->tunnel.state != TUNNEL_ESTABLISHED) { + if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) { *err = CURLE_RECV_ERROR; return -1; } CF_DATA_SAVE(save, cf, data); if(Curl_bufq_is_empty(&ctx->tunnel.recvbuf)) { - *err = h2_progress_ingress(cf, data); + *err = proxy_h2_progress_ingress(cf, data); if(*err) goto out; } @@ -1208,13 +1234,19 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf, nghttp2_session_consume(ctx->h2, ctx->tunnel.stream_id, (size_t)nread); } - result = h2_progress_egress(cf, data); - if(result) { + result = proxy_h2_progress_egress(cf, data); + if(result && result != CURLE_AGAIN) { *err = result; nread = -1; } out: + if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf) && + (nread >= 0 || *err == CURLE_AGAIN)) { + /* data pending and no fatal error to report. Need to trigger + * draining to avoid stalling when no socket events happen. */ + drain_tunnel(cf, data, &ctx->tunnel); + } DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv(len=%zu) -> %zd %d", ctx->tunnel.stream_id, len, nread, *err)); CF_DATA_RESTORE(cf, save); @@ -1223,93 +1255,188 @@ out: static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf, struct Curl_easy *data, - const void *mem, size_t len, CURLcode *err) + const void *buf, size_t len, CURLcode *err) { struct cf_h2_proxy_ctx *ctx = cf->ctx; struct cf_call_data save; - ssize_t nwritten = -1; - const unsigned char *buf = mem; - size_t start_len = len; int rv; + ssize_t nwritten; + CURLcode result; + int blocked = 0; - if(ctx->tunnel.state != TUNNEL_ESTABLISHED) { + if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) { *err = CURLE_SEND_ERROR; return -1; } CF_DATA_SAVE(save, cf, data); - while(len) { + if(ctx->tunnel.closed) { + nwritten = -1; + *err = CURLE_SEND_ERROR; + goto out; + } + else if(ctx->tunnel.upload_blocked_len) { + /* the data in `buf` has alread been submitted or added to the + * buffers, but have been EAGAINed on the last invocation. */ + DEBUGASSERT(len >= ctx->tunnel.upload_blocked_len); + if(len < ctx->tunnel.upload_blocked_len) { + /* Did we get called again with a smaller `len`? This should not + * happend. We are not prepared to handle that. */ + failf(data, "HTTP/2 proxy, send again with decreased length"); + *err = CURLE_HTTP2; + nwritten = -1; + goto out; + } + nwritten = (ssize_t)ctx->tunnel.upload_blocked_len; + ctx->tunnel.upload_blocked_len = 0; + } + else { nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err); - if(nwritten <= 0) { - if(*err && *err != CURLE_AGAIN) { - DEBUGF(LOG_CF(data, cf, "error adding data to tunnel sendbuf: %d", - *err)); - nwritten = -1; + if(nwritten < 0) { + if(*err != CURLE_AGAIN) goto out; - } - /* blocked */ nwritten = 0; } - else { - DEBUGASSERT((size_t)nwritten <= len); - buf += (size_t)nwritten; - len -= (size_t)nwritten; - } + } - /* resume the tunnel stream and let the h2 session send, which - * triggers reading from tunnel.sendbuf */ + if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) { + /* req body data is buffered, resume the potentially suspended stream */ rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id); if(nghttp2_is_fatal(rv)) { *err = CURLE_SEND_ERROR; nwritten = -1; goto out; } - *err = h2_progress_egress(cf, data); - if(*err) { - nwritten = -1; - goto out; - } - - if(!nwritten && Curl_bufq_is_full(&ctx->tunnel.sendbuf)) { - size_t rwin; - /* we could not add to the buffer and after session processing, - * it is still full. */ - rwin = nghttp2_session_get_stream_remote_window_size( - ctx->h2, ctx->tunnel.stream_id); - DEBUGF(LOG_CF(data, cf, "cf_send: tunnel win %u/%zu", - nghttp2_session_get_remote_window_size(ctx->h2), rwin)); - if(rwin == 0) { - /* We cannot upload more as the stream's remote window size - * is 0. We need to receive WIN_UPDATEs before we can continue. - */ - data->req.keepon |= KEEP_SEND_HOLD; - DEBUGF(LOG_CF(data, cf, "pausing send as remote flow " - "window is exhausted")); - } - break; - } } - nwritten = start_len - len; - if(nwritten > 0) { - *err = CURLE_OK; + /* Call the nghttp2 send loop and flush to write ALL buffered data, + * headers and/or request body completely out to the network */ + result = proxy_h2_progress_egress(cf, data); + if(result == CURLE_AGAIN) { + blocked = 1; } - else if(ctx->tunnel.closed) { + else if(result) { + *err = result; nwritten = -1; - *err = CURLE_SEND_ERROR; + goto out; } - else { - nwritten = -1; + else if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) { + /* although we wrote everything that nghttp2 wants to send now, + * there is data left in our stream send buffer unwritten. This may + * be due to the stream's HTTP/2 flow window being exhausted. */ + blocked = 1; + } + + if(blocked) { + /* Unable to send all data, due to connection blocked or H2 window + * exhaustion. Data is left in our stream buffer, or nghttp2's internal + * frame buffer or our network out buffer. */ + size_t rwin = nghttp2_session_get_stream_remote_window_size( + ctx->h2, ctx->tunnel.stream_id); + if(rwin == 0) { + /* H2 flow window exhaustion. + * FIXME: there is no way to HOLD all transfers that use this + * proxy connection AND to UNHOLD all of them again when the + * window increases. + * We *could* iterate over all data on this conn maybe? */ + DEBUGF(LOG_CF(data, cf, "[h2sid=%d] remote flow " + "window is exhausted", ctx->tunnel.stream_id)); + } + + /* Whatever the cause, we need to return CURL_EAGAIN for this call. + * We have unwritten state that needs us being invoked again and EAGAIN + * is the only way to ensure that. */ + ctx->tunnel.upload_blocked_len = nwritten; + DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) BLOCK: win %u/%zu " + "blocked_len=%zu", + ctx->tunnel.stream_id, len, + nghttp2_session_get_remote_window_size(ctx->h2), rwin, + nwritten)); *err = CURLE_AGAIN; + nwritten = -1; + goto out; + } + else if(should_close_session(ctx)) { + /* nghttp2 thinks this session is done. If the stream has not been + * closed, this is an error state for out transfer */ + if(ctx->tunnel.closed) { + *err = CURLE_SEND_ERROR; + nwritten = -1; + } + else { + DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); + *err = CURLE_HTTP2; + nwritten = -1; + } } out: - DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) -> %zd, %d ", - start_len, nwritten, *err)); + DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) -> %zd, %d, " + "h2 windows %d-%d (stream-conn), " + "buffers %zu-%zu (stream-conn)", + ctx->tunnel.stream_id, len, nwritten, *err, + nghttp2_session_get_stream_remote_window_size( + ctx->h2, ctx->tunnel.stream_id), + nghttp2_session_get_remote_window_size(ctx->h2), + Curl_bufq_len(&ctx->tunnel.sendbuf), + Curl_bufq_len(&ctx->outbufq))); CF_DATA_RESTORE(cf, save); return nwritten; } +static bool proxy_h2_connisalive(struct Curl_cfilter *cf, + struct Curl_easy *data, + bool *input_pending) +{ + struct cf_h2_proxy_ctx *ctx = cf->ctx; + bool alive = TRUE; + + *input_pending = FALSE; + if(!cf->next || !cf->next->cft->is_alive(cf->next, data, input_pending)) + return FALSE; + + if(*input_pending) { + /* This happens before we've sent off a request and the connection is + not in use by any other transfer, there shouldn't be any data here, + only "protocol frames" */ + CURLcode result; + ssize_t nread = -1; + + *input_pending = FALSE; + nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result); + if(nread != -1) { + if(proxy_h2_process_pending_input(cf, data, &result) < 0) + /* immediate error, considered dead */ + alive = FALSE; + else { + alive = !should_close_session(ctx); + } + } + else if(result != CURLE_AGAIN) { + /* the read failed so let's say this is dead anyway */ + alive = FALSE; + } + } + + return alive; +} + +static bool cf_h2_proxy_is_alive(struct Curl_cfilter *cf, + struct Curl_easy *data, + bool *input_pending) +{ + struct cf_h2_proxy_ctx *ctx = cf->ctx; + CURLcode result; + struct cf_call_data save; + + CF_DATA_SAVE(save, cf, data); + result = (ctx && ctx->h2 && proxy_h2_connisalive(cf, data, input_pending)); + DEBUGF(LOG_CF(data, cf, "conn alive -> %d, input_pending=%d", + result, *input_pending)); + CF_DATA_RESTORE(cf, save); + return result; +} + struct Curl_cftype Curl_cft_h2_proxy = { "H2-PROXY", CF_TYPE_IP_CONNECT, @@ -1323,7 +1450,7 @@ struct Curl_cftype Curl_cft_h2_proxy = { cf_h2_proxy_send, cf_h2_proxy_recv, Curl_cf_def_cntrl, - Curl_cf_def_conn_is_alive, + cf_h2_proxy_is_alive, Curl_cf_def_conn_keep_alive, Curl_cf_def_query, }; |