summaryrefslogtreecommitdiffstats
path: root/Utilities/cmcurl/lib/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'Utilities/cmcurl/lib/mqtt.c')
-rw-r--r--Utilities/cmcurl/lib/mqtt.c29
1 files changed, 14 insertions, 15 deletions
diff --git a/Utilities/cmcurl/lib/mqtt.c b/Utilities/cmcurl/lib/mqtt.c
index 366235c..5a9d6d0 100644
--- a/Utilities/cmcurl/lib/mqtt.c
+++ b/Utilities/cmcurl/lib/mqtt.c
@@ -88,7 +88,7 @@ const struct Curl_handler Curl_handler_mqtt = {
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ZERO_NULL, /* disconnect */
- ZERO_NULL, /* readwrite */
+ ZERO_NULL, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_MQTT, /* defport */
@@ -524,8 +524,10 @@ static CURLcode mqtt_publish(struct Curl_easy *data)
char encodedbytes[4];
curl_off_t postfieldsize = data->set.postfieldsize;
- if(!payload)
+ if(!payload) {
+ DEBUGF(infof(data, "mqtt_publish without payload, return bad arg"));
return CURLE_BAD_FUNCTION_ARGUMENT;
+ }
if(postfieldsize < 0)
payloadlen = strlen(payload);
else
@@ -622,7 +624,6 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
ssize_t nread;
- unsigned char *pkt = (unsigned char *)data->state.buffer;
size_t remlen;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.p.mqtt;
@@ -671,13 +672,14 @@ MQTT_SUBACK_COMING:
data->req.bytecount = 0;
data->req.size = remlen;
mq->npacket = remlen; /* get this many bytes */
- /* FALLTHROUGH */
+ FALLTHROUGH();
case MQTT_PUB_REMAIN: {
/* read rest of packet, but no more. Cap to buffer size */
+ char buffer[4*1024];
size_t rest = mq->npacket;
- if(rest > (size_t)data->set.buffer_size)
- rest = (size_t)data->set.buffer_size;
- result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
+ if(rest > sizeof(buffer))
+ rest = sizeof(buffer);
+ result = Curl_read(data, sockfd, buffer, rest, &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "EEEE AAAAGAIN");
@@ -690,14 +692,12 @@ MQTT_SUBACK_COMING:
goto end;
}
- mq->npacket -= nread;
-
/* if QoS is set, message contains packet id */
-
- result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
+ result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
if(result)
goto end;
+ mq->npacket -= nread;
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */
mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
@@ -745,7 +745,6 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
struct MQTT *mq = data->req.p.mqtt;
ssize_t nread;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
- unsigned char *pkt = (unsigned char *)data->state.buffer;
unsigned char byte;
*done = FALSE;
@@ -776,14 +775,14 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
/* remember the first byte */
mq->npacket = 0;
mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
- /* FALLTHROUGH */
+ FALLTHROUGH();
case MQTT_REMAINING_LENGTH:
do {
result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
if(!nread)
break;
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
- pkt[mq->npacket++] = byte;
+ mq->pkt_hd[mq->npacket++] = byte;
} while((byte & 0x80) && (mq->npacket < 4));
if(nread && (byte & 0x80))
/* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
@@ -791,7 +790,7 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
result = CURLE_WEIRD_SERVER_REPLY;
if(result)
break;
- mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
+ mq->remaining_length = mqtt_decode_len(mq->pkt_hd, mq->npacket, NULL);
mq->npacket = 0;
if(mq->remaining_length) {
mqstate(data, mqtt->nextstate, MQTT_NOSTATE);