summaryrefslogtreecommitdiffstats
path: root/lib/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqtt.c')
-rw-r--r--lib/mqtt.c108
1 files changed, 70 insertions, 38 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c
index 30edf3d..54f8882 100644
--- a/lib/mqtt.c
+++ b/lib/mqtt.c
@@ -109,6 +109,7 @@ static CURLcode mqtt_setup_conn(struct Curl_easy *data,
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
+ Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
data->req.p.mqtt = mq;
return CURLE_OK;
}
@@ -295,12 +296,12 @@ static CURLcode mqtt_connect(struct Curl_easy *data)
/* set initial values for the CONNECT packet */
pos = init_connpack(packet, remain, remain_pos);
- result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
- MQTT_CLIENTID_LEN - clen + 1);
+ result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen],
+ MQTT_CLIENTID_LEN - clen + 1);
/* add client id */
rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
if(rc) {
- failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
+ failf(data, "Client ID length mismatched: [%zu]", strlen(client_id));
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
@@ -317,7 +318,7 @@ static CURLcode mqtt_connect(struct Curl_easy *data)
rc = add_user(username, ulen,
(unsigned char *)packet, start_user, remain_pos);
if(rc) {
- failf(data, "Username is too large: [%lu]", ulen);
+ failf(data, "Username is too large: [%zu]", ulen);
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
@@ -327,7 +328,7 @@ static CURLcode mqtt_connect(struct Curl_easy *data)
if(plen) {
rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
if(rc) {
- failf(data, "Password is too large: [%lu]", plen);
+ failf(data, "Password is too large: [%zu]", plen);
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
@@ -350,36 +351,66 @@ static CURLcode mqtt_disconnect(struct Curl_easy *data)
struct MQTT *mq = data->req.p.mqtt;
result = mqtt_send(data, (char *)"\xe0\x00", 2);
Curl_safefree(mq->sendleftovers);
+ Curl_dyn_free(&mq->recvbuf);
return result;
}
-static CURLcode mqtt_verify_connack(struct Curl_easy *data)
+static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
{
+ struct MQTT *mq = data->req.p.mqtt;
+ size_t rlen = Curl_dyn_len(&mq->recvbuf);
CURLcode result;
- struct connectdata *conn = data->conn;
- curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
- unsigned char readbuf[MQTT_CONNACK_LEN];
- ssize_t nread;
- result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
- if(result)
- goto fail;
+ if(rlen < nbytes) {
+ unsigned char readbuf[1024];
+ ssize_t nread;
- Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
+ DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
+ result = Curl_read(data, data->conn->sock[FIRSTSOCKET],
+ (char *)readbuf, nbytes - rlen, &nread);
+ if(result)
+ return result;
+ DEBUGASSERT(nread >= 0);
+ if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread))
+ return CURLE_OUT_OF_MEMORY;
+ rlen = Curl_dyn_len(&mq->recvbuf);
+ }
+ return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN;
+}
- /* fixme */
- if(nread < MQTT_CONNACK_LEN) {
- result = CURLE_WEIRD_SERVER_REPLY;
+static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
+{
+ struct MQTT *mq = data->req.p.mqtt;
+ size_t rlen = Curl_dyn_len(&mq->recvbuf);
+ if(rlen <= nbytes)
+ Curl_dyn_reset(&mq->recvbuf);
+ else
+ Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+}
+
+static CURLcode mqtt_verify_connack(struct Curl_easy *data)
+{
+ struct MQTT *mq = data->req.p.mqtt;
+ CURLcode result;
+ char *ptr;
+
+ result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
+ if(result)
goto fail;
- }
/* verify CONNACK */
- if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
+ DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
+ ptr = Curl_dyn_ptr(&mq->recvbuf);
+ Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
+
+ if(ptr[0] != 0x00 || ptr[1] != 0x00) {
failf(data, "Expected %02x%02x but got %02x%02x",
- 0x00, 0x00, readbuf[0], readbuf[1]);
+ 0x00, 0x00, ptr[0], ptr[1]);
+ Curl_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
+ goto fail;
}
-
+ mqtt_recv_consume(data, MQTT_CONNACK_LEN);
fail:
return result;
}
@@ -452,31 +483,29 @@ fail:
*/
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
- CURLcode result;
+ struct MQTT *mq = data->req.p.mqtt;
struct connectdata *conn = data->conn;
- curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
- unsigned char readbuf[MQTT_SUBACK_LEN];
- ssize_t nread;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ CURLcode result;
+ char *ptr;
- result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
+ result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
if(result)
goto fail;
- Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
-
- /* fixme */
- if(nread < MQTT_SUBACK_LEN) {
+ /* verify SUBACK */
+ DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
+ ptr = Curl_dyn_ptr(&mq->recvbuf);
+ Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
+
+ if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
+ ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
+ ptr[2] != 0x00) {
+ Curl_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
-
- /* verify SUBACK */
- if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
- readbuf[1] != (mqtt->packetid & 0xff) ||
- readbuf[2] != 0x00)
- result = CURLE_WEIRD_SERVER_REPLY;
-
+ mqtt_recv_consume(data, MQTT_SUBACK_LEN);
fail:
return result;
}
@@ -668,7 +697,9 @@ MQTT_SUBACK_COMING:
mq->npacket -= nread;
k->bytecount += nread;
- Curl_pgrsSetDownloadCounter(data, k->bytecount);
+ result = Curl_pgrsSetDownloadCounter(data, k->bytecount);
+ if(result)
+ goto end;
/* if QoS is set, message contains packet id */
@@ -711,6 +742,7 @@ static CURLcode mqtt_done(struct Curl_easy *data,
(void)status;
(void)premature;
Curl_safefree(mq->sendleftovers);
+ Curl_dyn_free(&mq->recvbuf);
return CURLE_OK;
}