diff options
Diffstat (limited to 'Utilities/cmcurl/lib/vquic/curl_ngtcp2.c')
-rw-r--r-- | Utilities/cmcurl/lib/vquic/curl_ngtcp2.c | 429 |
1 files changed, 263 insertions, 166 deletions
diff --git a/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c b/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c index 7627940..a430aa1 100644 --- a/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c +++ b/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c @@ -33,7 +33,7 @@ #ifdef OPENSSL_IS_BORINGSSL #include <ngtcp2/ngtcp2_crypto_boringssl.h> #else -#include <ngtcp2/ngtcp2_crypto_openssl.h> +#include <ngtcp2/ngtcp2_crypto_quictls.h> #endif #include "vtls/openssl.h" #elif defined(USE_GNUTLS) @@ -165,17 +165,19 @@ struct cf_ngtcp2_ctx { }; /* How to access `call_data` from a cf_ngtcp2 filter */ +#undef CF_CTX_CALL_DATA #define CF_CTX_CALL_DATA(cf) \ ((struct cf_ngtcp2_ctx *)(cf)->ctx)->call_data /** * All about the H3 internals of a stream */ -struct stream_ctx { +struct h3_stream_ctx { int64_t id; /* HTTP/3 protocol identifier */ struct bufq sendbuf; /* h3 request body */ struct bufq recvbuf; /* h3 response body */ size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */ + size_t upload_blocked_len; /* the amount written last and EGAINed */ size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */ uint64_t error3; /* HTTP/3 stream error code */ curl_off_t upload_left; /* number of request bytes left to upload */ @@ -186,18 +188,18 @@ struct stream_ctx { bool send_closed; /* stream is local closed */ }; -#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \ - ((struct HTTP *)(d)->req.p.http)->h3_ctx \ - : NULL)) -#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx -#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \ - H3_STREAM_CTX(d)->id : -2) +#define H3_STREAM_CTX(d) ((struct h3_stream_ctx *)(((d) && (d)->req.p.http)? \ + ((struct HTTP *)(d)->req.p.http)->h3_ctx \ + : NULL)) +#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx +#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \ + H3_STREAM_CTX(d)->id : -2) static CURLcode h3_data_setup(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); if(!data || !data->req.p.http) { failf(data, "initialization failure, transfer not http initialized"); @@ -223,13 +225,13 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf, stream->recv_buf_nonflow = 0; H3_STREAM_LCTX(data) = stream; - DEBUGF(LOG_CF(data, cf, "data setup (easy %p)", (void *)data)); + DEBUGF(LOG_CF(data, cf, "data setup")); return CURLE_OK; } static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) { - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); (void)cf; if(stream) { @@ -246,10 +248,37 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) the maximum packet burst to MAX_PKT_BURST packets. */ #define MAX_PKT_BURST 10 -static CURLcode cf_process_ingress(struct Curl_cfilter *cf, - struct Curl_easy *data); -static CURLcode cf_flush_egress(struct Curl_cfilter *cf, - struct Curl_easy *data); +struct pkt_io_ctx { + struct Curl_cfilter *cf; + struct Curl_easy *data; + ngtcp2_tstamp ts; + size_t pkt_count; + ngtcp2_path_storage ps; +}; + +static ngtcp2_tstamp timestamp(void) +{ + struct curltime ct = Curl_now(); + return ct.tv_sec * NGTCP2_SECONDS + ct.tv_usec * NGTCP2_MICROSECONDS; +} + +static void pktx_init(struct pkt_io_ctx *pktx, + struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + pktx->cf = cf; + pktx->data = data; + pktx->ts = timestamp(); + pktx->pkt_count = 0; + ngtcp2_path_storage_zero(&pktx->ps); +} + +static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx); +static CURLcode cf_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx); static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, uint64_t datalen, void *user_data, void *stream_user_data); @@ -261,12 +290,6 @@ static ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) return ctx->qconn; } -static ngtcp2_tstamp timestamp(void) -{ - struct curltime ct = Curl_now(); - return ct.tv_sec * NGTCP2_SECONDS + ct.tv_usec * NGTCP2_MICROSECONDS; -} - #ifdef DEBUG_NGTCP2 static void quic_printf(void *user_data, const char *fmt, ...) { @@ -300,7 +323,8 @@ static void qlog_callback(void *user_data, uint32_t flags, } static void quic_settings(struct cf_ngtcp2_ctx *ctx, - struct Curl_easy *data) + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { ngtcp2_settings *s = &ctx->settings; ngtcp2_transport_params *t = &ctx->transport_params; @@ -314,7 +338,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx, #endif (void)data; - s->initial_ts = timestamp(); + s->initial_ts = pktx->ts; s->handshake_timeout = QUIC_HANDSHAKE_TIMEOUT; s->max_window = 100 * ctx->max_stream_window; s->max_stream_window = ctx->max_stream_window; @@ -327,7 +351,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx, t->initial_max_streams_uni = QUIC_MAX_STREAMS; t->max_idle_timeout = QUIC_IDLE_TIMEOUT; if(ctx->qlogfd != -1) { - s->qlog.write = qlog_callback; + s->qlog_write = qlog_callback; } } @@ -383,8 +407,8 @@ static CURLcode quic_ssl_ctx(SSL_CTX **pssl_ctx, goto out; } #else - if(ngtcp2_crypto_openssl_configure_client_context(ssl_ctx) != 0) { - failf(data, "ngtcp2_crypto_openssl_configure_client_context failed"); + if(ngtcp2_crypto_quictls_configure_client_context(ssl_ctx) != 0) { + failf(data, "ngtcp2_crypto_quictls_configure_client_context failed"); goto out; } #endif @@ -686,7 +710,7 @@ static void report_consumed_data(struct Curl_cfilter *cf, struct Curl_easy *data, size_t consumed) { - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); struct cf_ngtcp2_ctx *ctx = cf->ctx; if(!stream) @@ -902,13 +926,13 @@ static int cb_get_new_connection_id(ngtcp2_conn *tconn, ngtcp2_cid *cid, return 0; } -static int cb_recv_rx_key(ngtcp2_conn *tconn, ngtcp2_crypto_level level, +static int cb_recv_rx_key(ngtcp2_conn *tconn, ngtcp2_encryption_level level, void *user_data) { struct Curl_cfilter *cf = user_data; (void)tconn; - if(level != NGTCP2_CRYPTO_LEVEL_APPLICATION) { + if(level != NGTCP2_ENCRYPTION_LEVEL_1RTT) { return 0; } @@ -962,6 +986,61 @@ static ngtcp2_callbacks ng_callbacks = { NULL, /* early_data_rejected */ }; +/** + * Connection maintenance like timeouts on packet ACKs etc. are done by us, not + * the OS like for TCP. POLL events on the socket therefore are not + * sufficient. + * ngtcp2 tells us when it wants to be invoked again. We handle that via + * the `Curl_expire()` mechanisms. + */ +static CURLcode check_and_set_expiry(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + struct pkt_io_ctx local_pktx; + ngtcp2_tstamp expiry; + + if(!pktx) { + pktx_init(&local_pktx, cf, data); + pktx = &local_pktx; + } + else { + pktx->ts = timestamp(); + } + + expiry = ngtcp2_conn_get_expiry(ctx->qconn); + if(expiry != UINT64_MAX) { + if(expiry <= pktx->ts) { + CURLcode result; + int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts); + if(rv) { + failf(data, "ngtcp2_conn_handle_expiry returned error: %s", + ngtcp2_strerror(rv)); + ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); + return CURLE_SEND_ERROR; + } + result = cf_progress_ingress(cf, data, pktx); + if(result) + return result; + result = cf_progress_egress(cf, data, pktx); + if(result) + return result; + /* ask again, things might have changed */ + expiry = ngtcp2_conn_get_expiry(ctx->qconn); + } + + if(expiry > pktx->ts) { + ngtcp2_duration timeout = expiry - pktx->ts; + if(timeout % NGTCP2_MILLISECONDS) { + timeout += NGTCP2_MILLISECONDS; + } + Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC); + } + } + return CURLE_OK; +} + static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, struct Curl_easy *data, curl_socket_t *socks) @@ -969,7 +1048,7 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, struct cf_ngtcp2_ctx *ctx = cf->ctx; struct SingleRequest *k = &data->req; int rv = GETSOCK_BLANK; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); struct cf_call_data save; CF_DATA_SAVE(save, cf, data); @@ -991,15 +1070,15 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, return rv; } -static void drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data) +static void h3_drain_stream(struct Curl_cfilter *cf, + struct Curl_easy *data) { - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); unsigned char bits; (void)cf; bits = CURL_CSELECT_IN; - if(stream && !stream->send_closed && stream->upload_left) + if(stream && stream->upload_left && !stream->send_closed) bits |= CURL_CSELECT_OUT; if(data->state.dselect_bits != bits) { data->state.dselect_bits = bits; @@ -1013,7 +1092,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, { struct Curl_cfilter *cf = user_data; struct Curl_easy *data = stream_user_data; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); (void)conn; (void)stream_id; (void)app_error_code; @@ -1031,7 +1110,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, stream->reset = TRUE; stream->send_closed = TRUE; } - drain_stream(cf, data); + h3_drain_stream(cf, data); return 0; } @@ -1045,7 +1124,7 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf, const void *mem, size_t memlen, bool flow) { - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); CURLcode result = CURLE_OK; ssize_t nwritten; @@ -1085,7 +1164,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id, (void)stream3_id; result = write_resp_raw(cf, data, buf, buflen, TRUE); - drain_stream(cf, data); + h3_drain_stream(cf, data); return result? -1 : 0; } @@ -1110,7 +1189,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id, { struct Curl_cfilter *cf = user_data; struct Curl_easy *data = stream_user_data; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); CURLcode result = CURLE_OK; (void)conn; (void)stream_id; @@ -1130,7 +1209,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id, if(stream->status_code / 100 != 1) { stream->resp_hds_complete = TRUE; } - drain_stream(cf, data); + h3_drain_stream(cf, data); return 0; } @@ -1143,7 +1222,7 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id, nghttp3_vec h3name = nghttp3_rcbuf_get_buf(name); nghttp3_vec h3val = nghttp3_rcbuf_get_buf(value); struct Curl_easy *data = stream_user_data; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); CURLcode result = CURLE_OK; (void)conn; (void)stream_id; @@ -1207,7 +1286,8 @@ static int cb_h3_stop_sending(nghttp3_conn *conn, int64_t stream_id, (void)conn; (void)stream_user_data; - rv = ngtcp2_conn_shutdown_stream_read(ctx->qconn, stream_id, app_error_code); + rv = ngtcp2_conn_shutdown_stream_read(ctx->qconn, 0, stream_id, + app_error_code); if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) { return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -1225,7 +1305,7 @@ static int cb_h3_reset_stream(nghttp3_conn *conn, int64_t stream_id, (void)conn; (void)data; - rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, stream_id, + rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, 0, stream_id, app_error_code); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv)); if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) { @@ -1249,7 +1329,8 @@ static nghttp3_callbacks ngh3_callbacks = { cb_h3_stop_sending, NULL, /* end_stream */ cb_h3_reset_stream, - NULL /* shutdown */ + NULL, /* shutdown */ + NULL /* recv_settings */ }; static int init_ngh3_conn(struct Curl_cfilter *cf) @@ -1314,7 +1395,7 @@ fail: static ssize_t recv_closed_stream(struct Curl_cfilter *cf, struct Curl_easy *data, - struct stream_ctx *stream, + struct h3_stream_ctx *stream, CURLcode *err) { ssize_t nread = -1; @@ -1364,9 +1445,10 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, char *buf, size_t len, CURLcode *err) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); ssize_t nread = -1; struct cf_call_data save; + struct pkt_io_ctx pktx; (void)ctx; @@ -1377,6 +1459,8 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, DEBUGASSERT(ctx->h3conn); *err = CURLE_OK; + pktx_init(&pktx, cf, data); + if(!stream) { *err = CURLE_RECV_ERROR; goto out; @@ -1392,7 +1476,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, report_consumed_data(cf, data, nread); } - if(cf_process_ingress(cf, data)) { + if(cf_progress_ingress(cf, data, &pktx)) { *err = CURLE_RECV_ERROR; nread = -1; goto out; @@ -1410,7 +1494,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } if(nread > 0) { - drain_stream(cf, data); + h3_drain_stream(cf, data); } else { if(stream->closed) { @@ -1422,10 +1506,17 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } out: - if(cf_flush_egress(cf, data)) { + if(cf_progress_egress(cf, data, &pktx)) { *err = CURLE_SEND_ERROR; nread = -1; } + else { + CURLcode result2 = check_and_set_expiry(cf, data, &pktx); + if(result2) { + *err = result2; + nread = -1; + } + } DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d", stream? stream->id : -1, len, nread, *err)); CF_DATA_RESTORE(cf, save); @@ -1438,7 +1529,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, { struct Curl_cfilter *cf = user_data; struct Curl_easy *data = stream_user_data; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); size_t skiplen; (void)cf; @@ -1454,10 +1545,8 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, Curl_bufq_skip(&stream->sendbuf, skiplen); stream->sendbuf_len_in_flight -= skiplen; - /* `sendbuf` *might* now have more room. If so, resume this - * possibly paused stream. And also tell our transfer engine that - * it may continue KEEP_SEND if told to PAUSE. */ - if(!Curl_bufq_is_full(&stream->sendbuf)) { + /* Everything ACKed, we resume upload processing */ + if(!stream->sendbuf_len_in_flight) { int rv = nghttp3_conn_resume_stream(conn, stream_id); if(rv) { return NGTCP2_ERR_CALLBACK_FAILURE; @@ -1465,7 +1554,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, if((data->req.keepon & KEEP_SEND_HOLD) && (data->req.keepon & KEEP_SEND)) { data->req.keepon &= ~KEEP_SEND_HOLD; - drain_stream(cf, data); + h3_drain_stream(cf, data); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks", stream_id)); } @@ -1481,7 +1570,7 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id, { struct Curl_cfilter *cf = user_data; struct Curl_easy *data = stream_user_data; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); ssize_t nwritten = 0; size_t nvecs = 0; (void)cf; @@ -1530,8 +1619,10 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id, } DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] read req body -> " - "%d vecs%s with %zu (buffered=%zu, left=%zd)", stream->id, - (int)nvecs, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"", + "%d vecs%s with %zu (buffered=%zu, left=%" + CURL_FORMAT_CURL_OFF_T ")", + stream->id, (int)nvecs, + *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"", nwritten, Curl_bufq_len(&stream->sendbuf), stream->upload_left)); return (nghttp3_ssize)nvecs; @@ -1547,7 +1638,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf, CURLcode *err) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct stream_ctx *stream = NULL; + struct h3_stream_ctx *stream = NULL; struct h1_req_parser h1; struct dynhds h2_headers; size_t nheader; @@ -1614,16 +1705,19 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf, else /* data sending without specifying the data amount up front */ stream->upload_left = -1; /* unknown */ - reader.read_data = cb_h3_read_req_body; - preader = &reader; break; default: /* there is not request body */ stream->upload_left = 0; /* no request body */ - preader = NULL; break; } + stream->send_closed = (stream->upload_left == 0); + if(!stream->send_closed) { + reader.read_data = cb_h3_read_req_body; + preader = &reader; + } + rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id, nva, nheader, preader, data); if(rc) { @@ -1642,8 +1736,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf, goto out; } - infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)", - stream->id, (void *)data); + infof(data, "Using HTTP/3 Stream ID: %" PRId64, stream->id); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s", stream->id, data->state.url)); @@ -1658,20 +1751,23 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, const void *buf, size_t len, CURLcode *err) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); ssize_t sent = 0; struct cf_call_data save; + struct pkt_io_ctx pktx; + CURLcode result; CF_DATA_SAVE(save, cf, data); DEBUGASSERT(cf->connected); DEBUGASSERT(ctx->qconn); DEBUGASSERT(ctx->h3conn); + pktx_init(&pktx, cf, data); *err = CURLE_OK; - if(stream && stream->closed) { - *err = CURLE_HTTP3; + result = cf_progress_ingress(cf, data, &pktx); + if(result) { + *err = result; sent = -1; - goto out; } if(!stream || stream->id < 0) { @@ -1681,32 +1777,66 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } } + else if(stream->upload_blocked_len) { + /* the data in `buf` has alread been submitted or added to the + * buffers, but have been EAGAINed on the last invocation. */ + DEBUGASSERT(len >= stream->upload_blocked_len); + if(len < stream->upload_blocked_len) { + /* Did we get called again with a smaller `len`? This should not + * happen. We are not prepared to handle that. */ + failf(data, "HTTP/3 send again with decreased length"); + *err = CURLE_HTTP3; + sent = -1; + goto out; + } + sent = (ssize_t)stream->upload_blocked_len; + stream->upload_blocked_len = 0; + } + else if(stream->closed) { + *err = CURLE_HTTP3; + sent = -1; + goto out; + } else { sent = Curl_bufq_write(&stream->sendbuf, buf, len, err); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to " "sendbuf(len=%zu) -> %zd, %d", stream->id, len, sent, *err)); if(sent < 0) { - if(*err == CURLE_AGAIN) { - /* Can't add more to the send buf, needs to drain first. - * Pause the sending to avoid a busy loop. */ - data->req.keepon |= KEEP_SEND_HOLD; - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send", - stream->id)); - } goto out; } (void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id); } - if(cf_flush_egress(cf, data)) { - *err = CURLE_SEND_ERROR; + result = cf_progress_egress(cf, data, &pktx); + if(result) { + *err = result; sent = -1; - goto out; + } + + if(stream && sent > 0 && stream->sendbuf_len_in_flight) { + /* We have unacknowledged DATA and cannot report success to our + * caller. Instead we EAGAIN and remember how much we have already + * "written" into our various internal connection buffers. + * We put the stream upload on HOLD, until this gets ACKed. */ + stream->upload_blocked_len = sent; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), " + "%zu bytes in flight -> EGAIN", stream->id, len, + stream->sendbuf_len_in_flight)); + *err = CURLE_AGAIN; + sent = -1; + data->req.keepon |= KEEP_SEND_HOLD; } out: + result = check_and_set_expiry(cf, data, &pktx); + if(result) { + *err = result; + sent = -1; + } + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu) -> %zd, %d", + stream? stream->id : -1, len, sent, *err)); CF_DATA_RESTORE(cf, save); return sent; } @@ -1763,34 +1893,27 @@ static CURLcode qng_verify_peer(struct Curl_cfilter *cf, return result; } -struct recv_ctx { - struct Curl_cfilter *cf; - struct Curl_easy *data; - ngtcp2_tstamp ts; - size_t pkt_count; -}; - static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen, struct sockaddr_storage *remote_addr, socklen_t remote_addrlen, int ecn, void *userp) { - struct recv_ctx *r = userp; - struct cf_ngtcp2_ctx *ctx = r->cf->ctx; + struct pkt_io_ctx *pktx = userp; + struct cf_ngtcp2_ctx *ctx = pktx->cf->ctx; ngtcp2_pkt_info pi; ngtcp2_path path; int rv; - ++r->pkt_count; + ++pktx->pkt_count; ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->q.local_addr, ctx->q.local_addrlen); ngtcp2_addr_init(&path.remote, (struct sockaddr *)remote_addr, remote_addrlen); pi.ecn = (uint32_t)ecn; - rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, pkt, pktlen, r->ts); + rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, pkt, pktlen, pktx->ts); if(rv) { - DEBUGF(LOG_CF(r->data, r->cf, "ingress, read_pkt -> %s", + DEBUGF(LOG_CF(pktx->data, pktx->cf, "ingress, read_pkt -> %s", ngtcp2_strerror(rv))); if(!ctx->last_error.error_code) { if(rv == NGTCP2_ERR_CRYPTO) { @@ -1813,41 +1936,40 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen, return CURLE_OK; } -static CURLcode cf_process_ingress(struct Curl_cfilter *cf, - struct Curl_easy *data) +static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct recv_ctx rctx; + struct pkt_io_ctx local_pktx; size_t pkts_chunk = 128, i; size_t pkts_max = 10 * pkts_chunk; - CURLcode result; + CURLcode result = CURLE_OK; - rctx.cf = cf; - rctx.data = data; - rctx.ts = timestamp(); - rctx.pkt_count = 0; + if(!pktx) { + pktx_init(&local_pktx, cf, data); + pktx = &local_pktx; + } + else { + pktx->ts = timestamp(); + } for(i = 0; i < pkts_max; i += pkts_chunk) { - rctx.pkt_count = 0; + pktx->pkt_count = 0; result = vquic_recv_packets(cf, data, &ctx->q, pkts_chunk, - recv_pkt, &rctx); + recv_pkt, pktx); if(result) /* error */ break; - if(rctx.pkt_count < pkts_chunk) /* got less than we could */ + if(pktx->pkt_count < pkts_chunk) /* got less than we could */ break; /* give egress a chance before we receive more */ - result = cf_flush_egress(cf, data); + result = cf_progress_egress(cf, data, pktx); + if(result) /* error */ + break; } return result; } -struct read_ctx { - struct Curl_cfilter *cf; - struct Curl_easy *data; - ngtcp2_tstamp ts; - ngtcp2_path_storage *ps; -}; - /** * Read a network packet to send from ngtcp2 into `buf`. * Return number of bytes written or -1 with *err set. @@ -1856,7 +1978,7 @@ static ssize_t read_pkt_to_send(void *userp, unsigned char *buf, size_t buflen, CURLcode *err) { - struct read_ctx *x = userp; + struct pkt_io_ctx *x = userp; struct cf_ngtcp2_ctx *ctx = x->cf->ctx; nghttp3_vec vec[16]; nghttp3_ssize veccnt; @@ -1896,7 +2018,7 @@ static ssize_t read_pkt_to_send(void *userp, flags = NGTCP2_WRITE_STREAM_FLAG_MORE | (fin ? NGTCP2_WRITE_STREAM_FLAG_FIN : 0); - n = ngtcp2_conn_writev_stream(ctx->qconn, x->ps? &x->ps->path : NULL, + n = ngtcp2_conn_writev_stream(ctx->qconn, &x->ps.path, NULL, buf, buflen, &ndatalen, flags, stream_id, (const ngtcp2_vec *)vec, veccnt, x->ts); @@ -1955,28 +2077,25 @@ out: return nwritten; } -static CURLcode cf_flush_egress(struct Curl_cfilter *cf, - struct Curl_easy *data) +static CURLcode cf_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - int rv; ssize_t nread; size_t max_payload_size, path_max_payload_size, max_pktcnt; size_t pktcnt = 0; size_t gsolen = 0; /* this disables gso until we have a clue */ - ngtcp2_path_storage ps; - ngtcp2_tstamp ts = timestamp(); - ngtcp2_tstamp expiry; - ngtcp2_duration timeout; CURLcode curlcode; - struct read_ctx readx; + struct pkt_io_ctx local_pktx; - rv = ngtcp2_conn_handle_expiry(ctx->qconn, ts); - if(rv) { - failf(data, "ngtcp2_conn_handle_expiry returned error: %s", - ngtcp2_strerror(rv)); - ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); - return CURLE_SEND_ERROR; + if(!pktx) { + pktx_init(&local_pktx, cf, data); + pktx = &local_pktx; + } + else { + pktx->ts = timestamp(); + ngtcp2_path_storage_zero(&pktx->ps); } curlcode = vquic_flush(cf, data, &ctx->q); @@ -1988,8 +2107,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, return curlcode; } - ngtcp2_path_storage_zero(&ps); - /* In UDP, there is a maximum theoretical packet paload length and * a minimum payload length that is "guarantueed" to work. * To detect if this minimum payload can be increased, ngtcp2 sends @@ -2008,15 +2125,10 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, max_pktcnt = CURLMIN(MAX_PKT_BURST, ctx->q.sendbuf.chunk_size / max_payload_size); - readx.cf = cf; - readx.data = data; - readx.ts = ts; - readx.ps = &ps; - for(;;) { /* add the next packet to send, if any, to our buffer */ nread = Curl_bufq_sipn(&ctx->q.sendbuf, max_payload_size, - read_pkt_to_send, &readx, &curlcode); + read_pkt_to_send, pktx, &curlcode); /* DEBUGF(LOG_CF(data, cf, "sip packet(maxlen=%zu) -> %zd, %d", max_payload_size, nread, curlcode)); */ if(nread < 0) { @@ -2076,21 +2188,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, } out: - /* non-errored exit. check when we should run again. */ - expiry = ngtcp2_conn_get_expiry(ctx->qconn); - if(expiry != UINT64_MAX) { - if(expiry <= ts) { - timeout = 0; - } - else { - timeout = expiry - ts; - if(timeout % NGTCP2_MILLISECONDS) { - timeout += NGTCP2_MILLISECONDS; - } - } - Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC); - } - return CURLE_OK; } @@ -2101,7 +2198,7 @@ out: static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { - const struct stream_ctx *stream = H3_STREAM_CTX(data); + const struct h3_stream_ctx *stream = H3_STREAM_CTX(data); (void)cf; return stream && !Curl_bufq_is_empty(&stream->recvbuf); } @@ -2113,7 +2210,7 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf, /* TODO: there seems right now no API in ngtcp2 to shrink/enlarge * the streams windows. As we do in HTTP/2. */ if(!pause) { - drain_stream(cf, data); + h3_drain_stream(cf, data); Curl_expire(data, 0, EXPIRE_RUN_NOW); } return CURLE_OK; @@ -2141,7 +2238,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf, break; } case CF_CTRL_DATA_DONE_SEND: { - struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h3_stream_ctx *stream = H3_STREAM_CTX(data); if(stream && !stream->send_closed) { stream->send_closed = TRUE; stream->upload_left = Curl_bufq_len(&stream->sendbuf); @@ -2150,11 +2247,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf, break; } case CF_CTRL_DATA_IDLE: - if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) { - if(cf_flush_egress(cf, data)) { - result = CURLE_SEND_ERROR; - } - } + result = check_and_set_expiry(cf, data, NULL); break; default: break; @@ -2250,7 +2343,8 @@ static void cf_ngtcp2_destroy(struct Curl_cfilter *cf, struct Curl_easy *data) * Might be called twice for happy eyeballs. */ static CURLcode cf_connect_start(struct Curl_cfilter *cf, - struct Curl_easy *data) + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { struct cf_ngtcp2_ctx *ctx = cf->ctx; int rc; @@ -2294,7 +2388,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, (void)Curl_qlogdir(data, ctx->scid.data, NGTCP2_MAX_CIDLEN, &qfd); ctx->qlogfd = qfd; /* -1 if failure above */ - quic_settings(ctx, data); + quic_settings(ctx, data, pktx); result = vquic_ctx_init(&ctx->q); if(result) @@ -2344,6 +2438,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, CURLcode result = CURLE_OK; struct cf_call_data save; struct curltime now; + struct pkt_io_ctx pktx; if(cf->connected) { *done = TRUE; @@ -2359,6 +2454,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, *done = FALSE; now = Curl_now(); + pktx_init(&pktx, cf, data); CF_DATA_SAVE(save, cf, data); @@ -2370,19 +2466,19 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, if(!ctx->qconn) { ctx->started_at = now; - result = cf_connect_start(cf, data); + result = cf_connect_start(cf, data, &pktx); if(result) goto out; - result = cf_flush_egress(cf, data); + result = cf_progress_egress(cf, data, &pktx); /* we do not expect to be able to recv anything yet */ goto out; } - result = cf_process_ingress(cf, data); + result = cf_progress_ingress(cf, data, &pktx); if(result) goto out; - result = cf_flush_egress(cf, data); + result = cf_progress_egress(cf, data, &pktx); if(result) goto out; @@ -2402,7 +2498,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, out: if(result == CURLE_RECV_ERROR && ctx->qconn && - ngtcp2_conn_is_in_draining_period(ctx->qconn)) { + ngtcp2_conn_in_draining_period(ctx->qconn)) { /* When a QUIC server instance is shutting down, it may send us a * CONNECTION_CLOSE right away. Our connection then enters the DRAINING * state. @@ -2439,6 +2535,9 @@ out: r_ip, r_port, curl_easy_strerror(result)); } #endif + if(!result && ctx->qconn) { + result = check_and_set_expiry(cf, data, &pktx); + } DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done)); CF_DATA_RESTORE(cf, save); return result; @@ -2510,13 +2609,11 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf, not in use by any other transfer, there shouldn't be any data here, only "protocol frames" */ *input_pending = FALSE; - Curl_attach_connection(data, cf->conn); - if(cf_process_ingress(cf, data)) + if(cf_progress_ingress(cf, data, NULL)) alive = FALSE; else { alive = TRUE; } - Curl_detach_connection(data); } return alive; |