summaryrefslogtreecommitdiffstats
path: root/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmcurl/lib/vquic/curl_ngtcp2.c')
-rw-r--r--Utilities/cmcurl/lib/vquic/curl_ngtcp2.c429
1 files changed, 263 insertions, 166 deletions
diff --git a/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c b/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c
index 7627940..a430aa1 100644
--- a/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c
+++ b/Utilities/cmcurl/lib/vquic/curl_ngtcp2.c
@@ -33,7 +33,7 @@
#ifdef OPENSSL_IS_BORINGSSL
#include <ngtcp2/ngtcp2_crypto_boringssl.h>
#else
-#include <ngtcp2/ngtcp2_crypto_openssl.h>
+#include <ngtcp2/ngtcp2_crypto_quictls.h>
#endif
#include "vtls/openssl.h"
#elif defined(USE_GNUTLS)
@@ -165,17 +165,19 @@ struct cf_ngtcp2_ctx {
};
/* How to access `call_data` from a cf_ngtcp2 filter */
+#undef CF_CTX_CALL_DATA
#define CF_CTX_CALL_DATA(cf) \
((struct cf_ngtcp2_ctx *)(cf)->ctx)->call_data
/**
* All about the H3 internals of a stream
*/
-struct stream_ctx {
+struct h3_stream_ctx {
int64_t id; /* HTTP/3 protocol identifier */
struct bufq sendbuf; /* h3 request body */
struct bufq recvbuf; /* h3 response body */
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
+ size_t upload_blocked_len; /* the amount written last and EGAINed */
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
@@ -186,18 +188,18 @@ struct stream_ctx {
bool send_closed; /* stream is local closed */
};
-#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \
- ((struct HTTP *)(d)->req.p.http)->h3_ctx \
- : NULL))
-#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
-#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
- H3_STREAM_CTX(d)->id : -2)
+#define H3_STREAM_CTX(d) ((struct h3_stream_ctx *)(((d) && (d)->req.p.http)? \
+ ((struct HTTP *)(d)->req.p.http)->h3_ctx \
+ : NULL))
+#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
+#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
+ H3_STREAM_CTX(d)->id : -2)
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
if(!data || !data->req.p.http) {
failf(data, "initialization failure, transfer not http initialized");
@@ -223,13 +225,13 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
stream->recv_buf_nonflow = 0;
H3_STREAM_LCTX(data) = stream;
- DEBUGF(LOG_CF(data, cf, "data setup (easy %p)", (void *)data));
+ DEBUGF(LOG_CF(data, cf, "data setup"));
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
if(stream) {
@@ -246,10 +248,37 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
the maximum packet burst to MAX_PKT_BURST packets. */
#define MAX_PKT_BURST 10
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
- struct Curl_easy *data);
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data);
+struct pkt_io_ctx {
+ struct Curl_cfilter *cf;
+ struct Curl_easy *data;
+ ngtcp2_tstamp ts;
+ size_t pkt_count;
+ ngtcp2_path_storage ps;
+};
+
+static ngtcp2_tstamp timestamp(void)
+{
+ struct curltime ct = Curl_now();
+ return ct.tv_sec * NGTCP2_SECONDS + ct.tv_usec * NGTCP2_MICROSECONDS;
+}
+
+static void pktx_init(struct pkt_io_ctx *pktx,
+ struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ pktx->cf = cf;
+ pktx->data = data;
+ pktx->ts = timestamp();
+ pktx->pkt_count = 0;
+ ngtcp2_path_storage_zero(&pktx->ps);
+}
+
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx);
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx);
static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
uint64_t datalen, void *user_data,
void *stream_user_data);
@@ -261,12 +290,6 @@ static ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref)
return ctx->qconn;
}
-static ngtcp2_tstamp timestamp(void)
-{
- struct curltime ct = Curl_now();
- return ct.tv_sec * NGTCP2_SECONDS + ct.tv_usec * NGTCP2_MICROSECONDS;
-}
-
#ifdef DEBUG_NGTCP2
static void quic_printf(void *user_data, const char *fmt, ...)
{
@@ -300,7 +323,8 @@ static void qlog_callback(void *user_data, uint32_t flags,
}
static void quic_settings(struct cf_ngtcp2_ctx *ctx,
- struct Curl_easy *data)
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
ngtcp2_settings *s = &ctx->settings;
ngtcp2_transport_params *t = &ctx->transport_params;
@@ -314,7 +338,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
#endif
(void)data;
- s->initial_ts = timestamp();
+ s->initial_ts = pktx->ts;
s->handshake_timeout = QUIC_HANDSHAKE_TIMEOUT;
s->max_window = 100 * ctx->max_stream_window;
s->max_stream_window = ctx->max_stream_window;
@@ -327,7 +351,7 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
t->initial_max_streams_uni = QUIC_MAX_STREAMS;
t->max_idle_timeout = QUIC_IDLE_TIMEOUT;
if(ctx->qlogfd != -1) {
- s->qlog.write = qlog_callback;
+ s->qlog_write = qlog_callback;
}
}
@@ -383,8 +407,8 @@ static CURLcode quic_ssl_ctx(SSL_CTX **pssl_ctx,
goto out;
}
#else
- if(ngtcp2_crypto_openssl_configure_client_context(ssl_ctx) != 0) {
- failf(data, "ngtcp2_crypto_openssl_configure_client_context failed");
+ if(ngtcp2_crypto_quictls_configure_client_context(ssl_ctx) != 0) {
+ failf(data, "ngtcp2_crypto_quictls_configure_client_context failed");
goto out;
}
#endif
@@ -686,7 +710,7 @@ static void report_consumed_data(struct Curl_cfilter *cf,
struct Curl_easy *data,
size_t consumed)
{
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_ngtcp2_ctx *ctx = cf->ctx;
if(!stream)
@@ -902,13 +926,13 @@ static int cb_get_new_connection_id(ngtcp2_conn *tconn, ngtcp2_cid *cid,
return 0;
}
-static int cb_recv_rx_key(ngtcp2_conn *tconn, ngtcp2_crypto_level level,
+static int cb_recv_rx_key(ngtcp2_conn *tconn, ngtcp2_encryption_level level,
void *user_data)
{
struct Curl_cfilter *cf = user_data;
(void)tconn;
- if(level != NGTCP2_CRYPTO_LEVEL_APPLICATION) {
+ if(level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
return 0;
}
@@ -962,6 +986,61 @@ static ngtcp2_callbacks ng_callbacks = {
NULL, /* early_data_rejected */
};
+/**
+ * Connection maintenance like timeouts on packet ACKs etc. are done by us, not
+ * the OS like for TCP. POLL events on the socket therefore are not
+ * sufficient.
+ * ngtcp2 tells us when it wants to be invoked again. We handle that via
+ * the `Curl_expire()` mechanisms.
+ */
+static CURLcode check_and_set_expiry(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ struct pkt_io_ctx local_pktx;
+ ngtcp2_tstamp expiry;
+
+ if(!pktx) {
+ pktx_init(&local_pktx, cf, data);
+ pktx = &local_pktx;
+ }
+ else {
+ pktx->ts = timestamp();
+ }
+
+ expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+ if(expiry != UINT64_MAX) {
+ if(expiry <= pktx->ts) {
+ CURLcode result;
+ int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
+ if(rv) {
+ failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
+ ngtcp2_strerror(rv));
+ ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
+ return CURLE_SEND_ERROR;
+ }
+ result = cf_progress_ingress(cf, data, pktx);
+ if(result)
+ return result;
+ result = cf_progress_egress(cf, data, pktx);
+ if(result)
+ return result;
+ /* ask again, things might have changed */
+ expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+ }
+
+ if(expiry > pktx->ts) {
+ ngtcp2_duration timeout = expiry - pktx->ts;
+ if(timeout % NGTCP2_MILLISECONDS) {
+ timeout += NGTCP2_MILLISECONDS;
+ }
+ Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
+ }
+ }
+ return CURLE_OK;
+}
+
static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_socket_t *socks)
@@ -969,7 +1048,7 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct SingleRequest *k = &data->req;
int rv = GETSOCK_BLANK;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
@@ -991,15 +1070,15 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
return rv;
}
-static void drain_stream(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static void h3_drain_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
- if(stream && !stream->send_closed && stream->upload_left)
+ if(stream && stream->upload_left && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.dselect_bits != bits) {
data->state.dselect_bits = bits;
@@ -1013,7 +1092,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
{
struct Curl_cfilter *cf = user_data;
struct Curl_easy *data = stream_user_data;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
(void)conn;
(void)stream_id;
(void)app_error_code;
@@ -1031,7 +1110,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
stream->reset = TRUE;
stream->send_closed = TRUE;
}
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
return 0;
}
@@ -1045,7 +1124,7 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf,
const void *mem, size_t memlen,
bool flow)
{
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
@@ -1085,7 +1164,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
(void)stream3_id;
result = write_resp_raw(cf, data, buf, buflen, TRUE);
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
return result? -1 : 0;
}
@@ -1110,7 +1189,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
{
struct Curl_cfilter *cf = user_data;
struct Curl_easy *data = stream_user_data;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@@ -1130,7 +1209,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
return 0;
}
@@ -1143,7 +1222,7 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id,
nghttp3_vec h3name = nghttp3_rcbuf_get_buf(name);
nghttp3_vec h3val = nghttp3_rcbuf_get_buf(value);
struct Curl_easy *data = stream_user_data;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@@ -1207,7 +1286,8 @@ static int cb_h3_stop_sending(nghttp3_conn *conn, int64_t stream_id,
(void)conn;
(void)stream_user_data;
- rv = ngtcp2_conn_shutdown_stream_read(ctx->qconn, stream_id, app_error_code);
+ rv = ngtcp2_conn_shutdown_stream_read(ctx->qconn, 0, stream_id,
+ app_error_code);
if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
@@ -1225,7 +1305,7 @@ static int cb_h3_reset_stream(nghttp3_conn *conn, int64_t stream_id,
(void)conn;
(void)data;
- rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, stream_id,
+ rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, 0, stream_id,
app_error_code);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv));
if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) {
@@ -1249,7 +1329,8 @@ static nghttp3_callbacks ngh3_callbacks = {
cb_h3_stop_sending,
NULL, /* end_stream */
cb_h3_reset_stream,
- NULL /* shutdown */
+ NULL, /* shutdown */
+ NULL /* recv_settings */
};
static int init_ngh3_conn(struct Curl_cfilter *cf)
@@ -1314,7 +1395,7 @@ fail:
static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
- struct stream_ctx *stream,
+ struct h3_stream_ctx *stream,
CURLcode *err)
{
ssize_t nread = -1;
@@ -1364,9 +1445,10 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t nread = -1;
struct cf_call_data save;
+ struct pkt_io_ctx pktx;
(void)ctx;
@@ -1377,6 +1459,8 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
DEBUGASSERT(ctx->h3conn);
*err = CURLE_OK;
+ pktx_init(&pktx, cf, data);
+
if(!stream) {
*err = CURLE_RECV_ERROR;
goto out;
@@ -1392,7 +1476,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
report_consumed_data(cf, data, nread);
}
- if(cf_process_ingress(cf, data)) {
+ if(cf_progress_ingress(cf, data, &pktx)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
@@ -1410,7 +1494,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(nread > 0) {
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
}
else {
if(stream->closed) {
@@ -1422,10 +1506,17 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
out:
- if(cf_flush_egress(cf, data)) {
+ if(cf_progress_egress(cf, data, &pktx)) {
*err = CURLE_SEND_ERROR;
nread = -1;
}
+ else {
+ CURLcode result2 = check_and_set_expiry(cf, data, &pktx);
+ if(result2) {
+ *err = result2;
+ nread = -1;
+ }
+ }
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
stream? stream->id : -1, len, nread, *err));
CF_DATA_RESTORE(cf, save);
@@ -1438,7 +1529,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
{
struct Curl_cfilter *cf = user_data;
struct Curl_easy *data = stream_user_data;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
size_t skiplen;
(void)cf;
@@ -1454,10 +1545,8 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
Curl_bufq_skip(&stream->sendbuf, skiplen);
stream->sendbuf_len_in_flight -= skiplen;
- /* `sendbuf` *might* now have more room. If so, resume this
- * possibly paused stream. And also tell our transfer engine that
- * it may continue KEEP_SEND if told to PAUSE. */
- if(!Curl_bufq_is_full(&stream->sendbuf)) {
+ /* Everything ACKed, we resume upload processing */
+ if(!stream->sendbuf_len_in_flight) {
int rv = nghttp3_conn_resume_stream(conn, stream_id);
if(rv) {
return NGTCP2_ERR_CALLBACK_FAILURE;
@@ -1465,7 +1554,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
if((data->req.keepon & KEEP_SEND_HOLD) &&
(data->req.keepon & KEEP_SEND)) {
data->req.keepon &= ~KEEP_SEND_HOLD;
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks",
stream_id));
}
@@ -1481,7 +1570,7 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id,
{
struct Curl_cfilter *cf = user_data;
struct Curl_easy *data = stream_user_data;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t nwritten = 0;
size_t nvecs = 0;
(void)cf;
@@ -1530,8 +1619,10 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id,
}
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] read req body -> "
- "%d vecs%s with %zu (buffered=%zu, left=%zd)", stream->id,
- (int)nvecs, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"",
+ "%d vecs%s with %zu (buffered=%zu, left=%"
+ CURL_FORMAT_CURL_OFF_T ")",
+ stream->id, (int)nvecs,
+ *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"",
nwritten, Curl_bufq_len(&stream->sendbuf),
stream->upload_left));
return (nghttp3_ssize)nvecs;
@@ -1547,7 +1638,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct stream_ctx *stream = NULL;
+ struct h3_stream_ctx *stream = NULL;
struct h1_req_parser h1;
struct dynhds h2_headers;
size_t nheader;
@@ -1614,16 +1705,19 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
else
/* data sending without specifying the data amount up front */
stream->upload_left = -1; /* unknown */
- reader.read_data = cb_h3_read_req_body;
- preader = &reader;
break;
default:
/* there is not request body */
stream->upload_left = 0; /* no request body */
- preader = NULL;
break;
}
+ stream->send_closed = (stream->upload_left == 0);
+ if(!stream->send_closed) {
+ reader.read_data = cb_h3_read_req_body;
+ preader = &reader;
+ }
+
rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id,
nva, nheader, preader, data);
if(rc) {
@@ -1642,8 +1736,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
goto out;
}
- infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)",
- stream->id, (void *)data);
+ infof(data, "Using HTTP/3 Stream ID: %" PRId64, stream->id);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s",
stream->id, data->state.url));
@@ -1658,20 +1751,23 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t sent = 0;
struct cf_call_data save;
+ struct pkt_io_ctx pktx;
+ CURLcode result;
CF_DATA_SAVE(save, cf, data);
DEBUGASSERT(cf->connected);
DEBUGASSERT(ctx->qconn);
DEBUGASSERT(ctx->h3conn);
+ pktx_init(&pktx, cf, data);
*err = CURLE_OK;
- if(stream && stream->closed) {
- *err = CURLE_HTTP3;
+ result = cf_progress_ingress(cf, data, &pktx);
+ if(result) {
+ *err = result;
sent = -1;
- goto out;
}
if(!stream || stream->id < 0) {
@@ -1681,32 +1777,66 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
}
+ else if(stream->upload_blocked_len) {
+ /* the data in `buf` has alread been submitted or added to the
+ * buffers, but have been EAGAINed on the last invocation. */
+ DEBUGASSERT(len >= stream->upload_blocked_len);
+ if(len < stream->upload_blocked_len) {
+ /* Did we get called again with a smaller `len`? This should not
+ * happen. We are not prepared to handle that. */
+ failf(data, "HTTP/3 send again with decreased length");
+ *err = CURLE_HTTP3;
+ sent = -1;
+ goto out;
+ }
+ sent = (ssize_t)stream->upload_blocked_len;
+ stream->upload_blocked_len = 0;
+ }
+ else if(stream->closed) {
+ *err = CURLE_HTTP3;
+ sent = -1;
+ goto out;
+ }
else {
sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to "
"sendbuf(len=%zu) -> %zd, %d",
stream->id, len, sent, *err));
if(sent < 0) {
- if(*err == CURLE_AGAIN) {
- /* Can't add more to the send buf, needs to drain first.
- * Pause the sending to avoid a busy loop. */
- data->req.keepon |= KEEP_SEND_HOLD;
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send",
- stream->id));
- }
goto out;
}
(void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id);
}
- if(cf_flush_egress(cf, data)) {
- *err = CURLE_SEND_ERROR;
+ result = cf_progress_egress(cf, data, &pktx);
+ if(result) {
+ *err = result;
sent = -1;
- goto out;
+ }
+
+ if(stream && sent > 0 && stream->sendbuf_len_in_flight) {
+ /* We have unacknowledged DATA and cannot report success to our
+ * caller. Instead we EAGAIN and remember how much we have already
+ * "written" into our various internal connection buffers.
+ * We put the stream upload on HOLD, until this gets ACKed. */
+ stream->upload_blocked_len = sent;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), "
+ "%zu bytes in flight -> EGAIN", stream->id, len,
+ stream->sendbuf_len_in_flight));
+ *err = CURLE_AGAIN;
+ sent = -1;
+ data->req.keepon |= KEEP_SEND_HOLD;
}
out:
+ result = check_and_set_expiry(cf, data, &pktx);
+ if(result) {
+ *err = result;
+ sent = -1;
+ }
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu) -> %zd, %d",
+ stream? stream->id : -1, len, sent, *err));
CF_DATA_RESTORE(cf, save);
return sent;
}
@@ -1763,34 +1893,27 @@ static CURLcode qng_verify_peer(struct Curl_cfilter *cf,
return result;
}
-struct recv_ctx {
- struct Curl_cfilter *cf;
- struct Curl_easy *data;
- ngtcp2_tstamp ts;
- size_t pkt_count;
-};
-
static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
struct sockaddr_storage *remote_addr,
socklen_t remote_addrlen, int ecn,
void *userp)
{
- struct recv_ctx *r = userp;
- struct cf_ngtcp2_ctx *ctx = r->cf->ctx;
+ struct pkt_io_ctx *pktx = userp;
+ struct cf_ngtcp2_ctx *ctx = pktx->cf->ctx;
ngtcp2_pkt_info pi;
ngtcp2_path path;
int rv;
- ++r->pkt_count;
+ ++pktx->pkt_count;
ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->q.local_addr,
ctx->q.local_addrlen);
ngtcp2_addr_init(&path.remote, (struct sockaddr *)remote_addr,
remote_addrlen);
pi.ecn = (uint32_t)ecn;
- rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, pkt, pktlen, r->ts);
+ rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, pkt, pktlen, pktx->ts);
if(rv) {
- DEBUGF(LOG_CF(r->data, r->cf, "ingress, read_pkt -> %s",
+ DEBUGF(LOG_CF(pktx->data, pktx->cf, "ingress, read_pkt -> %s",
ngtcp2_strerror(rv)));
if(!ctx->last_error.error_code) {
if(rv == NGTCP2_ERR_CRYPTO) {
@@ -1813,41 +1936,40 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
return CURLE_OK;
}
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct recv_ctx rctx;
+ struct pkt_io_ctx local_pktx;
size_t pkts_chunk = 128, i;
size_t pkts_max = 10 * pkts_chunk;
- CURLcode result;
+ CURLcode result = CURLE_OK;
- rctx.cf = cf;
- rctx.data = data;
- rctx.ts = timestamp();
- rctx.pkt_count = 0;
+ if(!pktx) {
+ pktx_init(&local_pktx, cf, data);
+ pktx = &local_pktx;
+ }
+ else {
+ pktx->ts = timestamp();
+ }
for(i = 0; i < pkts_max; i += pkts_chunk) {
- rctx.pkt_count = 0;
+ pktx->pkt_count = 0;
result = vquic_recv_packets(cf, data, &ctx->q, pkts_chunk,
- recv_pkt, &rctx);
+ recv_pkt, pktx);
if(result) /* error */
break;
- if(rctx.pkt_count < pkts_chunk) /* got less than we could */
+ if(pktx->pkt_count < pkts_chunk) /* got less than we could */
break;
/* give egress a chance before we receive more */
- result = cf_flush_egress(cf, data);
+ result = cf_progress_egress(cf, data, pktx);
+ if(result) /* error */
+ break;
}
return result;
}
-struct read_ctx {
- struct Curl_cfilter *cf;
- struct Curl_easy *data;
- ngtcp2_tstamp ts;
- ngtcp2_path_storage *ps;
-};
-
/**
* Read a network packet to send from ngtcp2 into `buf`.
* Return number of bytes written or -1 with *err set.
@@ -1856,7 +1978,7 @@ static ssize_t read_pkt_to_send(void *userp,
unsigned char *buf, size_t buflen,
CURLcode *err)
{
- struct read_ctx *x = userp;
+ struct pkt_io_ctx *x = userp;
struct cf_ngtcp2_ctx *ctx = x->cf->ctx;
nghttp3_vec vec[16];
nghttp3_ssize veccnt;
@@ -1896,7 +2018,7 @@ static ssize_t read_pkt_to_send(void *userp,
flags = NGTCP2_WRITE_STREAM_FLAG_MORE |
(fin ? NGTCP2_WRITE_STREAM_FLAG_FIN : 0);
- n = ngtcp2_conn_writev_stream(ctx->qconn, x->ps? &x->ps->path : NULL,
+ n = ngtcp2_conn_writev_stream(ctx->qconn, &x->ps.path,
NULL, buf, buflen,
&ndatalen, flags, stream_id,
(const ngtcp2_vec *)vec, veccnt, x->ts);
@@ -1955,28 +2077,25 @@ out:
return nwritten;
}
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- int rv;
ssize_t nread;
size_t max_payload_size, path_max_payload_size, max_pktcnt;
size_t pktcnt = 0;
size_t gsolen = 0; /* this disables gso until we have a clue */
- ngtcp2_path_storage ps;
- ngtcp2_tstamp ts = timestamp();
- ngtcp2_tstamp expiry;
- ngtcp2_duration timeout;
CURLcode curlcode;
- struct read_ctx readx;
+ struct pkt_io_ctx local_pktx;
- rv = ngtcp2_conn_handle_expiry(ctx->qconn, ts);
- if(rv) {
- failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
- ngtcp2_strerror(rv));
- ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
- return CURLE_SEND_ERROR;
+ if(!pktx) {
+ pktx_init(&local_pktx, cf, data);
+ pktx = &local_pktx;
+ }
+ else {
+ pktx->ts = timestamp();
+ ngtcp2_path_storage_zero(&pktx->ps);
}
curlcode = vquic_flush(cf, data, &ctx->q);
@@ -1988,8 +2107,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
return curlcode;
}
- ngtcp2_path_storage_zero(&ps);
-
/* In UDP, there is a maximum theoretical packet paload length and
* a minimum payload length that is "guarantueed" to work.
* To detect if this minimum payload can be increased, ngtcp2 sends
@@ -2008,15 +2125,10 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
max_pktcnt = CURLMIN(MAX_PKT_BURST,
ctx->q.sendbuf.chunk_size / max_payload_size);
- readx.cf = cf;
- readx.data = data;
- readx.ts = ts;
- readx.ps = &ps;
-
for(;;) {
/* add the next packet to send, if any, to our buffer */
nread = Curl_bufq_sipn(&ctx->q.sendbuf, max_payload_size,
- read_pkt_to_send, &readx, &curlcode);
+ read_pkt_to_send, pktx, &curlcode);
/* DEBUGF(LOG_CF(data, cf, "sip packet(maxlen=%zu) -> %zd, %d",
max_payload_size, nread, curlcode)); */
if(nread < 0) {
@@ -2076,21 +2188,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
}
out:
- /* non-errored exit. check when we should run again. */
- expiry = ngtcp2_conn_get_expiry(ctx->qconn);
- if(expiry != UINT64_MAX) {
- if(expiry <= ts) {
- timeout = 0;
- }
- else {
- timeout = expiry - ts;
- if(timeout % NGTCP2_MILLISECONDS) {
- timeout += NGTCP2_MILLISECONDS;
- }
- }
- Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
- }
-
return CURLE_OK;
}
@@ -2101,7 +2198,7 @@ out:
static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
- const struct stream_ctx *stream = H3_STREAM_CTX(data);
+ const struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
return stream && !Curl_bufq_is_empty(&stream->recvbuf);
}
@@ -2113,7 +2210,7 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf,
/* TODO: there seems right now no API in ngtcp2 to shrink/enlarge
* the streams windows. As we do in HTTP/2. */
if(!pause) {
- drain_stream(cf, data);
+ h3_drain_stream(cf, data);
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
return CURLE_OK;
@@ -2141,7 +2238,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_DONE_SEND: {
- struct stream_ctx *stream = H3_STREAM_CTX(data);
+ struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
if(stream && !stream->send_closed) {
stream->send_closed = TRUE;
stream->upload_left = Curl_bufq_len(&stream->sendbuf);
@@ -2150,11 +2247,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_IDLE:
- if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
- if(cf_flush_egress(cf, data)) {
- result = CURLE_SEND_ERROR;
- }
- }
+ result = check_and_set_expiry(cf, data, NULL);
break;
default:
break;
@@ -2250,7 +2343,8 @@ static void cf_ngtcp2_destroy(struct Curl_cfilter *cf, struct Curl_easy *data)
* Might be called twice for happy eyeballs.
*/
static CURLcode cf_connect_start(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
int rc;
@@ -2294,7 +2388,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
(void)Curl_qlogdir(data, ctx->scid.data, NGTCP2_MAX_CIDLEN, &qfd);
ctx->qlogfd = qfd; /* -1 if failure above */
- quic_settings(ctx, data);
+ quic_settings(ctx, data, pktx);
result = vquic_ctx_init(&ctx->q);
if(result)
@@ -2344,6 +2438,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
CURLcode result = CURLE_OK;
struct cf_call_data save;
struct curltime now;
+ struct pkt_io_ctx pktx;
if(cf->connected) {
*done = TRUE;
@@ -2359,6 +2454,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
*done = FALSE;
now = Curl_now();
+ pktx_init(&pktx, cf, data);
CF_DATA_SAVE(save, cf, data);
@@ -2370,19 +2466,19 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
if(!ctx->qconn) {
ctx->started_at = now;
- result = cf_connect_start(cf, data);
+ result = cf_connect_start(cf, data, &pktx);
if(result)
goto out;
- result = cf_flush_egress(cf, data);
+ result = cf_progress_egress(cf, data, &pktx);
/* we do not expect to be able to recv anything yet */
goto out;
}
- result = cf_process_ingress(cf, data);
+ result = cf_progress_ingress(cf, data, &pktx);
if(result)
goto out;
- result = cf_flush_egress(cf, data);
+ result = cf_progress_egress(cf, data, &pktx);
if(result)
goto out;
@@ -2402,7 +2498,7 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
out:
if(result == CURLE_RECV_ERROR && ctx->qconn &&
- ngtcp2_conn_is_in_draining_period(ctx->qconn)) {
+ ngtcp2_conn_in_draining_period(ctx->qconn)) {
/* When a QUIC server instance is shutting down, it may send us a
* CONNECTION_CLOSE right away. Our connection then enters the DRAINING
* state.
@@ -2439,6 +2535,9 @@ out:
r_ip, r_port, curl_easy_strerror(result));
}
#endif
+ if(!result && ctx->qconn) {
+ result = check_and_set_expiry(cf, data, &pktx);
+ }
DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done));
CF_DATA_RESTORE(cf, save);
return result;
@@ -2510,13 +2609,11 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf,
not in use by any other transfer, there shouldn't be any data here,
only "protocol frames" */
*input_pending = FALSE;
- Curl_attach_connection(data, cf->conn);
- if(cf_process_ingress(cf, data))
+ if(cf_progress_ingress(cf, data, NULL))
alive = FALSE;
else {
alive = TRUE;
}
- Curl_detach_connection(data);
}
return alive;