diff options
Diffstat (limited to 'Utilities/cmcurl/lib/vquic/curl_msh3.c')
-rw-r--r-- | Utilities/cmcurl/lib/vquic/curl_msh3.c | 712 |
1 files changed, 474 insertions, 238 deletions
diff --git a/Utilities/cmcurl/lib/vquic/curl_msh3.c b/Utilities/cmcurl/lib/vquic/curl_msh3.c index 5308999..1738867 100644 --- a/Utilities/cmcurl/lib/vquic/curl_msh3.c +++ b/Utilities/cmcurl/lib/vquic/curl_msh3.c @@ -35,7 +35,7 @@ #include "cf-socket.h" #include "connect.h" #include "progress.h" -#include "h2h3.h" +#include "http1.h" #include "curl_msh3.h" #include "socketpair.h" #include "vquic/vquic.h" @@ -45,16 +45,10 @@ #include "curl_memory.h" #include "memdebug.h" -#define DEBUG_CF 1 - -#if DEBUG_CF && defined(DEBUGBUILD) -#define CF_DEBUGF(x) x -#else -#define CF_DEBUGF(x) do { } while(0) -#endif - -#define MSH3_REQ_INIT_BUF_LEN 16384 -#define MSH3_REQ_MAX_BUF_LEN 0x100000 +#define H3_STREAM_WINDOW_SIZE (128 * 1024) +#define H3_STREAM_CHUNK_SIZE (16 * 1024) +#define H3_STREAM_RECV_CHUNKS \ + (H3_STREAM_WINDOW_SIZE / H3_STREAM_CHUNK_SIZE) #ifdef _WIN32 #define msh3_lock CRITICAL_SECTION @@ -116,6 +110,7 @@ struct cf_msh3_ctx { curl_socket_t sock[2]; /* fake socket pair until we get support in msh3 */ char l_ip[MAX_IPADR_LEN]; /* local IP as string */ int l_port; /* local port number */ + struct cf_call_data call_data; struct curltime connect_started; /* time the current attempt started */ struct curltime handshake_at; /* time connect handshake finished */ /* Flags written by msh3/msquic thread */ @@ -127,6 +122,104 @@ struct cf_msh3_ctx { BIT(active); }; +/* How to access `call_data` from a cf_msh3 filter */ +#define CF_CTX_CALL_DATA(cf) \ + ((struct cf_msh3_ctx *)(cf)->ctx)->call_data + +/** + * All about the H3 internals of a stream + */ +struct stream_ctx { + struct MSH3_REQUEST *req; + struct bufq recvbuf; /* h3 response */ +#ifdef _WIN32 + CRITICAL_SECTION recv_lock; +#else /* !_WIN32 */ + pthread_mutex_t recv_lock; +#endif /* _WIN32 */ + uint64_t error3; /* HTTP/3 stream error code */ + int status_code; /* HTTP status code */ + CURLcode recv_error; + bool closed; + bool reset; + bool upload_done; + bool firstheader; /* FALSE until headers arrive */ + bool recv_header_complete; +}; + +#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \ + ((struct HTTP *)(d)->req.p.http)->h3_ctx \ + : NULL)) +#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx +#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \ + H3_STREAM_CTX(d)->id : -2) + + +static CURLcode h3_data_setup(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + + if(stream) + return CURLE_OK; + + stream = calloc(1, sizeof(*stream)); + if(!stream) + return CURLE_OUT_OF_MEMORY; + + H3_STREAM_LCTX(data) = stream; + stream->req = ZERO_NULL; + msh3_lock_initialize(&stream->recv_lock); + Curl_bufq_init2(&stream->recvbuf, H3_STREAM_CHUNK_SIZE, + H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT); + DEBUGF(LOG_CF(data, cf, "data setup (easy %p)", (void *)data)); + return CURLE_OK; +} + +static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + + (void)cf; + if(stream) { + DEBUGF(LOG_CF(data, cf, "easy handle is done")); + Curl_bufq_free(&stream->recvbuf); + free(stream); + H3_STREAM_LCTX(data) = NULL; + } +} + +static void drain_stream_from_other_thread(struct Curl_easy *data, + struct stream_ctx *stream) +{ + unsigned char bits; + + /* risky */ + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; + /* cannot expire from other thread */ + } +} + +static void drain_stream(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + unsigned char bits; + + (void)cf; + bits = CURL_CSELECT_IN; + if(stream && !stream->upload_done) + bits |= CURL_CSELECT_OUT; + if(data->state.dselect_bits != bits) { + data->state.dselect_bits = bits; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } +} + static const MSH3_CONNECTION_IF msh3_conn_if = { msh3_conn_connected, msh3_conn_shutdown_complete, @@ -136,10 +229,12 @@ static const MSH3_CONNECTION_IF msh3_conn_if = { static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection, void *IfContext) { - struct cf_msh3_ctx *ctx = IfContext; + struct Curl_cfilter *cf = IfContext; + struct cf_msh3_ctx *ctx = cf->ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); (void)Connection; - if(ctx->verbose) - CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: connected\n")); + + DEBUGF(LOG_CF(data, cf, "[MSH3] connected")); ctx->handshake_succeeded = true; ctx->connected = true; ctx->handshake_complete = true; @@ -148,10 +243,12 @@ static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection, static void MSH3_CALL msh3_conn_shutdown_complete(MSH3_CONNECTION *Connection, void *IfContext) { - struct cf_msh3_ctx *ctx = IfContext; + struct Curl_cfilter *cf = IfContext; + struct cf_msh3_ctx *ctx = cf->ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); + (void)Connection; - if(ctx->verbose) - CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: shutdown complete\n")); + DEBUGF(LOG_CF(data, cf, "[MSH3] shutdown complete")); ctx->connected = false; ctx->handshake_complete = true; } @@ -173,173 +270,167 @@ static const MSH3_REQUEST_IF msh3_request_if = { msh3_data_sent }; -static CURLcode msh3_data_setup(struct Curl_cfilter *cf, - struct Curl_easy *data) +/* Decode HTTP status code. Returns -1 if no valid status code was + decoded. (duplicate from http2.c) */ +static int decode_status_code(const char *value, size_t len) { - struct HTTP *stream = data->req.p.http; - (void)cf; + int i; + int res; - DEBUGASSERT(stream); - if(!stream->recv_buf) { - DEBUGF(LOG_CF(data, cf, "req: setup")); - stream->recv_buf = malloc(MSH3_REQ_INIT_BUF_LEN); - if(!stream->recv_buf) { - return CURLE_OUT_OF_MEMORY; + if(len != 3) { + return -1; + } + + res = 0; + + for(i = 0; i < 3; ++i) { + char c = value[i]; + + if(c < '0' || c > '9') { + return -1; } - stream->req = ZERO_NULL; - msh3_lock_initialize(&stream->recv_lock); - stream->recv_buf_alloc = MSH3_REQ_INIT_BUF_LEN; - stream->recv_buf_max = MSH3_REQ_MAX_BUF_LEN; - stream->recv_header_len = 0; - stream->recv_header_complete = false; - stream->recv_data_len = 0; - stream->recv_data_complete = false; - stream->recv_error = CURLE_OK; + + res *= 10; + res += c - '0'; } - return CURLE_OK; + + return res; } -/* Requires stream->recv_lock to be held */ -static bool msh3request_ensure_room(struct HTTP *stream, size_t len) -{ - uint8_t *new_recv_buf; - const size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len; - - if(cur_recv_len + len > stream->recv_buf_alloc) { - size_t new_recv_buf_alloc_len = stream->recv_buf_alloc; - do { - new_recv_buf_alloc_len <<= 1; /* TODO - handle overflow */ - } while(cur_recv_len + len > new_recv_buf_alloc_len); - CF_DEBUGF(fprintf(stderr, "* enlarging buffer to %zu\n", - new_recv_buf_alloc_len)); - new_recv_buf = malloc(new_recv_buf_alloc_len); - if(!new_recv_buf) { - CF_DEBUGF(fprintf(stderr, "* FAILED: enlarging buffer to %zu\n", - new_recv_buf_alloc_len)); - return false; - } - if(cur_recv_len) { - memcpy(new_recv_buf, stream->recv_buf, cur_recv_len); - } - stream->recv_buf_alloc = new_recv_buf_alloc_len; - free(stream->recv_buf); - stream->recv_buf = new_recv_buf; +/* + * write_resp_raw() copies response data in raw format to the `data`'s + * receive buffer. If not enough space is available, it appends to the + * `data`'s overflow buffer. + */ +static CURLcode write_resp_raw(struct Curl_easy *data, + const void *mem, size_t memlen) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + CURLcode result = CURLE_OK; + ssize_t nwritten; + + if(!stream) + return CURLE_RECV_ERROR; + + nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result); + if(nwritten < 0) { + return result; + } + + if((size_t)nwritten < memlen) { + /* This MUST not happen. Our recbuf is dimensioned to hold the + * full max_stream_window and then some for this very reason. */ + DEBUGASSERT(0); + return CURLE_RECV_ERROR; } - return true; + return result; } static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request, - void *IfContext, - const MSH3_HEADER *Header) + void *userp, + const MSH3_HEADER *hd) { - struct Curl_easy *data = IfContext; - struct HTTP *stream = data->req.p.http; - size_t total_len; + struct Curl_easy *data = userp; + struct stream_ctx *stream = H3_STREAM_CTX(data); + CURLcode result; (void)Request; - if(stream->recv_header_complete) { - CF_DEBUGF(fprintf(stderr, "* ignoring header after data\n")); + if(!stream || stream->recv_header_complete) { return; } msh3_lock_acquire(&stream->recv_lock); - if((Header->NameLength == 7) && - !strncmp(H2H3_PSEUDO_STATUS, (char *)Header->Name, 7)) { - total_len = 10 + Header->ValueLength; - if(!msh3request_ensure_room(stream, total_len)) { - CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n", - (int)Header->NameLength, Header->Name)); - stream->recv_error = CURLE_OUT_OF_MEMORY; - goto release_lock; - } - msnprintf((char *)stream->recv_buf + stream->recv_header_len, - stream->recv_buf_alloc - stream->recv_header_len, - "HTTP/3 %.*s \r\n", (int)Header->ValueLength, Header->Value); + if((hd->NameLength == 7) && + !strncmp(HTTP_PSEUDO_STATUS, (char *)hd->Name, 7)) { + char line[14]; /* status line is always 13 characters long */ + size_t ncopy; + + DEBUGASSERT(!stream->firstheader); + stream->status_code = decode_status_code(hd->Value, hd->ValueLength); + DEBUGASSERT(stream->status_code != -1); + ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n", + stream->status_code); + result = write_resp_raw(data, line, ncopy); + if(result) + stream->recv_error = result; + stream->firstheader = TRUE; } else { - total_len = 4 + Header->NameLength + Header->ValueLength; - if(!msh3request_ensure_room(stream, total_len)) { - CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n", - (int)Header->NameLength, Header->Name)); - stream->recv_error = CURLE_OUT_OF_MEMORY; - goto release_lock; + /* store as an HTTP1-style header */ + DEBUGASSERT(stream->firstheader); + result = write_resp_raw(data, hd->Name, hd->NameLength); + if(!result) + result = write_resp_raw(data, ": ", 2); + if(!result) + result = write_resp_raw(data, hd->Value, hd->ValueLength); + if(!result) + result = write_resp_raw(data, "\r\n", 2); + if(result) { + stream->recv_error = result; } - msnprintf((char *)stream->recv_buf + stream->recv_header_len, - stream->recv_buf_alloc - stream->recv_header_len, - "%.*s: %.*s\r\n", - (int)Header->NameLength, Header->Name, - (int)Header->ValueLength, Header->Value); } - stream->recv_header_len += total_len; - data->state.drain = 1; - -release_lock: + drain_stream_from_other_thread(data, stream); msh3_lock_release(&stream->recv_lock); } static bool MSH3_CALL msh3_data_received(MSH3_REQUEST *Request, - void *IfContext, uint32_t *Length, - const uint8_t *Data) + void *IfContext, uint32_t *buflen, + const uint8_t *buf) { struct Curl_easy *data = IfContext; - struct HTTP *stream = data->req.p.http; - size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len; + struct stream_ctx *stream = H3_STREAM_CTX(data); + CURLcode result; + bool rv = FALSE; + /* TODO: we would like to limit the amount of data we are buffer here. + * There seems to be no mechanism in msh3 to adjust flow control and + * it is undocumented what happens if we return FALSE here or less + * length (buflen is an inout parameter). + */ (void)Request; - if(data && data->set.verbose) - CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: received %u. %zu buffered, " - "%zu allocated\n", - *Length, cur_recv_len, stream->recv_buf_alloc)); - /* TODO - Update this code to limit data bufferring by `stream->recv_buf_max` - and return `false` when we reach that limit. Then, when curl drains some - of the buffer, making room, call MsH3RequestSetReceiveEnabled to enable - receive callbacks again. */ + if(!stream) + return FALSE; + msh3_lock_acquire(&stream->recv_lock); if(!stream->recv_header_complete) { - if(data && data->set.verbose) - CF_DEBUGF(fprintf(stderr, "* [MSH3] req: Headers complete!\n")); - if(!msh3request_ensure_room(stream, 2)) { - stream->recv_error = CURLE_OUT_OF_MEMORY; - goto release_lock; + result = write_resp_raw(data, "\r\n", 2); + if(result) { + stream->recv_error = result; + goto out; } - stream->recv_buf[stream->recv_header_len++] = '\r'; - stream->recv_buf[stream->recv_header_len++] = '\n'; stream->recv_header_complete = true; - cur_recv_len += 2; } - if(!msh3request_ensure_room(stream, *Length)) { - stream->recv_error = CURLE_OUT_OF_MEMORY; - goto release_lock; + + result = write_resp_raw(data, buf, *buflen); + if(result) { + stream->recv_error = result; } - memcpy(stream->recv_buf + cur_recv_len, Data, *Length); - stream->recv_data_len += (size_t)*Length; - data->state.drain = 1; + rv = TRUE; -release_lock: +out: msh3_lock_release(&stream->recv_lock); - return true; + return rv; } static void MSH3_CALL msh3_complete(MSH3_REQUEST *Request, void *IfContext, - bool Aborted, uint64_t AbortError) + bool aborted, uint64_t error) { struct Curl_easy *data = IfContext; - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); (void)Request; - (void)AbortError; - if(data && data->set.verbose) - CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: complete, aborted=%s\n", - Aborted ? "true" : "false")); + if(!stream) + return; msh3_lock_acquire(&stream->recv_lock); - if(Aborted) { - stream->recv_error = CURLE_HTTP3; /* TODO - how do we pass AbortError? */ - } + stream->closed = TRUE; stream->recv_header_complete = true; - stream->recv_data_complete = true; + if(error) + stream->error3 = error; + if(aborted) + stream->reset = TRUE; msh3_lock_release(&stream->recv_lock); } @@ -347,7 +438,10 @@ static void MSH3_CALL msh3_shutdown_complete(MSH3_REQUEST *Request, void *IfContext) { struct Curl_easy *data = IfContext; - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); + + if(!stream) + return; (void)Request; (void)stream; } @@ -356,138 +450,225 @@ static void MSH3_CALL msh3_data_sent(MSH3_REQUEST *Request, void *IfContext, void *SendContext) { struct Curl_easy *data = IfContext; - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); + if(!stream) + return; (void)Request; (void)stream; (void)SendContext; } +static ssize_t recv_closed_stream(struct Curl_cfilter *cf, + struct Curl_easy *data, + CURLcode *err) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + ssize_t nread = -1; + + if(!stream) { + *err = CURLE_RECV_ERROR; + return -1; + } + (void)cf; + if(stream->reset) { + failf(data, "HTTP/3 stream reset by server"); + *err = CURLE_PARTIAL_FILE; + DEBUGF(LOG_CF(data, cf, "cf_recv, was reset -> %d", *err)); + goto out; + } + else if(stream->error3) { + failf(data, "HTTP/3 stream was not closed cleanly: (error %zd)", + (ssize_t)stream->error3); + *err = CURLE_HTTP3; + DEBUGF(LOG_CF(data, cf, "cf_recv, closed uncleanly -> %d", *err)); + goto out; + } + else { + DEBUGF(LOG_CF(data, cf, "cf_recv, closed ok -> %d", *err)); + } + *err = CURLE_OK; + nread = 0; + +out: + return nread; +} + +static void set_quic_expire(struct Curl_cfilter *cf, struct Curl_easy *data) +{ + struct stream_ctx *stream = H3_STREAM_CTX(data); + + /* we have no indication from msh3 when it would be a good time + * to juggle the connection again. So, we compromise by calling + * us again every some milliseconds. */ + (void)cf; + if(stream && stream->req && !stream->closed) { + Curl_expire(data, 10, EXPIRE_QUIC); + } + else { + Curl_expire(data, 50, EXPIRE_QUIC); + } +} + static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data, char *buf, size_t len, CURLcode *err) { - struct HTTP *stream = data->req.p.http; - size_t outsize = 0; + struct stream_ctx *stream = H3_STREAM_CTX(data); + ssize_t nread = -1; + struct cf_call_data save; (void)cf; + if(!stream) { + *err = CURLE_RECV_ERROR; + return -1; + } + CF_DATA_SAVE(save, cf, data); DEBUGF(LOG_CF(data, cf, "req: recv with %zu byte buffer", len)); + msh3_lock_acquire(&stream->recv_lock); + if(stream->recv_error) { failf(data, "request aborted"); - data->state.drain = 0; *err = stream->recv_error; - return -1; + goto out; } *err = CURLE_OK; - msh3_lock_acquire(&stream->recv_lock); - if(stream->recv_header_len) { - outsize = len; - if(stream->recv_header_len < outsize) { - outsize = stream->recv_header_len; - } - memcpy(buf, stream->recv_buf, outsize); - if(outsize < stream->recv_header_len + stream->recv_data_len) { - memmove(stream->recv_buf, stream->recv_buf + outsize, - stream->recv_header_len + stream->recv_data_len - outsize); - } - stream->recv_header_len -= outsize; - DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of header", outsize)); - } - else if(stream->recv_data_len) { - outsize = len; - if(stream->recv_data_len < outsize) { - outsize = stream->recv_data_len; - } - memcpy(buf, stream->recv_buf, outsize); - if(outsize < stream->recv_data_len) { - memmove(stream->recv_buf, stream->recv_buf + outsize, - stream->recv_data_len - outsize); - } - stream->recv_data_len -= outsize; - DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of data", outsize)); - if(stream->recv_data_len == 0 && stream->recv_data_complete) - data->state.drain = 1; + if(!Curl_bufq_is_empty(&stream->recvbuf)) { + nread = Curl_bufq_read(&stream->recvbuf, + (unsigned char *)buf, len, err); + DEBUGF(LOG_CF(data, cf, "read recvbuf(len=%zu) -> %zd, %d", + len, nread, *err)); + if(nread < 0) + goto out; + if(stream->closed) + drain_stream(cf, data); } - else if(stream->recv_data_complete) { - DEBUGF(LOG_CF(data, cf, "req: receive complete")); - data->state.drain = 0; + else if(stream->closed) { + nread = recv_closed_stream(cf, data, err); + goto out; } else { DEBUGF(LOG_CF(data, cf, "req: nothing here, call again")); *err = CURLE_AGAIN; - outsize = -1; } +out: msh3_lock_release(&stream->recv_lock); - - return (ssize_t)outsize; + set_quic_expire(cf, data); + CF_DATA_RESTORE(cf, save); + return nread; } static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data, const void *buf, size_t len, CURLcode *err) { struct cf_msh3_ctx *ctx = cf->ctx; - struct HTTP *stream = data->req.p.http; - struct h2h3req *hreq; - size_t hdrlen = 0; - size_t sentlen = 0; + struct stream_ctx *stream = H3_STREAM_CTX(data); + struct h1_req_parser h1; + struct dynhds h2_headers; + MSH3_HEADER *nva = NULL; + size_t nheader, i; + ssize_t nwritten = -1; + struct cf_call_data save; + bool eos; + + CF_DATA_SAVE(save, cf, data); + + Curl_h1_req_parse_init(&h1, H1_PARSE_DEFAULT_MAX_LINE_LEN); + Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST); /* Sizes must match for cast below to work" */ - DEBUGASSERT(sizeof(MSH3_HEADER) == sizeof(struct h2h3pseudo)); + DEBUGASSERT(stream); DEBUGF(LOG_CF(data, cf, "req: send %zu bytes", len)); if(!stream->req) { /* The first send on the request contains the headers and possibly some data. Parse out the headers and create the request, then if there is any data left over go ahead and send it too. */ + nwritten = Curl_h1_req_parse_read(&h1, buf, len, NULL, 0, err); + if(nwritten < 0) + goto out; + DEBUGASSERT(h1.done); + DEBUGASSERT(h1.req); - *err = msh3_data_setup(cf, data); + *err = Curl_http_req_to_h2(&h2_headers, h1.req, data); if(*err) { - failf(data, "could not setup data"); - return -1; + nwritten = -1; + goto out; } - *err = Curl_pseudo_headers(data, buf, len, &hdrlen, &hreq); - if(*err) { - failf(data, "Curl_pseudo_headers failed"); - return -1; + nheader = Curl_dynhds_count(&h2_headers); + nva = malloc(sizeof(MSH3_HEADER) * nheader); + if(!nva) { + *err = CURLE_OUT_OF_MEMORY; + nwritten = -1; + goto out; } - DEBUGF(LOG_CF(data, cf, "req: send %zu headers", hreq->entries)); + for(i = 0; i < nheader; ++i) { + struct dynhds_entry *e = Curl_dynhds_getn(&h2_headers, i); + nva[i].Name = e->name; + nva[i].NameLength = e->namelen; + nva[i].Value = e->value; + nva[i].ValueLength = e->valuelen; + } + + switch(data->state.httpreq) { + case HTTPREQ_POST: + case HTTPREQ_POST_FORM: + case HTTPREQ_POST_MIME: + case HTTPREQ_PUT: + /* known request body size or -1 */ + eos = FALSE; + break; + default: + /* there is not request body */ + eos = TRUE; + stream->upload_done = TRUE; + break; + } + + DEBUGF(LOG_CF(data, cf, "req: send %zu headers", nheader)); stream->req = MsH3RequestOpen(ctx->qconn, &msh3_request_if, data, - (MSH3_HEADER*)hreq->header, hreq->entries, - hdrlen == len ? MSH3_REQUEST_FLAG_FIN : + nva, nheader, + eos ? MSH3_REQUEST_FLAG_FIN : MSH3_REQUEST_FLAG_NONE); - Curl_pseudo_free(hreq); if(!stream->req) { failf(data, "request open failed"); *err = CURLE_SEND_ERROR; - return -1; + goto out; } *err = CURLE_OK; - return len; + nwritten = len; + goto out; } + else { + /* request is open */ + DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len)); + if(len > 0xFFFFFFFF) { + len = 0xFFFFFFFF; + } - DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len)); - if(len > 0xFFFFFFFF) { - /* msh3 doesn't support size_t sends currently. */ - *err = CURLE_SEND_ERROR; - return -1; - } + if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_NONE, buf, + (uint32_t)len, stream)) { + *err = CURLE_SEND_ERROR; + goto out; + } - /* TODO - Need an explicit signal to know when to FIN. */ - if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, buf, (uint32_t)len, - stream)) { - *err = CURLE_SEND_ERROR; - return -1; + /* TODO - msh3/msquic will hold onto this memory until the send complete + event. How do we make sure curl doesn't free it until then? */ + *err = CURLE_OK; + nwritten = len; } - /* TODO - msh3/msquic will hold onto this memory until the send complete - event. How do we make sure curl doesn't free it until then? */ - sentlen += len; - *err = CURLE_OK; - return sentlen; +out: + set_quic_expire(cf, data); + free(nva); + Curl_h1_req_parse_free(&h1); + Curl_dynhds_free(&h2_headers); + CF_DATA_RESTORE(cf, save); + return nwritten; } static int cf_msh3_get_select_socks(struct Curl_cfilter *cf, @@ -495,36 +676,50 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf, curl_socket_t *socks) { struct cf_msh3_ctx *ctx = cf->ctx; - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); int bitmap = GETSOCK_BLANK; + struct cf_call_data save; + CF_DATA_SAVE(save, cf, data); if(stream && ctx->sock[SP_LOCAL] != CURL_SOCKET_BAD) { socks[0] = ctx->sock[SP_LOCAL]; if(stream->recv_error) { bitmap |= GETSOCK_READSOCK(0); - data->state.drain = 1; + drain_stream(cf, data); } - else if(stream->recv_header_len || stream->recv_data_len) { + else if(stream->req) { bitmap |= GETSOCK_READSOCK(0); - data->state.drain = 1; + drain_stream(cf, data); } } - DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d", - (uint32_t)data->state.drain, bitmap)); - + DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap)); + CF_DATA_RESTORE(cf, save); return bitmap; } static bool cf_msh3_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); + struct cf_call_data save; + bool pending = FALSE; + + CF_DATA_SAVE(save, cf, data); (void)cf; - DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %hhu", - (bool)(stream->recv_header_len || stream->recv_data_len))); - return stream->recv_header_len || stream->recv_data_len; + if(stream && stream->req) { + msh3_lock_acquire(&stream->recv_lock); + DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %zu", + Curl_bufq_len(&stream->recvbuf))); + pending = !Curl_bufq_is_empty(&stream->recvbuf); + msh3_lock_release(&stream->recv_lock); + if(pending) + drain_stream(cf, (struct Curl_easy *)data); + } + + CF_DATA_RESTORE(cf, save); + return pending; } static void cf_msh3_active(struct Curl_cfilter *cf, struct Curl_easy *data) @@ -544,36 +739,52 @@ static void cf_msh3_active(struct Curl_cfilter *cf, struct Curl_easy *data) ctx->active = TRUE; } +static CURLcode h3_data_pause(struct Curl_cfilter *cf, + struct Curl_easy *data, + bool pause) +{ + if(!pause) { + drain_stream(cf, data); + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } + return CURLE_OK; +} + static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf, struct Curl_easy *data, int event, int arg1, void *arg2) { - struct HTTP *stream = data->req.p.http; + struct stream_ctx *stream = H3_STREAM_CTX(data); + struct cf_call_data save; CURLcode result = CURLE_OK; + CF_DATA_SAVE(save, cf, data); + (void)arg1; (void)arg2; switch(event) { case CF_CTRL_DATA_SETUP: - result = msh3_data_setup(cf, data); + result = h3_data_setup(cf, data); + break; + case CF_CTRL_DATA_PAUSE: + result = h3_data_pause(cf, data, (arg1 != 0)); break; case CF_CTRL_DATA_DONE: - DEBUGF(LOG_CF(data, cf, "req: done")); + h3_data_done(cf, data); + break; + case CF_CTRL_DATA_DONE_SEND: + DEBUGF(LOG_CF(data, cf, "req: send done")); if(stream) { - if(stream->recv_buf) { - Curl_safefree(stream->recv_buf); - msh3_lock_uninitialize(&stream->recv_lock); - } + stream->upload_done = TRUE; if(stream->req) { - MsH3RequestClose(stream->req); - stream->req = ZERO_NULL; + char buf[1]; + if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, + buf, 0, data)) { + result = CURLE_SEND_ERROR; + } } } break; - case CF_CTRL_DATA_DONE_SEND: - DEBUGF(LOG_CF(data, cf, "req: send done")); - stream->upload_done = TRUE; - break; case CF_CTRL_CONN_INFO_UPDATE: DEBUGF(LOG_CF(data, cf, "req: update info")); cf_msh3_active(cf, data); @@ -581,6 +792,8 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf, default: break; } + + CF_DATA_RESTORE(cf, save); return result; } @@ -590,9 +803,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, struct cf_msh3_ctx *ctx = cf->ctx; bool verify = !!cf->conn->ssl_config.verifypeer; MSH3_ADDR addr = {0}; + CURLcode result; + memcpy(&addr, &ctx->addr.sa_addr, ctx->addr.addrlen); MSH3_SET_PORT(&addr, (uint16_t)cf->conn->remote_port); - ctx->verbose = (data && data->set.verbose); if(verify && (cf->conn->ssl_config.CAfile || cf->conn->ssl_config.CApath)) { /* TODO: need a way to provide trust anchors to MSH3 */ @@ -618,7 +832,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, ctx->qconn = MsH3ConnectionOpen(ctx->api, &msh3_conn_if, - ctx, + cf, cf->conn->host.name, &addr, !verify); @@ -631,6 +845,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, return CURLE_FAILED_INIT; } + result = h3_data_setup(cf, data); + if(result) + return result; + return CURLE_OK; } @@ -639,6 +857,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf, bool blocking, bool *done) { struct cf_msh3_ctx *ctx = cf->ctx; + struct cf_call_data save; CURLcode result = CURLE_OK; (void)blocking; @@ -647,6 +866,8 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf, return CURLE_OK; } + CF_DATA_SAVE(save, cf, data); + if(ctx->sock[SP_LOCAL] == CURL_SOCKET_BAD) { if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0]) < 0) { ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD; @@ -666,6 +887,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf, if(ctx->handshake_complete) { ctx->handshake_at = Curl_now(); if(ctx->handshake_succeeded) { + DEBUGF(LOG_CF(data, cf, "handshake succeeded")); cf->conn->bits.multiplex = TRUE; /* at least potentially multiplexed */ cf->conn->httpversion = 30; cf->conn->bundle->multiuse = BUNDLE_MULTIPLEX; @@ -682,26 +904,35 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf, } out: + CF_DATA_RESTORE(cf, save); return result; } static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_msh3_ctx *ctx = cf->ctx; + struct cf_call_data save; (void)data; + CF_DATA_SAVE(save, cf, data); + if(ctx) { DEBUGF(LOG_CF(data, cf, "destroying")); - if(ctx->qconn) + if(ctx->qconn) { MsH3ConnectionClose(ctx->qconn); - if(ctx->api) + ctx->qconn = NULL; + } + if(ctx->api) { MsH3ApiClose(ctx->api); + ctx->api = NULL; + } if(ctx->active) { /* We share our socket at cf->conn->sock[cf->sockindex] when active. * If it is no longer there, someone has stolen (and hopefully * closed it) and we just forget about it. */ + ctx->active = FALSE; if(ctx->sock[SP_LOCAL] == cf->conn->sock[cf->sockindex]) { DEBUGF(LOG_CF(data, cf, "cf_msh3_close(%d) active", (int)ctx->sock[SP_LOCAL])); @@ -721,17 +952,22 @@ static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data) if(ctx->sock[SP_REMOTE] != CURL_SOCKET_BAD) { sclose(ctx->sock[SP_REMOTE]); } - memset(ctx, 0, sizeof(*ctx)); ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD; ctx->sock[SP_REMOTE] = CURL_SOCKET_BAD; } + CF_DATA_RESTORE(cf, save); } static void cf_msh3_destroy(struct Curl_cfilter *cf, struct Curl_easy *data) { + struct cf_call_data save; + + CF_DATA_SAVE(save, cf, data); cf_msh3_close(cf, data); free(cf->ctx); cf->ctx = NULL; + /* no CF_DATA_RESTORE(cf, save); its gone */ + } static CURLcode cf_msh3_query(struct Curl_cfilter *cf, |