From 10d09c2e94190d231a67714dc0c129d630248b86 Mon Sep 17 00:00:00 2001 From: maxxir_w Date: Sat, 6 Apr 2019 12:47:19 +0400 Subject: [PATCH] MQTT client testing is OK --- 22_m1284p_WIZNET_MQTT/.cproject | 8 +- 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.c | 538 +++++++++++++++++++++ 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.h | 104 ++++ .../Internet/MQTT/MQTTPacket/src/MQTTConnect.h | 129 +++++ .../MQTT/MQTTPacket/src/MQTTConnectClient.c | 217 +++++++++ .../MQTT/MQTTPacket/src/MQTTConnectServer.c | 153 ++++++ .../MQTT/MQTTPacket/src/MQTTDeserializePublish.c | 109 +++++ .../Internet/MQTT/MQTTPacket/src/MQTTFormat.c | 265 ++++++++++ .../Internet/MQTT/MQTTPacket/src/MQTTFormat.h | 37 ++ .../Internet/MQTT/MQTTPacket/src/MQTTPacket.c | 443 +++++++++++++++++ .../Internet/MQTT/MQTTPacket/src/MQTTPacket.h | 136 ++++++ .../Internet/MQTT/MQTTPacket/src/MQTTPublish.h | 38 ++ .../MQTT/MQTTPacket/src/MQTTSerializePublish.c | 171 +++++++ .../Internet/MQTT/MQTTPacket/src/MQTTSubscribe.h | 38 ++ .../MQTT/MQTTPacket/src/MQTTSubscribeClient.c | 140 ++++++ .../MQTT/MQTTPacket/src/MQTTSubscribeServer.c | 114 +++++ .../Internet/MQTT/MQTTPacket/src/MQTTUnsubscribe.h | 37 ++ .../MQTT/MQTTPacket/src/MQTTUnsubscribeClient.c | 107 ++++ .../MQTT/MQTTPacket/src/MQTTUnsubscribeServer.c | 100 ++++ .../Internet/MQTT/MQTTPacket/src/StackTrace.h | 78 +++ .../Internet/MQTT/mqtt_interface.c | 79 +++ .../Internet/MQTT/mqtt_interface.h | 42 ++ 22_m1284p_WIZNET_MQTT/globals.c | 3 + 22_m1284p_WIZNET_MQTT/globals.h | 3 + 22_m1284p_WIZNET_MQTT/main.c | 142 ++++-- 25 files changed, 3194 insertions(+), 37 deletions(-) create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnect.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectClient.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectServer.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTDeserializePublish.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPublish.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSerializePublish.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribe.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeClient.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeServer.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribe.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeClient.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeServer.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/StackTrace.h create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.c create mode 100644 22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.h diff --git a/22_m1284p_WIZNET_MQTT/.cproject b/22_m1284p_WIZNET_MQTT/.cproject index b5431c1..36c3f22 100644 --- a/22_m1284p_WIZNET_MQTT/.cproject +++ b/22_m1284p_WIZNET_MQTT/.cproject @@ -34,6 +34,8 @@ + + @@ -75,5 +77,9 @@ - + + + + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.c new file mode 100644 index 0000000..f28286e --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.c @@ -0,0 +1,538 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ +#include "MQTTClient.h" +//#include + + +void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage) +{ + md->topicName = aTopicName; + md->message = aMessgage; +} + +int32_t getNextPacketId(Client *c) +{ + return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; +} + +int32_t sendPacket(Client* c, int32_t length, Timer* timer) +{ + int32_t rc = FAILURE, sent = 0; + + while (sent < length && !expired(timer)) + { + rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer)); + + if (rc < 0) // there was an error writing the data + break; + + sent += rc; + } + + if (sent == length) + { + countdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have SUCCESSSfully sent the packet + rc = SUCCESSS; + } + else + rc = FAILURE; + + return rc; +} + + +void MQTTClient(Client* c, Network* network, uint32_t command_timeout_ms, uint8_t* buf, size_t buf_size, uint8_t* readbuf, size_t readbuf_size) +{ + c->ipstack = network; + + for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + c->messageHandlers[i].topicFilter = 0; + + c->command_timeout_ms = command_timeout_ms; + c->buf = buf; + c->buf_size = buf_size; + c->readbuf = readbuf; + c->readbuf_size = readbuf_size; + c->isconnected = 0; + c->ping_outstanding = 0; + c->defaultMessageHandler = NULL; + + InitTimer(&c->ping_timer); +} + + +int32_t decodePacket(Client* c, int32_t* value, int32_t timeout) +{ + uint8_t i; + int32_t multiplier = 1; + int32_t len = 0; + const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; + + *value = 0; + do + { + int32_t rc = MQTTPACKET_READ_ERROR; + + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + goto exit; + } + + rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); + + if (rc != 1) + goto exit; + + *value += (i & 127) * multiplier; + multiplier *= 128; + } while ((i & 128) != 0); +exit: + return len; +} + +int32_t readPacket(Client* c, Timer* timer) +{ + int32_t rc = FAILURE; + MQTTHeader header = {0}; + int32_t len = 0; + int32_t rem_len = 0; + + /* 1. read the header byte. This has the packet type in it */ + if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer)) != 1) + goto exit; + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + decodePacket(c, &rem_len, left_ms(timer)); + len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) != rem_len)) + goto exit; + + header.byte = c->readbuf[0]; + rc = header.bits.type; +exit: + return rc; +} + +// assume topic filter and name is in correct format +// # can only be at end +// + and # can only be next to separator +char isTopicMatched(char* topicFilter, MQTTString* topicName) +{ + char* curf = topicFilter; + char* curn = topicName->lenstring.data; + char* curn_end = curn + topicName->lenstring.len; + + while (*curf && curn < curn_end) + { + if (*curn == '/' && *curf != '/') + break; + + if (*curf != '+' && *curf != '#' && *curf != *curn) + break; + + if (*curf == '+') + { // skip until we meet the next separator, or end of string + char* nextpos = curn + 1; + + while (nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } + else if (*curf == '#') + curn = curn_end - 1; // skip until end of string + + curf++; + curn++; + } + + return (curn == curn_end) && (*curf == '\0'); +} + +int32_t deliverMessage(Client* c, MQTTString* topicName, MQTTMessage* message) +{ + int32_t rc = FAILURE; + + // we have to find the right message handler - indexed by topic + for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || + isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) + { + if (c->messageHandlers[i].fp != NULL) + { + MessageData md; + NewMessageData(&md, topicName, message); + c->messageHandlers[i].fp(&md); + rc = SUCCESSS; + } + } + } + + if (rc == FAILURE && c->defaultMessageHandler != NULL) + { + MessageData md; + NewMessageData(&md, topicName, message); + c->defaultMessageHandler(&md); + rc = SUCCESSS; + } + + return rc; +} + +int32_t keepalive(Client* c) +{ + int32_t rc = FAILURE; + + if (c->keepAliveInterval == 0) + { + rc = SUCCESSS; + goto exit; + } + + if (expired(&c->ping_timer)) + { + if (!c->ping_outstanding) + { + Timer timer; + InitTimer(&timer); + countdown_ms(&timer, 1000); + int32_t len = MQTTSerialize_pingreq(c->buf, c->buf_size); + + if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESSS) // send the ping packet + c->ping_outstanding = 1; + } + } + +exit: + return rc; +} + +int32_t cycle(Client* c, Timer* timer) +{ + // read the socket, see what work is due + uint16_t packet_type = readPacket(c, timer); + + int32_t len = 0, rc = SUCCESSS; + + switch (packet_type) + { + case CONNACK: + case PUBACK: + case SUBACK: + break; + case PUBLISH: + { + MQTTString topicName; + MQTTMessage msg; + + if (MQTTDeserialize_publish((uint8_t*)&msg.dup, (uint8_t*)&msg.qos, (uint8_t*)&msg.retained, (uint16_t*)&msg.id, &topicName, + (uint8_t**)&msg.payload, (int32_t*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) + goto exit; + + deliverMessage(c, &topicName, &msg); + + if (msg.qos != QOS0) + { + if (msg.qos == QOS1) + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); + else if (msg.qos == QOS2) + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); + + if (len <= 0) + rc = FAILURE; + else + rc = sendPacket(c, len, timer); + + if (rc == FAILURE) + goto exit; // there was a problem + } + break; + } + case PUBREC: + { + uint16_t mypacketid; + uint8_t dup, type; + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0) + rc = FAILURE; + else if ((rc = sendPacket(c, len, timer)) != SUCCESSS) // send the PUBREL packet + rc = FAILURE; // there was a problem + + if (rc == FAILURE) + goto exit; // there was a problem + + break; + } + case PUBCOMP: + break; + case PINGRESP: + c->ping_outstanding = 0; + break; + } + keepalive(c); +exit: + if (rc == SUCCESSS) + rc = packet_type; + + return rc; +} + +int32_t MQTTYield(Client* c, int32_t timeout_ms) +{ + int32_t rc = SUCCESSS; + Timer timer; + + InitTimer(&timer); + countdown_ms(&timer, timeout_ms); + + while (!expired(&timer)) + { + if (cycle(c, &timer) == FAILURE) + { + rc = FAILURE; + break; + } + } + + return rc; +} + +// only used in single-threaded mode where one command at a time is in process +int32_t waitfor(Client* c, int32_t packet_type, Timer* timer) +{ + int32_t rc = FAILURE; + + do + { + if (expired(timer)) + break; // we timed out + } + while ((rc = cycle(c, timer)) != packet_type); + + return rc; +} + +int32_t MQTTConnect(Client* c, MQTTPacket_connectData* options) +{ + Timer connect_timer; + int32_t rc = FAILURE; + MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; + int32_t len = 0; + + InitTimer(&connect_timer); + countdown_ms(&connect_timer, c->command_timeout_ms); + + if (c->isconnected) // don't send connect packet again if we are already connected + goto exit; + + if (options == 0) + options = &default_options; // set default options if none were supplied + + c->keepAliveInterval = options->keepAliveInterval; + countdown(&c->ping_timer, c->keepAliveInterval); + + if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) + goto exit; + + if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESSS) // send the connect packet + goto exit; // there was a problem + + // this will be a blocking call, wait for the connack + if (waitfor(c, CONNACK, &connect_timer) == CONNACK) + { + uint8_t connack_rc = 255; + int8_t sessionPresent = 0; + + if (MQTTDeserialize_connack((uint8_t*)&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1) + rc = connack_rc; + else + rc = FAILURE; + } + else + rc = FAILURE; + +exit: + if (rc == SUCCESSS) + c->isconnected = 1; + + return rc; +} + + +int32_t MQTTSubscribe(Client* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler) +{ + int32_t rc = FAILURE; + Timer timer; + int32_t len = 0; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicFilter; + + InitTimer(&timer); + countdown_ms(&timer, c->command_timeout_ms); + + if (!c->isconnected) + goto exit; + + len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int32_t*)&qos); + + if (len <= 0) + goto exit; + + if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet + goto exit; // there was a problem + + if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback + { + int32_t count = 0, grantedQoS = -1; + uint16_t mypacketid; + + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1) + rc = grantedQoS; // 0, 1, 2 or 0x80 + + if (rc != 0x80) + { + for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + { + if (c->messageHandlers[i].topicFilter == 0) + { + c->messageHandlers[i].topicFilter = topicFilter; + c->messageHandlers[i].fp = messageHandler; + rc = 0; + break; + } + } + } + } + else + rc = FAILURE; + +exit: + return rc; +} + +int32_t MQTTUnsubscribe(Client* c, const char* topicFilter) +{ + int32_t rc = FAILURE; + Timer timer; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicFilter; + int32_t len = 0; + + InitTimer(&timer); + countdown_ms(&timer, c->command_timeout_ms); + + if (!c->isconnected) + goto exit; + + if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) + goto exit; + + if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet + goto exit; // there was a problem + + if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) + { + uint16_t mypacketid; // should be the same as the packetid above + + if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) + rc = 0; + } + else + rc = FAILURE; + +exit: + return rc; +} + +int32_t MQTTPublish(Client* c, const char* topicName, MQTTMessage* message) +{ + int32_t rc = FAILURE; + Timer timer; + MQTTString topic = MQTTString_initializer; + topic.cstring = (char *)topicName; + int32_t len = 0; + + InitTimer(&timer); + countdown_ms(&timer, c->command_timeout_ms); + + if (!c->isconnected) + goto exit; + + if (message->qos == QOS1 || message->qos == QOS2) + message->id = getNextPacketId(c); + + len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, + topic, (uint8_t*)message->payload, message->payloadlen); + + if (len <= 0) + goto exit; + + if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet + goto exit; // there was a problem + + if (message->qos == QOS1) + { + if (waitfor(c, PUBACK, &timer) == PUBACK) + { + uint16_t mypacketid; + uint8_t dup, type; + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + } + else + rc = FAILURE; + } + else if (message->qos == QOS2) + { + if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) + { + uint16_t mypacketid; + uint8_t dup, type; + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + rc = FAILURE; + } + else + rc = FAILURE; + } + +exit: + return rc; +} + + +int32_t MQTTDisconnect(Client* c) +{ + int32_t rc = FAILURE; + Timer timer; // we might wait for incomplete incoming publishes to complete + int32_t len = MQTTSerialize_disconnect(c->buf, c->buf_size); + + InitTimer(&timer); + countdown_ms(&timer, c->command_timeout_ms); + + if (len > 0) + rc = sendPacket(c, len, &timer); // send the disconnect packet + + c->isconnected = 0; + return rc; +} + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.h new file mode 100644 index 0000000..3b0ed88 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTClient.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#ifndef __MQTT_CLIENT_C_ +#define __MQTT_CLIENT_C_ + +#include "MQTTPacket.h" +#include "mqtt_interface.h" //Platform specific implementation header file +#include "stdio.h" + +#define MAX_PACKET_ID 65535 +#define MAX_MESSAGE_HANDLERS 5 + +enum QoS +{ + QOS0, + QOS1, + QOS2 +}; + +// all failure return codes must be negative +enum returnCode +{ + BUFFER_OVERFLOW = -2, + FAILURE = -1, + SUCCESSS = 0 +}; + +void NewTimer(Timer*); + +typedef struct MQTTMessage MQTTMessage; + +typedef struct MessageData MessageData; + +struct MQTTMessage +{ + enum QoS qos; + char retained; + char dup; + uint16_t id; + void *payload; + size_t payloadlen; +}; + +struct MessageData +{ + MQTTMessage* message; + MQTTString* topicName; +}; + +typedef void (*messageHandler)(MessageData*); + +typedef struct Client Client; + +int32_t MQTTConnect (Client*, MQTTPacket_connectData*); +int32_t MQTTPublish (Client*, const char*, MQTTMessage*); +int32_t MQTTSubscribe (Client*, const char*, enum QoS, messageHandler); +int32_t MQTTUnsubscribe (Client*, const char*); +int32_t MQTTDisconnect (Client*); +int32_t MQTTYield (Client*, int32_t); + +void setDefaultMessageHandler(Client*, messageHandler); + +void MQTTClient(Client*, Network*, uint32_t, unsigned char*, size_t, unsigned char*, size_t); + +struct Client +{ + uint32_t next_packetid; + uint32_t command_timeout_ms; + size_t buf_size, readbuf_size; + uint8_t *buf; + uint8_t *readbuf; + uint32_t keepAliveInterval; + int8_t ping_outstanding; + int32_t isconnected; + + struct MessageHandlers + { + const char* topicFilter; + void (*fp) (MessageData*); + } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic + + void (*defaultMessageHandler) (MessageData*); + + Network* ipstack; + Timer ping_timer; +}; + +#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} + +#endif diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnect.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnect.h new file mode 100644 index 0000000..0902137 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnect.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTCONNECT_H_ +#define MQTTCONNECT_H_ + +#include + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + + +typedef union +{ + uint8_t all; /**< all connect flags */ +#if defined(REVERSED) + struct + { + uint8_t username : 1; /**< 3.1 user name */ + uint8_t password : 1; /**< 3.1 password */ + uint8_t willRetain : 1; /**< will retain setting */ + uint8_t willQoS : 2; /**< will QoS value */ + uint8_t will : 1; /**< will flag */ + uint8_t cleansession: 1; /**< clean session flag */ + uint8_t : 1; /**< unused */ + } bits; +#else + struct + { + uint8_t : 1; /**< unused */ + uint8_t cleansession: 1; /**< cleansession flag */ + uint8_t will : 1; /**< will flag */ + uint8_t willQoS : 2; /**< will QoS value */ + uint8_t willRetain : 1; /**< will retain setting */ + uint8_t password : 1; /**< 3.1 password */ + uint8_t username : 1; /**< 3.1 user name */ + } bits; +#endif +} MQTTConnectFlags; /**< connect flags byte */ + + + +/** Defines the MQTT "Last Will and Testament" (LWT) settings for the connect packet. */ +typedef struct +{ + /** The eyecatcher for this structure. must be MQTW. */ + int8_t struct_id[4]; + /** The version number of this structure. Must be 0 */ + int16_t struct_version; + /** The LWT topic to which the LWT message will be published. */ + MQTTString topicName; + /** The LWT payload. */ + MQTTString message; + /** The retained flag for the LWT message (see MQTTAsync_message.retained). */ + uint8_t retained; + /** The quality of service setting for the LWT message (see MQTTAsync_message.qos and @ref qos). */ + int8_t qos; +} MQTTPacket_willOptions; + + +#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 } + + +typedef struct +{ + /** The eyecatcher for this structure. must be MQTC. */ + int8_t struct_id[4]; + /** The version number of this structure. Must be 0 */ + uint16_t struct_version; + /** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 */ + uint8_t MQTTVersion; + MQTTString clientID; + uint16_t keepAliveInterval; + uint8_t cleansession; + uint8_t willFlag; + MQTTPacket_willOptions will; + MQTTString username; + MQTTString password; +} MQTTPacket_connectData; + +typedef union +{ + unsigned char all; /**< all connack flags */ +#if defined(REVERSED) + struct + { + uint8_t sessionpresent : 1; /**< session present flag */ + uint8_t : 7; /**< unused */ + } bits; +#else + struct + { + uint8_t : 7; /**< unused */ + uint8_t sessionpresent : 1; /**< session present flag */ + } bits; +#endif +} MQTTConnackFlags; /**< connack flags byte */ + +#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \ + MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} } + +DLLExport int32_t MQTTSerialize_connect(uint8_t* buf, int32_t buflen, MQTTPacket_connectData* options); +DLLExport int32_t MQTTDeserialize_connect(MQTTPacket_connectData* data, uint8_t* buf, int32_t len); + +DLLExport int32_t MQTTSerialize_connack(uint8_t* buf, int32_t buflen, uint8_t connack_rc, uint8_t sessionPresent); +DLLExport int32_t MQTTDeserialize_connack(uint8_t* sessionPresent, uint8_t* connack_rc, uint8_t* buf, int32_t buflen); + +DLLExport int32_t MQTTSerialize_disconnect(uint8_t* buf, int32_t buflen); +DLLExport int32_t MQTTSerialize_pingreq(uint8_t* buf, int32_t buflen); + +#endif /* MQTTCONNECT_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectClient.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectClient.c new file mode 100644 index 0000000..178e01e --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectClient.c @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT connect packet that would be produced using the supplied connect options. + * @param options the options to be used to build the connect packet + * @return the length of buffer needed to contain the serialized version of the packet + */ +int32_t MQTTSerialize_connectLength(MQTTPacket_connectData* options) +{ + int32_t len = 0; + + FUNC_ENTRY; + + if (options->MQTTVersion == 3) + len = 12; /* variable depending on MQTT or MQIsdp */ + else if (options->MQTTVersion == 4) + len = 10; + + len += MQTTstrlen(options->clientID) + 2; + if (options->willFlag) + len += MQTTstrlen(options->will.topicName) + 2 + MQTTstrlen(options->will.message) + 2; + if (options->username.cstring || options->username.lenstring.data) + len += MQTTstrlen(options->username)+2; + if (options->password.cstring || options->password.lenstring.data) + len += MQTTstrlen(options->password)+2; + + FUNC_EXIT_RC(len); + return len; +} + + +/** + * Serializes the connect options into the buffer. + * @param buf the buffer into which the packet will be serialized + * @param len the length in bytes of the supplied buffer + * @param options the options to be used to build the connect packet + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_connect(uint8_t* buf, int32_t buflen, MQTTPacket_connectData* options) +{ + uint8_t *ptr = buf; + MQTTHeader header = {0}; + MQTTConnectFlags flags = {0}; + int32_t len = 0; + int32_t rc = -1; + + FUNC_ENTRY; + if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = CONNECT; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, len); /* write remaining length */ + + if (options->MQTTVersion == 4) + { + writeCString(&ptr, "MQTT"); + writeChar(&ptr, (char) 4); + } + else + { + writeCString(&ptr, "MQIsdp"); + writeChar(&ptr, (char) 3); + } + + flags.all = 0; + flags.bits.cleansession = options->cleansession; + flags.bits.will = (options->willFlag) ? 1 : 0; + + if (flags.bits.will) + { + flags.bits.willQoS = options->will.qos; + flags.bits.willRetain = options->will.retained; + } + + if (options->username.cstring || options->username.lenstring.data) + flags.bits.username = 1; + if (options->password.cstring || options->password.lenstring.data) + flags.bits.password = 1; + + writeChar(&ptr, flags.all); + writeInt(&ptr, options->keepAliveInterval); + writeMQTTString(&ptr, options->clientID); + + if (options->willFlag) + { + writeMQTTString(&ptr, options->will.topicName); + writeMQTTString(&ptr, options->will.message); + } + + if (flags.bits.username) + writeMQTTString(&ptr, options->username); + if (flags.bits.password) + writeMQTTString(&ptr, options->password); + + rc = ptr - buf; + + exit: FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into connack data - return code + * @param sessionPresent the session present flag returned (only for MQTT 3.1.1) + * @param connack_rc returned integer value of the connack return code + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param len the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int32_t MQTTDeserialize_connack(uint8_t* sessionPresent, uint8_t* connack_rc, uint8_t* buf, int32_t buflen) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = 0; + int32_t mylen; + MQTTConnackFlags flags = {0}; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != CONNACK) + goto exit; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + if (enddata - curdata < 2) + goto exit; + + flags.all = readChar(&curdata); + *sessionPresent = flags.bits.sessionpresent; + *connack_rc = readChar(&curdata); + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @param packettype the message type + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_zero(uint8_t* buf, int32_t buflen, uint8_t packettype) +{ + MQTTHeader header = {0}; + int32_t rc = -1; + uint8_t *ptr = buf; + + FUNC_ENTRY; + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + header.byte = 0; + header.bits.type = packettype; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */ + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_disconnect(uint8_t* buf, int32_t buflen) +{ + return MQTTSerialize_zero(buf, buflen, DISCONNECT); +} + + +/** + * Serializes a disconnect packet into the supplied buffer, ready for writing to a socket + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer, to avoid overruns + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_pingreq(uint8_t* buf, int32_t buflen) +{ + return MQTTSerialize_zero(buf, buflen, PINGREQ); +} diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectServer.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectServer.c new file mode 100644 index 0000000..c4a0edb --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTConnectServer.c @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" +#include + +#define min(a, b) ((a < b) ? a : b) + + +/** + * Validates MQTT protocol name and version combinations + * @param protocol the MQTT protocol name as an MQTTString + * @param version the MQTT protocol version number, as in the connect packet + * @return correct MQTT combination? 1 is true, 0 is false + */ +int32_t MQTTPacket_checkVersion(MQTTString* protocol, int32_t version) +{ + int32_t rc = 0; + + if (version == 3 && memcmp(protocol->lenstring.data, "MQIsdp", min(6, protocol->lenstring.len)) == 0) + rc = 1; + else if (version == 4 && memcmp(protocol->lenstring.data, "MQTT", min(4, protocol->lenstring.len)) == 0) + rc = 1; + + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into connect data structure + * @param data the connect data structure to be filled out + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param len the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int32_t MQTTDeserialize_connect(MQTTPacket_connectData* data, uint8_t* buf, int32_t len) +{ + MQTTHeader header = {0}; + MQTTConnectFlags flags = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = &buf[len]; + int32_t rc = 0; + MQTTString Protocol; + int32_t version; + int32_t mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + if (header.bits.type != CONNECT) + goto exit; + + curdata += MQTTPacket_decodeBuf(curdata, &mylen); /* read remaining length */ + + /* do we have enough data to read the protocol version byte? */ + if (!readMQTTLenString(&Protocol, &curdata, enddata) || enddata - curdata < 0) + goto exit; + + version = (int)readChar(&curdata); /* Protocol version */ + + /* If we don't recognize the protocol version, we don't parse the connect packet on the + * basis that we don't know what the format will be. */ + if (MQTTPacket_checkVersion(&Protocol, version)) + { + flags.all = readChar(&curdata); + data->cleansession = flags.bits.cleansession; + data->keepAliveInterval = readInt(&curdata); + + if (!readMQTTLenString(&data->clientID, &curdata, enddata)) + goto exit; + + data->willFlag = flags.bits.will; + if (flags.bits.will) + { + data->will.qos = flags.bits.willQoS; + data->will.retained = flags.bits.willRetain; + + if (!readMQTTLenString(&data->will.topicName, &curdata, enddata) || + !readMQTTLenString(&data->will.message, &curdata, enddata)) + goto exit; + } + + if (flags.bits.username) + { + if (enddata - curdata < 3 || !readMQTTLenString(&data->username, &curdata, enddata)) + goto exit; /* username flag set, but no username supplied - invalid */ + if (flags.bits.password && + (enddata - curdata < 3 || !readMQTTLenString(&data->password, &curdata, enddata))) + goto exit; /* password flag set, but no password supplied - invalid */ + } + else if (flags.bits.password) + goto exit; /* password flag set without username - invalid */ + + rc = 1; + } +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the connack packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param connack_rc the integer connack return code to be used + * @param sessionPresent the MQTT 3.1.1 sessionPresent flag + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_connack(uint8_t* buf, int32_t buflen, uint8_t connack_rc, uint8_t sessionPresent) +{ + MQTTHeader header = {0}; + int32_t rc = 0; + uint8_t *ptr = buf; + MQTTConnackFlags flags = {0}; + + FUNC_ENTRY; + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = CONNACK; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + + flags.all = 0; + flags.bits.sessionpresent = sessionPresent; + writeChar(&ptr, flags.all); + writeChar(&ptr, connack_rc); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTDeserializePublish.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTDeserializePublish.c new file mode 100644 index 0000000..8d3445e --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTDeserializePublish.c @@ -0,0 +1,109 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" +#include + +#define min(a, b) ((a < b) ? 1 : 0) + +/** + * Deserializes the supplied (wire) buffer into publish data + * @param dup returned integer - the MQTT dup flag + * @param qos returned integer - the MQTT QoS value + * @param retained returned integer - the MQTT retained flag + * @param packetid returned integer - the MQTT packet identifier + * @param topicName returned MQTTString - the MQTT topic in the publish + * @param payload returned byte buffer - the MQTT publish payload + * @param payloadlen returned integer - the length of the MQTT payload + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success + */ +int32_t MQTTDeserialize_publish(uint8_t* dup, uint8_t* qos, uint8_t* retained, uint16_t* packetid, MQTTString* topicName, + uint8_t** payload, int32_t* payloadlen, uint8_t* buf, int32_t buflen) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = 0; + int32_t mylen = 0; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + + if (header.bits.type != PUBLISH) + goto exit; + + *dup = header.bits.dup; + *qos = header.bits.qos; + *retained = header.bits.retain; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + /* do we have enough data to read the protocol version byte? */ + if (!readMQTTLenString(topicName, &curdata, enddata) || enddata - curdata < 0) + goto exit; + + if (*qos > 0) + *packetid = readInt(&curdata); + + *payloadlen = enddata - curdata; + *payload = curdata; + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into an ack + * @param packettype returned integer - the MQTT packet type + * @param dup returned integer - the MQTT dup flag + * @param packetid returned integer - the MQTT packet identifier + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int32_t MQTTDeserialize_ack(uint8_t* packettype, uint8_t* dup, uint16_t* packetid, uint8_t* buf, int32_t buflen) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = 0; + int32_t mylen; + + FUNC_ENTRY; + header.byte = readChar(&curdata); + *dup = header.bits.dup; + *packettype = header.bits.type; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + if (enddata - curdata < 2) + goto exit; + + *packetid = readInt(&curdata); + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.c new file mode 100644 index 0000000..686fef7 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.c @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" + +#include + + +const char* MQTTPacket_names[] = +{ + "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", + "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", + "PINGRESP", "DISCONNECT" +}; + + +const char* MQTTPacket_getName(uint16_t packetid) +{ + return MQTTPacket_names[packetid]; +} + + +int32_t MQTTStringFormat_connect(char* strbuf, int32_t strbuflen, MQTTPacket_connectData* data) +{ + int32_t strindex = 0; + + strindex = snprintf(strbuf, strbuflen, + "CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %d", + (int32_t)data->MQTTVersion, data->clientID.lenstring.len, data->clientID.lenstring.data, + (int32_t)data->cleansession, data->keepAliveInterval); + if (data->willFlag) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", will QoS %d, will retain %d, will topic %.*s, will message %.*s", + data->will.qos, data->will.retained, + data->will.topicName.lenstring.len, data->will.topicName.lenstring.data, + data->will.message.lenstring.len, data->will.message.lenstring.data); + if (data->username.lenstring.data && data->username.lenstring.len > 0) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", user name %.*s", data->username.lenstring.len, data->username.lenstring.data); + if (data->password.lenstring.data && data->password.lenstring.len > 0) + strindex += snprintf(&strbuf[strindex], strbuflen - strindex, + ", password %.*s", data->password.lenstring.len, data->password.lenstring.data); + return strindex; +} + + +int32_t MQTTStringFormat_connack(char* strbuf, int32_t strbuflen, uint8_t connack_rc, uint8_t sessionPresent) +{ + int32_t strindex = snprintf(strbuf, strbuflen, "CONNACK session present %d, rc %d", sessionPresent, connack_rc); + return strindex; +} + + +int32_t MQTTStringFormat_publish(char* strbuf, int32_t strbuflen, uint8_t dup, uint8_t qos, uint8_t retained, + uint16_t packetid, MQTTString topicName, uint8_t* payload, int32_t payloadlen) +{ + int32_t strindex = snprintf(strbuf, strbuflen, + "PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s", + dup, qos, retained, packetid, + (topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data, + payloadlen, (payloadlen < 20) ? payloadlen : 20, payload); + return strindex; +} + + +int32_t MQTTStringFormat_ack(char* strbuf, int32_t strbuflen, uint8_t packettype, uint8_t dup, uint16_t packetid) +{ + int32_t strindex = snprintf(strbuf, strbuflen, "%s, packet id %d", MQTTPacket_names[packettype], packetid); + + if (dup) + strindex += snprintf(strbuf + strindex, strbuflen - strindex, ", dup %d", dup); + + return strindex; +} + + +int32_t MQTTStringFormat_subscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, int32_t count, + MQTTString topicFilters[], int32_t requestedQoSs[]) +{ + return snprintf(strbuf, strbuflen, "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d", + dup, packetid, count, topicFilters[0].lenstring.len, topicFilters[0].lenstring.data, + requestedQoSs[0]); +} + + +int32_t MQTTStringFormat_suback(char* strbuf, int32_t strbuflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs) +{ + return snprintf(strbuf, strbuflen, + "SUBACK packet id %d count %d granted qos %d", packetid, count, grantedQoSs[0]); +} + + +int32_t MQTTStringFormat_unsubscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, + int32_t count, MQTTString topicFilters[]) +{ + return snprintf(strbuf, strbuflen, "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s", + dup, packetid, count, topicFilters[0].lenstring.len, topicFilters[0].lenstring.data); +} + + +char* MQTTFormat_toClientString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen) +{ + int32_t index = 0; + int32_t rem_length = 0; + MQTTHeader header = {0}; + int32_t strindex = 0; + + header.byte = buf[index++]; + index += MQTTPacket_decodeBuf(&buf[index], &rem_length); + + switch (header.bits.type) + { + case CONNACK: + { + uint8_t sessionPresent, connack_rc; + if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1) + strindex = MQTTStringFormat_connack(strbuf, strbuflen, connack_rc, sessionPresent); + } + break; + case PUBLISH: + { + uint8_t dup, retained, *payload; + uint16_t packetid; + uint8_t qos; + int32_t payloadlen; + MQTTString topicName = MQTTString_initializer; + + if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, + &payload, &payloadlen, buf, buflen) == 1) + strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, + packetid, topicName, payload, payloadlen); + } + break; + case PUBACK: + case PUBREC: + case PUBREL: + case PUBCOMP: + { + uint8_t packettype, dup; + uint16_t packetid; + if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid); + } + break; + case SUBACK: + { + uint16_t packetid; + int32_t maxcount = 1, count = 0; + int32_t grantedQoSs[1]; + if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1) + strindex = MQTTStringFormat_suback(strbuf, strbuflen, packetid, count, grantedQoSs); + } + break; + case UNSUBACK: + { + uint16_t packetid; + if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, UNSUBACK, 0, packetid); + } + break; + case PINGREQ: + case PINGRESP: + case DISCONNECT: + strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); + break; + } + + return strbuf; +} + + +char* MQTTFormat_toServerString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen) +{ + int32_t index = 0; + int32_t rem_length = 0; + MQTTHeader header = {0}; + int32_t strindex = 0; + + header.byte = buf[index++]; + index += MQTTPacket_decodeBuf(&buf[index], &rem_length); + + switch (header.bits.type) + { + case CONNECT: + { + MQTTPacket_connectData data; + int32_t rc; + + if ((rc = MQTTDeserialize_connect(&data, buf, buflen)) == 1) + strindex = MQTTStringFormat_connect(strbuf, strbuflen, &data); + } + break; + case PUBLISH: + { + uint8_t dup, retained, *payload; + uint16_t packetid; + uint8_t qos; + int32_t payloadlen; + MQTTString topicName = MQTTString_initializer; + + if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, + &payload, &payloadlen, buf, buflen) == 1) + strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained, + packetid, topicName, payload, payloadlen); + } + break; + case PUBACK: + case PUBREC: + case PUBREL: + case PUBCOMP: + { + uint8_t packettype, dup; + uint16_t packetid; + + if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1) + strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid); + } + break; + case SUBSCRIBE: + { + uint8_t dup; + uint16_t packetid; + int32_t maxcount = 1, count = 0; + MQTTString topicFilters[1]; + int32_t requestedQoSs[1]; + + if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count, + topicFilters, requestedQoSs, buf, buflen) == 1) + strindex = MQTTStringFormat_subscribe(strbuf, strbuflen, dup, packetid, count, topicFilters, requestedQoSs);; + } + break; + case UNSUBSCRIBE: + { + uint8_t dup; + uint16_t packetid; + int32_t maxcount = 1, count = 0; + MQTTString topicFilters[1]; + if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1) + strindex = MQTTStringFormat_unsubscribe(strbuf, strbuflen, dup, packetid, count, topicFilters); + } + break; + case PINGREQ: + case PINGRESP: + case DISCONNECT: + strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]); + break; + } + + strbuf[strbuflen] = '\0'; + return strbuf; +} diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.h new file mode 100644 index 0000000..a921aea --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTFormat.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#if !defined(MQTTFORMAT_H) +#define MQTTFORMAT_H + +#include "StackTrace.h" +#include "MQTTPacket.h" + +const char* MQTTPacket_getName(uint16_t packetid); +int32_t MQTTStringFormat_connect(char* strbuf, int32_t strbuflen, MQTTPacket_connectData* data); +int32_t MQTTStringFormat_connack(char* strbuf, int32_t strbuflen, uint8_t connack_rc, uint8_t sessionPresent); +int32_t MQTTStringFormat_publish(char* strbuf, int32_t strbuflen, uint8_t dup, uint8_t qos, uint8_t retained, + uint16_t packetid, MQTTString topicName, uint8_t* payload, int32_t payloadlen); +int32_t MQTTStringFormat_ack(char* strbuf, int32_t strbuflen, uint8_t packettype, uint8_t dup, uint16_t packetid); +int32_t MQTTStringFormat_subscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, int32_t count, + MQTTString topicFilters[], int32_t requestedQoSs[]); +int32_t MQTTStringFormat_suback(char* strbuf, int32_t strbuflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs); +int32_t MQTTStringFormat_unsubscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, + int32_t count, MQTTString topicFilters[]); +char* MQTTFormat_toClientString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen); +char* MQTTFormat_toServerString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen); + +#endif diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.c new file mode 100644 index 0000000..f608549 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.c @@ -0,0 +1,443 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Sergio R. Caprile - non-blocking packet read functions for stream transport + *******************************************************************************/ + +#include "StackTrace.h" +#include "MQTTPacket.h" +#include + +/** + * Encodes the message length according to the MQTT algorithm + * @param buf the buffer into which the encoded data is written + * @param length the length to be encoded + * @return the number of bytes written to buffer + */ +int32_t MQTTPacket_encode(uint8_t* buf, int32_t length) +{ + int32_t rc = 0; + + FUNC_ENTRY; + do + { + uint8_t d = length % 128; + length /= 128; + + /* if there are more digits to encode, set the top bit of this digit */ + if (length > 0) + d |= 0x80; + + buf[rc++] = d; + } while (length > 0); + + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Decodes the message length according to the MQTT algorithm + * @param getcharfn pointer to function to read the next character from the data source + * @param value the decoded length returned + * @return the number of bytes read from the socket + */ +int32_t MQTTPacket_decode(int32_t (*getcharfn)(uint8_t*, int32_t), int32_t* value) +{ + uint8_t c; + int32_t multiplier = 1; + int32_t len = 0; + +#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 + + FUNC_ENTRY; + *value = 0; + + do + { + int32_t rc = MQTTPACKET_READ_ERROR; + + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + goto exit; + } + + rc = (*getcharfn)(&c, 1); + + if (rc != 1) + goto exit; + + *value += (c & 127) * multiplier; + multiplier *= 128; + } while ((c & 128) != 0); + +exit: + FUNC_EXIT_RC(len); + return len; +} + + +int32_t MQTTPacket_len(int32_t rem_len) +{ + rem_len += 1; /* header byte */ + + /* now remaining_length field */ + if (rem_len < 128) + rem_len += 1; + else if (rem_len < 16384) + rem_len += 2; + else if (rem_len < 2097151) + rem_len += 3; + else + rem_len += 4; + + return rem_len; +} + + +static uint8_t* bufptr; + +int32_t bufchar(uint8_t* c, int32_t count) +{ + for (int32_t i = 0; i < count; ++i) + *c = *bufptr++; + + return count; +} + + +int32_t MQTTPacket_decodeBuf(uint8_t* buf, int32_t* value) +{ + bufptr = buf; + return MQTTPacket_decode(bufchar, value); +} + + +/** + * Calculates an integer from two bytes read from the input buffer + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the integer value calculated + */ +int32_t readInt(uint8_t** pptr) +{ + uint8_t* ptr = *pptr; + int32_t len = 256*(*ptr) + (*(ptr+1)); + *pptr += 2; + return len; +} + + +/** + * Reads one character from the input buffer. + * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned + * @return the character read + */ +char readChar(uint8_t** pptr) +{ + char c = **pptr; + (*pptr)++; + return c; +} + + +/** + * Writes one character to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param c the character to write + */ +void writeChar(uint8_t** pptr, char c) +{ + **pptr = c; + (*pptr)++; +} + + +/** + * Writes an integer as 2 bytes to an output buffer. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param anInt the integer to write + */ +void writeInt(uint8_t** pptr, int32_t anInt) +{ + **pptr = (uint8_t)(anInt / 256); + (*pptr)++; + **pptr = (uint8_t)(anInt % 256); + (*pptr)++; +} + + +/** + * Writes a "UTF" string to an output buffer. Converts C string to length-delimited. + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param string the C string to write + */ +void writeCString(uint8_t** pptr, const char* string) +{ + int32_t len = strlen(string); + writeInt(pptr, len); + memcpy(*pptr, string, len); + *pptr += len; +} + + +int32_t getLenStringLen(char* ptr) +{ + int32_t len = 256*((uint8_t)(*ptr)) + (uint8_t)(*(ptr+1)); + return len; +} + + +void writeMQTTString(uint8_t** pptr, MQTTString mqttstring) +{ + if (mqttstring.lenstring.len > 0) + { + writeInt(pptr, mqttstring.lenstring.len); + memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len); + *pptr += mqttstring.lenstring.len; + } + else if (mqttstring.cstring) + writeCString(pptr, mqttstring.cstring); + else + writeInt(pptr, 0); +} + + +/** + * @param mqttstring the MQTTString structure into which the data is to be read + * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned + * @param enddata pointer to the end of the data: do not read beyond + * @return 1 if successful, 0 if not + */ +int32_t readMQTTLenString(MQTTString* mqttstring, uint8_t** pptr, uint8_t* enddata) +{ + int32_t rc = 0; + + FUNC_ENTRY; + + /* the first two bytes are the length of the string */ + if (enddata - (*pptr) > 1) /* enough length to read the integer? */ + { + mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */ + + if (&(*pptr)[mqttstring->lenstring.len] <= enddata) + { + mqttstring->lenstring.data = (char*)*pptr; + *pptr += mqttstring->lenstring.len; + rc = 1; + } + } + + mqttstring->cstring = NULL; + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string + * @param mqttstring the string to return the length of + * @return the length of the string + */ +int32_t MQTTstrlen(MQTTString mqttstring) +{ + int rc = 0; + + if (mqttstring.cstring) + rc = strlen(mqttstring.cstring); + else + rc = mqttstring.lenstring.len; + + return rc; +} + + +/** + * Compares an MQTTString to a C string + * @param a the MQTTString to compare + * @param bptr the C string to compare + * @return boolean - equal or not + */ +int32_t MQTTPacket_equals(MQTTString* a, char* bptr) +{ + int32_t alen = 0, blen = 0; + char *aptr; + + if (a->cstring) + { + aptr = a->cstring; + alen = strlen(a->cstring); + } + else + { + aptr = a->lenstring.data; + alen = a->lenstring.len; + } + + blen = strlen(bptr); + + return (alen == blen) && (strncmp(aptr, bptr, alen) == 0); +} + + +/** + * Helper function to read packet data from some source into a buffer + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param getfn pointer to a function which will read any number of bytes from the needed source + * @return integer MQTT packet type, or -1 on error + * @note the whole message must fit into the caller's buffer + */ +int32_t MQTTPacket_read(uint8_t* buf, int32_t buflen, int32_t (*getfn)(uint8_t*, int32_t)) +{ + int32_t rc = -1; + MQTTHeader header = {0}; + int32_t len = 0; + int32_t rem_len = 0; + + /* 1. read the header byte. This has the packet type in it */ + if ((*getfn)(buf, 1) != 1) + goto exit; + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + MQTTPacket_decode(getfn, &rem_len); + len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */ + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if((rem_len + len) > buflen) + goto exit; + if ((*getfn)(buf + len, rem_len) != rem_len) + goto exit; + + header.byte = buf[0]; + rc = header.bits.type; +exit: + return rc; +} + +/** + * Decodes the message length according to the MQTT algorithm, non-blocking + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @param value the decoded length returned + * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error + */ +static int32_t MQTTPacket_decodenb(MQTTTransport *trp) +{ + uint8_t c; + int32_t rc = MQTTPACKET_READ_ERROR; + + FUNC_ENTRY; + + if (trp->len == 0) + { /* initialize on first call */ + trp->multiplier = 1; + trp->rem_len = 0; + } + + do + { + int32_t frc; + + if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES) + goto exit; + + if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1) + goto exit; + + if (frc == 0) + { + rc = 0; + goto exit; + } + + ++(trp->len); + trp->rem_len += (c & 127) * trp->multiplier; + trp->multiplier *= 128; + } while ((c & 128) != 0); + + rc = trp->len; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + +/** + * Helper function to read packet data from some source into a buffer, non-blocking + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param trp pointer to a transport structure holding what is needed to solve getting data from it + * @return integer MQTT packet type, 0 for call again, or -1 on error + * @note the whole message must fit into the caller's buffer + */ +int32_t MQTTPacket_readnb(uint8_t* buf, int32_t buflen, MQTTTransport *trp) +{ + int32_t rc = -1, frc; + MQTTHeader header = {0}; + + switch (trp->state) + { + default: + trp->state = 0; + /*FALLTHROUGH*/ + case 0: + /* read the header byte. This has the packet type in it */ + if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1) + goto exit; + + if (frc == 0) + return 0; + + trp->len = 0; + ++trp->state; + + /*FALLTHROUGH*/ + /* read the remaining length. This is variable in itself */ + case 1: + if ((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR) + goto exit; + + if (frc == 0) + return 0; + + trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */ + + if ((trp->rem_len + trp->len) > buflen) + goto exit; + + ++trp->state; + /*FALLTHROUGH*/ + case 2: + /* read the rest of the buffer using a callback to supply the rest of the data */ + if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1) + goto exit; + + if (frc == 0) + return 0; + + trp->rem_len -= frc; + trp->len += frc; + + if (trp->rem_len) + return 0; + + header.byte = buf[0]; + rc = header.bits.type; + break; + } +exit: + trp->state = 0; + return rc; +} + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.h new file mode 100644 index 0000000..0b9c474 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPacket.h @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTPACKET_H_ +#define MQTTPACKET_H_ + +#include + + +#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */ +extern "C" { +#endif + +#if defined(WIN32_DLL) || defined(WIN64_DLL) + #define DLLImport __declspec(dllimport) + #define DLLExport __declspec(dllexport) +#elif defined(LINUX_SO) + #define DLLImport extern + #define DLLExport __attribute__ ((visibility ("default"))) +#else + #define DLLImport + #define DLLExport +#endif + +enum errors +{ + MQTTPACKET_BUFFER_TOO_SHORT = -2, + MQTTPACKET_READ_ERROR = -1, + MQTTPACKET_READ_COMPLETE +}; + +enum msgTypes +{ + CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, + SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT +}; + +/** + * Bitfields for the MQTT header byte. + */ +typedef union +{ + uint8_t byte; /**< the whole byte */ +#if defined(REVERSED) + struct + { + uint8_t type : 4; /**< message type nibble */ + uint8_t dup : 1; /**< DUP flag bit */ + uint8_t qos : 2; /**< QoS value, 0, 1 or 2 */ + uint8_t retain : 1; /**< retained flag bit */ + } bits; +#else + struct + { + uint8_t retain : 1; /**< retained flag bit */ + uint8_t qos : 2; /**< QoS value, 0, 1 or 2 */ + uint8_t dup : 1; /**< DUP flag bit */ + uint8_t type : 4; /**< message type nibble */ +} bits; +#endif +} MQTTHeader; + +typedef struct +{ + int32_t len; + char* data; +} MQTTLenString; + +typedef struct +{ + char* cstring; + MQTTLenString lenstring; +} MQTTString; + +#define MQTTString_initializer {NULL, {0, NULL}} + +int32_t MQTTstrlen(MQTTString mqttstring); + +#include "MQTTConnect.h" +#include "MQTTPublish.h" +#include "MQTTSubscribe.h" +#include "MQTTUnsubscribe.h" +#include "MQTTFormat.h" + +int32_t MQTTSerialize_ack(uint8_t* buf, int32_t buflen, uint8_t type, uint8_t dup, uint16_t packetid); +int32_t MQTTDeserialize_ack(uint8_t* packettype, uint8_t* dup, uint16_t* packetid, uint8_t* buf, int32_t buflen); + +int32_t MQTTPacket_len(int32_t rem_len); +int32_t MQTTPacket_equals(MQTTString* a, char* b); + +int32_t MQTTPacket_encode(uint8_t* buf, int32_t length); +int32_t MQTTPacket_decode(int32_t (*getcharfn)(uint8_t*, int32_t), int32_t* value); +int32_t MQTTPacket_decodeBuf(uint8_t* buf, int32_t* value); + +int32_t readInt(uint8_t** pptr); +char readChar(uint8_t** pptr); +void writeChar(uint8_t** pptr, char c); +void writeInt(uint8_t** pptr, int32_t anInt); +int32_t readMQTTLenString(MQTTString* mqttstring, uint8_t** pptr, uint8_t* enddata); +void writeCString(uint8_t** pptr, const char* string); +void writeMQTTString(uint8_t** pptr, MQTTString mqttstring); + +DLLExport int32_t MQTTPacket_read(uint8_t* buf, int32_t buflen, int32_t (*getfn)(uint8_t*, int32_t)); + +typedef struct +{ + int32_t (*getfn)(void *, uint8_t*, int32_t); /* must return -1 for error, 0 for call again, or the number of bytes read */ + void *sck; /* pointer to whatever the system may use to identify the transport */ + int32_t multiplier; + int32_t rem_len; + int32_t len; + char state; +}MQTTTransport; + +int32_t MQTTPacket_readnb(uint8_t* buf, int32_t buflen, MQTTTransport *trp); + +#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ +} +#endif + + +#endif /* MQTTPACKET_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPublish.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPublish.h new file mode 100644 index 0000000..87b86f7 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTPublish.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTPUBLISH_H_ +#define MQTTPUBLISH_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int32_t MQTTSerialize_publish(uint8_t* buf, int32_t buflen, uint8_t dup, uint8_t qos, uint8_t retained, uint16_t packetid, + MQTTString topicName, uint8_t* payload, int32_t payloadlen); + +DLLExport int32_t MQTTDeserialize_publish(uint8_t* dup, uint8_t* qos, uint8_t* retained, uint16_t* packetid, MQTTString* topicName, + uint8_t** payload, int32_t* payloadlen, uint8_t* buf, int32_t len); + +DLLExport int32_t MQTTSerialize_puback(uint8_t* buf, int32_t buflen, uint16_t packetid); +DLLExport int32_t MQTTSerialize_pubrel(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid); +DLLExport int32_t MQTTSerialize_pubcomp(uint8_t* buf, int32_t buflen, uint16_t packetid); + +#endif /* MQTTPUBLISH_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSerializePublish.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSerializePublish.c new file mode 100644 index 0000000..32d281f --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSerializePublish.c @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144 + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + + +/** + * Determines the length of the MQTT publish packet that would be produced using the supplied parameters + * @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0) + * @param topicName the topic name to be used in the publish + * @param payloadlen the length of the payload to be sent + * @return the length of buffer needed to contain the serialized version of the packet + */ +int32_t MQTTSerialize_publishLength(uint8_t qos, MQTTString topicName, int32_t payloadlen) +{ + int32_t len = 2 + MQTTstrlen(topicName) + payloadlen; + + if (qos > 0) + len += 2; /* packetid */ + + return len; +} + + +/** + * Serializes the supplied publish data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param dup integer - the MQTT dup flag + * @param qos integer - the MQTT QoS value + * @param retained integer - the MQTT retained flag + * @param packetid integer - the MQTT packet identifier + * @param topicName MQTTString - the MQTT topic in the publish + * @param payload byte buffer - the MQTT publish payload + * @param payloadlen integer - the length of the MQTT payload + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTSerialize_publish(uint8_t* buf, int32_t buflen, uint8_t dup, uint8_t qos, uint8_t retained, uint16_t packetid, + MQTTString topicName, uint8_t* payload, int32_t payloadlen) +{ + uint8_t *ptr = buf; + MQTTHeader header = {0}; + int32_t rem_len = 0; + int32_t rc = 0; + + FUNC_ENTRY; + + if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.bits.type = PUBLISH; + header.bits.dup = dup; + header.bits.qos = qos; + header.bits.retain = retained; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + + writeMQTTString(&ptr, topicName); + + if (qos > 0) + writeInt(&ptr, packetid); + + memcpy(ptr, payload, payloadlen); + ptr += payloadlen; + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** + * Serializes the ack packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param type the MQTT packet type + * @param dup the MQTT dup flag + * @param packetid the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_ack(uint8_t* buf, int32_t buflen, uint8_t packettype, uint8_t dup, uint16_t packetid) +{ + MQTTHeader header = {0}; + int32_t rc = 0; + uint8_t *ptr = buf; + + FUNC_ENTRY; + + if (buflen < 4) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.bits.type = packettype; + header.bits.dup = dup; + header.bits.qos = (packettype == PUBREL) ? 1 : 0; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + writeInt(&ptr, packetid); + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes a puback packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_puback(uint8_t* buf, int32_t buflen, uint16_t packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid); +} + + +/** + * Serializes a pubrel packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_pubrel(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid); +} + + +/** + * Serializes a pubrel packet into the supplied buffer. + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return serialized length, or error if 0 + */ +int32_t MQTTSerialize_pubcomp(uint8_t* buf, int32_t buflen, uint16_t packetid) +{ + return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid); +} + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribe.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribe.h new file mode 100644 index 0000000..e55ee1f --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribe.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTSUBSCRIBE_H_ +#define MQTTSUBSCRIBE_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int32_t MQTTSerialize_subscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid, + int32_t count, MQTTString topicFilters[], int32_t requestedQoSs[]); + +DLLExport int32_t MQTTDeserialize_subscribe(uint8_t* dup, uint16_t* packetid, + int32_t maxcount, int32_t* count, MQTTString topicFilters[], int32_t requestedQoSs[], uint8_t* buf, int32_t len); + +DLLExport int32_t MQTTSerialize_suback(uint8_t* buf, int32_t buflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs); +DLLExport int32_t MQTTDeserialize_suback(uint16_t* packetid, int32_t maxcount, int32_t* count, int32_t grantedQoSs[], uint8_t* buf, int32_t len); + + +#endif /* MQTTSUBSCRIBE_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeClient.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeClient.c new file mode 100644 index 0000000..253f9d2 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeClient.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters + * @param count the number of topic filter strings in topicFilters + * @param topicFilters the array of topic filter strings to be used in the publish + * @return the length of buffer needed to contain the serialized version of the packet + */ +int32_t MQTTSerialize_subscribeLength(int32_t count, MQTTString topicFilters[]) +{ + int32_t len = 2; /* packetid */ + + for (int32_t i = 0; i < count; ++i) + len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */ + + return len; +} + + +/** + * Serializes the supplied subscribe data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied bufferr + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the topicFilters and reqQos arrays + * @param topicFilters - array of topic filter names + * @param requestedQoSs - array of requested QoS + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTSerialize_subscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid, int32_t count, + MQTTString topicFilters[], int32_t requestedQoSs[]) +{ + uint8_t *ptr = buf; + MQTTHeader header = {0}; + int32_t rem_len = 0; + int32_t rc = 0; + + FUNC_ENTRY; + + if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = SUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + writeChar(&ptr, header.byte); /* write header */ + + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + + writeInt(&ptr, packetid); + + for (int32_t i = 0; i < count; ++i) + { + writeMQTTString(&ptr, topicFilters[i]); + writeChar(&ptr, requestedQoSs[i]); + } + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + + +/** + * Deserializes the supplied (wire) buffer into suback data + * @param packetid returned integer - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the grantedQoSs array + * @param count returned integer - number of members in the grantedQoSs array + * @param grantedQoSs returned array of integers - the granted qualities of service + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int32_t MQTTDeserialize_suback(uint16_t* packetid, int32_t maxcount, int32_t* count, int32_t grantedQoSs[], uint8_t* buf, int32_t buflen) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = 0; + int32_t mylen; + + FUNC_ENTRY; + + header.byte = readChar(&curdata); + if (header.bits.type != SUBACK) + goto exit; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + if (enddata - curdata < 2) + goto exit; + + *packetid = readInt(&curdata); + + *count = 0; + + while (curdata < enddata) + { + if (*count > maxcount) + { + rc = -1; + goto exit; + } + grantedQoSs[(*count)++] = readChar(&curdata); + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeServer.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeServer.c new file mode 100644 index 0000000..da3d492 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTSubscribeServer.c @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + + +/** + * Deserializes the supplied (wire) buffer into subscribe data + * @param dup integer returned - the MQTT dup flag + * @param packetid integer returned - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays + * @param count - number of members in the topicFilters and requestedQoSs arrays + * @param topicFilters - array of topic filter names + * @param requestedQoSs - array of requested QoS + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTDeserialize_subscribe(uint8_t* dup, uint16_t* packetid, int32_t maxcount, int32_t* count, MQTTString topicFilters[], + int32_t requestedQoSs[], uint8_t* buf, int32_t buflen) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = -1; + int32_t mylen = 0; + + FUNC_ENTRY; + + header.byte = readChar(&curdata); + + if (header.bits.type != SUBSCRIBE) + goto exit; + + *dup = header.bits.dup; + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + *packetid = readInt(&curdata); + *count = 0; + + while (curdata < enddata) + { + if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) + goto exit; + + if (curdata >= enddata) /* do we have enough data to read the req_qos version byte? */ + goto exit; + + requestedQoSs[*count] = readChar(&curdata); + (*count)++; + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the supplied suback data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the grantedQoSs array + * @param grantedQoSs - array of granted QoS + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTSerialize_suback(uint8_t* buf, int32_t buflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs) +{ + MQTTHeader header = {0}; + int32_t rc = -1; + uint8_t *ptr = buf; + + FUNC_ENTRY; + + if (buflen < 2 + count) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = SUBACK; + writeChar(&ptr, header.byte); /* write header */ + ptr += MQTTPacket_encode(ptr, 2 + count); /* write remaining length */ + writeInt(&ptr, packetid); + + for (int32_t i = 0; i < count; ++i) + writeChar(&ptr, grantedQoSs[i]); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribe.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribe.h new file mode 100644 index 0000000..e68d1a6 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribe.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Xiang Rong - 442039 Add makefile to Embedded C client + *******************************************************************************/ + +#ifndef MQTTUNSUBSCRIBE_H_ +#define MQTTUNSUBSCRIBE_H_ + +#if !defined(DLLImport) + #define DLLImport +#endif +#if !defined(DLLExport) + #define DLLExport +#endif + +DLLExport int32_t MQTTSerialize_unsubscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid, + int32_t count, MQTTString topicFilters[]); + +DLLExport int32_t MQTTDeserialize_unsubscribe(uint8_t* dup, uint16_t* packetid, int32_t max_count, int32_t* count, MQTTString topicFilters[], + uint8_t* buf, int32_t len); + +DLLExport int32_t MQTTSerialize_unsuback(uint8_t* buf, int32_t buflen, uint16_t packetid); +DLLExport int32_t MQTTDeserialize_unsuback(uint16_t* packetid, uint8_t* buf, int32_t len); + +#endif /* MQTTUNSUBSCRIBE_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeClient.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeClient.c new file mode 100644 index 0000000..132507c --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeClient.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTPacket.h" +#include "StackTrace.h" + +#include + +/** + * Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters + * @param count the number of topic filter strings in topicFilters + * @param topicFilters the array of topic filter strings to be used in the publish + * @return the length of buffer needed to contain the serialized version of the packet + */ +int32_t MQTTSerialize_unsubscribeLength(int32_t count, MQTTString topicFilters[]) +{ + int32_t len = 2; /* packetid */ + + for (int32_t i = 0; i < count; ++i) + len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/ + + return len; +} + + +/** + * Serializes the supplied unsubscribe data into the supplied buffer, ready for sending + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @param dup integer - the MQTT dup flag + * @param packetid integer - the MQTT packet identifier + * @param count - number of members in the topicFilters array + * @param topicFilters - array of topic filter names + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTSerialize_unsubscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid, + int32_t count, MQTTString topicFilters[]) +{ + uint8_t *ptr = buf; + MQTTHeader header = {0}; + int32_t rem_len = 0; + int32_t rc = -1; + + FUNC_ENTRY; + + if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = UNSUBSCRIBE; + header.bits.dup = dup; + header.bits.qos = 1; + writeChar(&ptr, header.byte); /* write header */ + ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */; + writeInt(&ptr, packetid); + + for (int32_t i = 0; i < count; ++i) + writeMQTTString(&ptr, topicFilters[i]); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Deserializes the supplied (wire) buffer into unsuback data + * @param packetid returned integer - the MQTT packet identifier + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return error code. 1 is success, 0 is failure + */ +int32_t MQTTDeserialize_unsuback(uint16_t* packetid, uint8_t* buf, int32_t buflen) +{ + uint8_t type = 0; + uint8_t dup = 0; + int32_t rc = 0; + + FUNC_ENTRY; + + rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen); + + if (type == UNSUBACK) + rc = 1; + + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeServer.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeServer.c new file mode 100644 index 0000000..2bbf86b --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/MQTTUnsubscribeServer.c @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ +#include "MQTTPacket.h" +#include "StackTrace.h" +#include + +/** + * Deserializes the supplied (wire) buffer into unsubscribe data + * @param dup integer returned - the MQTT dup flag + * @param packetid integer returned - the MQTT packet identifier + * @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays + * @param count - number of members in the topicFilters and requestedQoSs arrays + * @param topicFilters - array of topic filter names + * @param buf the raw buffer data, of the correct length determined by the remaining length field + * @param buflen the length in bytes of the data in the supplied buffer + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTDeserialize_unsubscribe(uint8_t* dup, uint16_t* packetid, int32_t maxcount, int32_t* count, MQTTString topicFilters[], + uint8_t* buf, int32_t len) +{ + MQTTHeader header = {0}; + uint8_t* curdata = buf; + uint8_t* enddata = NULL; + int32_t rc = 0; + int32_t mylen = 0; + + FUNC_ENTRY; + + header.byte = readChar(&curdata); + if (header.bits.type != UNSUBSCRIBE) + goto exit; + *dup = header.bits.dup; + + curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */ + enddata = curdata + mylen; + + *packetid = readInt(&curdata); + *count = 0; + + while (curdata < enddata) + { + if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata)) + goto exit; + (*count)++; + } + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + +/** + * Serializes the supplied unsuback data into the supplied buffer, ready for sending + * @param buf the buffer into which the packet will be serialized + * @param buflen the length in bytes of the supplied buffer + * @param packetid integer - the MQTT packet identifier + * @return the length of the serialized data. <= 0 indicates error + */ +int32_t MQTTSerialize_unsuback(uint8_t* buf, int32_t buflen, uint16_t packetid) +{ + MQTTHeader header = {0}; + int32_t rc = 0; + uint8_t *ptr = buf; + + FUNC_ENTRY; + + if (buflen < 2) + { + rc = MQTTPACKET_BUFFER_TOO_SHORT; + goto exit; + } + + header.byte = 0; + header.bits.type = UNSUBACK; + writeChar(&ptr, header.byte); /* write header */ + ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */ + writeInt(&ptr, packetid); + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + + diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/StackTrace.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/StackTrace.h new file mode 100644 index 0000000..5ba0a85 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/MQTTPacket/src/StackTrace.h @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + * Ian Craggs - fix for bug #434081 + *******************************************************************************/ + +#ifndef STACKTRACE_H_ +#define STACKTRACE_H_ + +#include +#define NOSTACKTRACE 1 + +#if defined(NOSTACKTRACE) +#define FUNC_ENTRY +#define FUNC_ENTRY_NOLOG +#define FUNC_ENTRY_MED +#define FUNC_ENTRY_MAX +#define FUNC_EXIT +#define FUNC_EXIT_NOLOG +#define FUNC_EXIT_MED +#define FUNC_EXIT_MAX +#define FUNC_EXIT_RC(x) +#define FUNC_EXIT_MED_RC(x) +#define FUNC_EXIT_MAX_RC(x) + +#else + +#if defined(WIN32) +#define inline __inline +#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1) +#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM) +#else +#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM) +#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1) +#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM) +#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM) +#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM) +#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1) +#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM) +#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM) +#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM) +#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM) +#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM) + +void StackTrace_entry(const char* name, int32_t line, int32_t trace); +void StackTrace_exit(const char* name, int32_t line, void* return_value, int32_t trace); + +void StackTrace_printStack(FILE* dest); +char* StackTrace_get(unsigned long); + +#endif + +#endif + + + + +#endif /* STACKTRACE_H_ */ diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.c b/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.c new file mode 100644 index 0000000..2792ed9 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.c @@ -0,0 +1,79 @@ +#include "mqtt_interface.h" +#include "wizchip_conf.h" +#include "socket.h" +//#include +#include + +/* +uint32_t MilliTimer; + +void MilliTimer_Handler(void) +{ + MilliTimer++; +} +*/ + +int8_t expired(Timer* timer) +{ + int32_t left = (timer->end_time) - millis(); + return (left < 0); +} + +void countdown_ms(Timer* timer, uint32_t timeout) +{ + timer->end_time = millis() + timeout; +} + +void countdown(Timer* timer, uint32_t timeout) +{ + timer->end_time = millis() + (timeout * 1000UL); +} + +int32_t left_ms(Timer* timer) +{ + int32_t left = timer->end_time - millis(); + return (left < 0) ? 0 : left; +} + +void InitTimer(Timer* timer) +{ + timer->end_time = 0; +} + + +void NewNetwork(Network* n) +{ + //n->my_socket = 0; //initialized outside actually.. + n->mqttread = w5500_read; + n->mqttwrite = w5500_write; + n->disconnect = w5500_disconnect; +} + +int32_t w5500_read(Network* n, uint8_t* buffer, int32_t len, int32_t timeout_ms) +{ + if ((getSn_SR(n->my_socket) == SOCK_ESTABLISHED) && (getSn_RX_RSR(n->my_socket) > 0)) + return recv(n->my_socket, buffer, len); + + return 0; +} + +int32_t w5500_write(Network* n, uint8_t* buffer, int32_t len, int32_t timeout_ms) +{ + if (getSn_SR(n->my_socket) == SOCK_ESTABLISHED) + return send(n->my_socket, buffer, len); + + return 0; +} + +void w5500_disconnect(Network* n) +{ + disconnect(n->my_socket); +} + +int32_t ConnectNetwork(Network* n, uint8_t* ip, uint16_t port) +{ + socket(n->my_socket, Sn_MR_TCP, 12345, 0); + connect(n->my_socket, ip, port); + + return 0; +} diff --git a/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.h b/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.h new file mode 100644 index 0000000..037baa1 --- /dev/null +++ b/22_m1284p_WIZNET_MQTT/Internet/MQTT/mqtt_interface.h @@ -0,0 +1,42 @@ +#ifndef __MQTT_INTERFACE_H_ +#define __MQTT_INTERFACE_H_ + +#include +#include "../../globals.h" + +typedef struct Timer Timer; + +struct Timer +{ + uint32_t systick_period; + uint32_t end_time; +}; + +typedef struct Network Network; + +struct Network +{ + int32_t my_socket; + int32_t (*mqttread) (Network*, uint8_t*, int32_t, int32_t); + int32_t (*mqttwrite) (Network*, uint8_t*, int32_t, int32_t); + void (*disconnect) (Network*); +}; + +void InitTimer(Timer*); +/* +void MilliTimer_Handler(void); +*/ + +int8_t expired(Timer*); +void countdown_ms(Timer*, uint32_t); +void countdown(Timer*, uint32_t); +int32_t left_ms(Timer*); + +int32_t w5500_read(Network*, uint8_t*, int32_t, int32_t); +int32_t w5500_write(Network*, uint8_t*, int32_t, int32_t); +void w5500_disconnect(Network*); +void NewNetwork(Network*); + +int32_t ConnectNetwork(Network*, uint8_t*, uint16_t); + +#endif diff --git a/22_m1284p_WIZNET_MQTT/globals.c b/22_m1284p_WIZNET_MQTT/globals.c index 6b51aa0..98b2186 100644 --- a/22_m1284p_WIZNET_MQTT/globals.c +++ b/22_m1284p_WIZNET_MQTT/globals.c @@ -14,6 +14,8 @@ wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac add .dns = {8,8,8,8}, // DNS address (google dns) .gw = {192, 168, 0, 1}, // Gateway address .dhcp = NETINFO_STATIC}; //Static IP configuration +uint8_t MQTT_targetIP[4] = {192, 168, 0, 100}; // IP брокера MQTT + #else //NIC metrics for another PC (second IP configuration) wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac address @@ -22,5 +24,6 @@ wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac add .dns = {8,8,8,8}, // DNS address (google dns) .gw = {192, 168, 1, 1}, // Gateway address .dhcp = NETINFO_STATIC}; //Static IP configuration +uint8_t MQTT_targetIP[4] = {192, 168, 1, 81}; // IP брокера MQTT #endif diff --git a/22_m1284p_WIZNET_MQTT/globals.h b/22_m1284p_WIZNET_MQTT/globals.h index 25410eb..dacd544 100644 --- a/22_m1284p_WIZNET_MQTT/globals.h +++ b/22_m1284p_WIZNET_MQTT/globals.h @@ -41,6 +41,8 @@ static FATFS Fatfs; //File system object for each logical drive. >= 2 #define PRINTF(...) #endif +#define SPRINTF(__S, FORMAT, args...) sprintf_P(__S, PSTR(FORMAT),##args) + #define IP_WORK //SPI CLOCK 4 or 8Mhz @@ -67,6 +69,7 @@ extern const char compile_time[] PROGMEM; extern const char str_prog_name[] PROGMEM; extern wiz_NetInfo netInfo; +extern uint8_t MQTT_targetIP[4]; #define CHK_RAM_LEAKAGE #define CHK_UPTIME diff --git a/22_m1284p_WIZNET_MQTT/main.c b/22_m1284p_WIZNET_MQTT/main.c index d298186..3b13a99 100644 --- a/22_m1284p_WIZNET_MQTT/main.c +++ b/22_m1284p_WIZNET_MQTT/main.c @@ -21,12 +21,15 @@ #include "Ethernet/wizchip_conf.h" #include "Application/loopback/loopback.h" +#include "Internet/MQTT/mqtt_interface.h" +#include "Internet/MQTT/MQTTClient.h" + #define _MAIN_DEBUG_ /* * 22. MQTT + Mosquitto in LAN test * - * Used code from: + * Used as base code from: * Nadyrshin Ruslan - MQTTPacket (MQTT client/server v3.1.1 adapted for AVR MCU). * YouTube-channel: https://www.youtube.com/channel/UChButpZaL5kUUl_zTyIDFkQ * @@ -56,7 +59,7 @@ volatile unsigned long _millis; // for millis tick !! Overflow every ~49.7 days //*********Program metrics const char compile_date[] PROGMEM = __DATE__; // Mmm dd yyyy - Дата компиляции const char compile_time[] PROGMEM = __TIME__; // hh:mm:ss - Время компиляции -const char str_prog_name[] PROGMEM = "\r\nAtMega1284p v1.0alpha Static IP MQTT && Loop-back WIZNET_5500 ETHERNET 05/04/2019\r\n"; // Program name +const char str_prog_name[] PROGMEM = "\r\nAtMega1284p v1.1 Static IP MQTT && Loop-back WIZNET_5500 ETHERNET 06/04/2019\r\n"; // Program name #if defined(__AVR_ATmega128__) const char PROGMEM str_mcu[] = "ATmega128"; //CPU is m128 @@ -76,6 +79,37 @@ const char PROGMEM str_mcu[] = "ATmega1284p"; //CPU is m1284p const char PROGMEM str_mcu[] = "Unknown CPU"; //CPU is unknown #endif +//******************* MQTT: BEGIN +#define SOCK_MQTT 2 +// Receive Buffer +#define MQTT_BUFFER_SIZE 512 // 2048 +uint8_t mqtt_readBuffer[MQTT_BUFFER_SIZE]; +volatile uint16_t mes_id; + + +//MQTT subscribe call-back is here +void messageArrived(MessageData* md) +{ + char _topic_name[64] = "\0"; + char _message[128] = "\0"; + + MQTTMessage* message = md->message; + MQTTString* topic = md->topicName; + strncpy(_topic_name, topic->lenstring.data, topic->lenstring.len); + strncpy(_message, message->payload, message->payloadlen); + PRINTF("<topicName-> + /* + for (uint8_t i = 0; i < md->topicName->lenstring.len; i++) + putchar(*(md->topicName->lenstring.data + i)); + + printf(" (%.*s)\r\n", (int32_t)message->payloadlen, (char*)message->payload); + */ +} + + +//******************* MQTT: END //FUNC headers static void avr_init(void); @@ -263,35 +297,6 @@ void IO_LIBRARY_Init(void) { } //***************** WIZCHIP INIT: END -/* -void spi_speed_tst(void) -{ - // Here on SPI pins: MOSI 400Khz freq out, on SCLK 3.2MhzOUT - while(1) - { - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - SPI_WRITE(0xF0); - } -} -*/ - int main() { //uint8_t prev_sw1 = 1; // VAR for sw1 pressing detect @@ -324,11 +329,52 @@ int main() wdt_reset(); } +//****************MQTT client initialize + //Find MQTT broker and connect with it + uint8_t mqtt_buf[100]; + int32_t mqtt_rc = 0; + Network mqtt_network; + Client mqtt_client; + mqtt_network.my_socket = SOCK_MQTT; + + // Можно определить IP узла по DNS-имени, IP узла будет в массиве targetIP + //DNS_init(1, tempBuffer); + //DNS_run(gWIZNETINFO.dns, "test.mosquitto.org", targetIP); + + PRINTF(">>Trying connect to MQTT broker: %d.%d.%d.%d ..\r\n", MQTT_targetIP[0], MQTT_targetIP[1], MQTT_targetIP[2], MQTT_targetIP[3]); + NewNetwork(&mqtt_network); + ConnectNetwork(&mqtt_network, MQTT_targetIP, 1883); + MQTTClient(&mqtt_client, &mqtt_network, 1000, mqtt_buf, 100, mqtt_readBuffer, MQTT_BUFFER_SIZE); + + //Connection to MQTT broker + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.willFlag = 0; + data.MQTTVersion = 4;//3; + data.clientID.cstring = (char*)"w5500_avr_client"; + data.username.cstring = (char*)"user1234"; + data.password.cstring = (char*)"\0"; + data.keepAliveInterval = 60; + data.cleansession = 1; + mqtt_rc = MQTTConnect(&mqtt_client, &data); + if (mqtt_rc == SUCCESSS) + { + PRINTF("++MQTT Connected SUCCESS: %ld\r\n", mqtt_rc); + } + else + { + PRINTF("--MQTT Connected ERROR: %ld\r\n", mqtt_rc); + while(1);//Reboot the board + } + + // Subscribe to all topics + char SubString[] = "/#"; + mqtt_rc = MQTTSubscribe(&mqtt_client, SubString, QOS0, messageArrived); + PRINTF("Subscribed (%s) %d\r\n", SubString, mqtt_rc); + - /* Loopback Test: TCP Server and UDP */ - // Test for Ethernet data transfer validation uint32_t timer_link_1sec = millis(); uint32_t timer_uptime_60sec = millis(); + uint32_t timer_mqtt_pub_10sec = millis(); while(1) { //Here at least every 1sec @@ -337,12 +383,36 @@ int main() //Use Hercules Terminal to check loopback tcp:5000 and udp:3000 /* * https://www.hw-group.com/software/hercules-setup-utility - * */ + * + */ loopback_tcps(SOCK_TCPS,ethBuf0,PORT_TCPS); loopback_udps(SOCK_UDPS,ethBuf0,PORT_UDPS); - //loopback_ret = loopback_tcpc(SOCK_TCPS, gDATABUF, destip, destport); - //if(loopback_ret < 0) printf("loopback ret: %ld\r\n", loopback_ret); // TCP Socket Error code + // MQTT pub event every 10 sec + if((millis()-timer_mqtt_pub_10sec)> 10000) + { + static uint32_t mqtt_pub_count = 0; + //here every 10 sec + timer_mqtt_pub_10sec = millis(); + + char _msg[64]; + //Every 10sec push message: "Uptime: xxx sec; Free RAM: xxxxx bytes", to BLYNK server (widget Terminal) + int len = SPRINTF(_msg, "Uptime: %lu sec; Free RAM: %d bytes\r\n", millis()/1000, freeRam()); + if(len > 0) + { + PRINTF(">>MQTT pub msg №%lu\r\n", ++mqtt_pub_count); + MQTTMessage pubMessage; + pubMessage.qos = QOS0; + pubMessage.id = mes_id++; + pubMessage.payloadlen = (size_t)len; + pubMessage.payload = _msg; + MQTTPublish(&mqtt_client, "/w5500_avr_dbg", &pubMessage); + } + } + + // MQTT broker connection and sub receive + MQTTYield(&mqtt_client, 100);//~100msec blocking here + if((millis()-timer_link_1sec)> 1000) {