diff options
Diffstat (limited to 'lib/mqtt.c')
-rw-r--r-- | lib/mqtt.c | 108 |
1 files changed, 70 insertions, 38 deletions
@@ -109,6 +109,7 @@ static CURLcode mqtt_setup_conn(struct Curl_easy *data, mq = calloc(1, sizeof(struct MQTT)); if(!mq) return CURLE_OUT_OF_MEMORY; + Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); data->req.p.mqtt = mq; return CURLE_OK; } @@ -295,12 +296,12 @@ static CURLcode mqtt_connect(struct Curl_easy *data) /* set initial values for the CONNECT packet */ pos = init_connpack(packet, remain, remain_pos); - result = Curl_rand_hex(data, (unsigned char *)&client_id[clen], - MQTT_CLIENTID_LEN - clen + 1); + result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen], + MQTT_CLIENTID_LEN - clen + 1); /* add client id */ rc = add_client_id(client_id, strlen(client_id), packet, pos + 1); if(rc) { - failf(data, "Client ID length mismatched: [%lu]", strlen(client_id)); + failf(data, "Client ID length mismatched: [%zu]", strlen(client_id)); result = CURLE_WEIRD_SERVER_REPLY; goto end; } @@ -317,7 +318,7 @@ static CURLcode mqtt_connect(struct Curl_easy *data) rc = add_user(username, ulen, (unsigned char *)packet, start_user, remain_pos); if(rc) { - failf(data, "Username is too large: [%lu]", ulen); + failf(data, "Username is too large: [%zu]", ulen); result = CURLE_WEIRD_SERVER_REPLY; goto end; } @@ -327,7 +328,7 @@ static CURLcode mqtt_connect(struct Curl_easy *data) if(plen) { rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos); if(rc) { - failf(data, "Password is too large: [%lu]", plen); + failf(data, "Password is too large: [%zu]", plen); result = CURLE_WEIRD_SERVER_REPLY; goto end; } @@ -350,36 +351,66 @@ static CURLcode mqtt_disconnect(struct Curl_easy *data) struct MQTT *mq = data->req.p.mqtt; result = mqtt_send(data, (char *)"\xe0\x00", 2); Curl_safefree(mq->sendleftovers); + Curl_dyn_free(&mq->recvbuf); return result; } -static CURLcode mqtt_verify_connack(struct Curl_easy *data) +static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) { + struct MQTT *mq = data->req.p.mqtt; + size_t rlen = Curl_dyn_len(&mq->recvbuf); CURLcode result; - struct connectdata *conn = data->conn; - curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; - unsigned char readbuf[MQTT_CONNACK_LEN]; - ssize_t nread; - result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); - if(result) - goto fail; + if(rlen < nbytes) { + unsigned char readbuf[1024]; + ssize_t nread; - Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + DEBUGASSERT(nbytes - rlen < sizeof(readbuf)); + result = Curl_read(data, data->conn->sock[FIRSTSOCKET], + (char *)readbuf, nbytes - rlen, &nread); + if(result) + return result; + DEBUGASSERT(nread >= 0); + if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) + return CURLE_OUT_OF_MEMORY; + rlen = Curl_dyn_len(&mq->recvbuf); + } + return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN; +} - /* fixme */ - if(nread < MQTT_CONNACK_LEN) { - result = CURLE_WEIRD_SERVER_REPLY; +static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) +{ + struct MQTT *mq = data->req.p.mqtt; + size_t rlen = Curl_dyn_len(&mq->recvbuf); + if(rlen <= nbytes) + Curl_dyn_reset(&mq->recvbuf); + else + Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); +} + +static CURLcode mqtt_verify_connack(struct Curl_easy *data) +{ + struct MQTT *mq = data->req.p.mqtt; + CURLcode result; + char *ptr; + + result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); + if(result) goto fail; - } /* verify CONNACK */ - if(readbuf[0] != 0x00 || readbuf[1] != 0x00) { + DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); + ptr = Curl_dyn_ptr(&mq->recvbuf); + Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); + + if(ptr[0] != 0x00 || ptr[1] != 0x00) { failf(data, "Expected %02x%02x but got %02x%02x", - 0x00, 0x00, readbuf[0], readbuf[1]); + 0x00, 0x00, ptr[0], ptr[1]); + Curl_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; + goto fail; } - + mqtt_recv_consume(data, MQTT_CONNACK_LEN); fail: return result; } @@ -452,31 +483,29 @@ fail: */ static CURLcode mqtt_verify_suback(struct Curl_easy *data) { - CURLcode result; + struct MQTT *mq = data->req.p.mqtt; struct connectdata *conn = data->conn; - curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; - unsigned char readbuf[MQTT_SUBACK_LEN]; - ssize_t nread; struct mqtt_conn *mqtt = &conn->proto.mqtt; + CURLcode result; + char *ptr; - result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); + result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); if(result) goto fail; - Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); - - /* fixme */ - if(nread < MQTT_SUBACK_LEN) { + /* verify SUBACK */ + DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); + ptr = Curl_dyn_ptr(&mq->recvbuf); + Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); + + if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || + ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || + ptr[2] != 0x00) { + Curl_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; goto fail; } - - /* verify SUBACK */ - if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) || - readbuf[1] != (mqtt->packetid & 0xff) || - readbuf[2] != 0x00) - result = CURLE_WEIRD_SERVER_REPLY; - + mqtt_recv_consume(data, MQTT_SUBACK_LEN); fail: return result; } @@ -668,7 +697,9 @@ MQTT_SUBACK_COMING: mq->npacket -= nread; k->bytecount += nread; - Curl_pgrsSetDownloadCounter(data, k->bytecount); + result = Curl_pgrsSetDownloadCounter(data, k->bytecount); + if(result) + goto end; /* if QoS is set, message contains packet id */ @@ -711,6 +742,7 @@ static CURLcode mqtt_done(struct Curl_easy *data, (void)status; (void)premature; Curl_safefree(mq->sendleftovers); + Curl_dyn_free(&mq->recvbuf); return CURLE_OK; } |