MQTT client testing is OK

master
maxxir_w 7 years ago
parent 36c836d1b7
commit 10d09c2e94

@ -34,6 +34,8 @@
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/${ProjName}/Ethernet}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/${ProjName}/Ethernet/W5500}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/${ProjName}/Application/loopback}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/${ProjName}/Internet/MQTT/MQTTPacket}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/${ProjName}/Internet/MQTT/MQTTPacket/src}&quot;"/>
</option>
<inputType id="de.innot.avreclipse.compiler.winavr.input.167604838" name="C Source Files" superClass="de.innot.avreclipse.compiler.winavr.input"/>
</tool>
@ -75,5 +77,9 @@
</scannerConfigBuildInfo>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
<storageModule moduleId="refreshScope"/>
<storageModule moduleId="refreshScope" versionNumber="2">
<configuration configurationName="Release">
<resource resourceType="PROJECT" workspacePath="/22_m1284p_WIZNET_MQTT"/>
</configuration>
</storageModule>
</cproject>

@ -0,0 +1,538 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTClient.h"
//#include <terminal_io.h>
void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessgage)
{
md->topicName = aTopicName;
md->message = aMessgage;
}
int32_t getNextPacketId(Client *c)
{
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
}
int32_t sendPacket(Client* c, int32_t length, Timer* timer)
{
int32_t rc = FAILURE, sent = 0;
while (sent < length && !expired(timer))
{
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, left_ms(timer));
if (rc < 0) // there was an error writing the data
break;
sent += rc;
}
if (sent == length)
{
countdown(&c->ping_timer, c->keepAliveInterval); // record the fact that we have SUCCESSSfully sent the packet
rc = SUCCESSS;
}
else
rc = FAILURE;
return rc;
}
void MQTTClient(Client* c, Network* network, uint32_t command_timeout_ms, uint8_t* buf, size_t buf_size, uint8_t* readbuf, size_t readbuf_size)
{
c->ipstack = network;
for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->buf = buf;
c->buf_size = buf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->isconnected = 0;
c->ping_outstanding = 0;
c->defaultMessageHandler = NULL;
InitTimer(&c->ping_timer);
}
int32_t decodePacket(Client* c, int32_t* value, int32_t timeout)
{
uint8_t i;
int32_t multiplier = 1;
int32_t len = 0;
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
{
int32_t rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
exit:
return len;
}
int32_t readPacket(Client* c, Timer* timer)
{
int32_t rc = FAILURE;
MQTTHeader header = {0};
int32_t len = 0;
int32_t rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, left_ms(timer)) != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, left_ms(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, left_ms(timer)) != rem_len))
goto exit;
header.byte = c->readbuf[0];
rc = header.bits.type;
exit:
return rc;
}
// assume topic filter and name is in correct format
// # can only be at end
// + and # can only be next to separator
char isTopicMatched(char* topicFilter, MQTTString* topicName)
{
char* curf = topicFilter;
char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+')
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
nextpos = ++curn + 1;
}
else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
}
return (curn == curn_end) && (*curf == '\0');
}
int32_t deliverMessage(Client* c, MQTTString* topicName, MQTTMessage* message)
{
int32_t rc = FAILURE;
// we have to find the right message handler - indexed by topic
for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md);
rc = SUCCESSS;
}
}
}
if (rc == FAILURE && c->defaultMessageHandler != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md);
rc = SUCCESSS;
}
return rc;
}
int32_t keepalive(Client* c)
{
int32_t rc = FAILURE;
if (c->keepAliveInterval == 0)
{
rc = SUCCESSS;
goto exit;
}
if (expired(&c->ping_timer))
{
if (!c->ping_outstanding)
{
Timer timer;
InitTimer(&timer);
countdown_ms(&timer, 1000);
int32_t len = MQTTSerialize_pingreq(c->buf, c->buf_size);
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESSS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
int32_t cycle(Client* c, Timer* timer)
{
// read the socket, see what work is due
uint16_t packet_type = readPacket(c, timer);
int32_t len = 0, rc = SUCCESSS;
switch (packet_type)
{
case CONNACK:
case PUBACK:
case SUBACK:
break;
case PUBLISH:
{
MQTTString topicName;
MQTTMessage msg;
if (MQTTDeserialize_publish((uint8_t*)&msg.dup, (uint8_t*)&msg.qos, (uint8_t*)&msg.retained, (uint16_t*)&msg.id, &topicName,
(uint8_t**)&msg.payload, (int32_t*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit;
deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = FAILURE;
else
rc = sendPacket(c, len, timer);
if (rc == FAILURE)
goto exit; // there was a problem
}
break;
}
case PUBREC:
{
uint16_t mypacketid;
uint8_t dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREL, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESSS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
keepalive(c);
exit:
if (rc == SUCCESSS)
rc = packet_type;
return rc;
}
int32_t MQTTYield(Client* c, int32_t timeout_ms)
{
int32_t rc = SUCCESSS;
Timer timer;
InitTimer(&timer);
countdown_ms(&timer, timeout_ms);
while (!expired(&timer))
{
if (cycle(c, &timer) == FAILURE)
{
rc = FAILURE;
break;
}
}
return rc;
}
// only used in single-threaded mode where one command at a time is in process
int32_t waitfor(Client* c, int32_t packet_type, Timer* timer)
{
int32_t rc = FAILURE;
do
{
if (expired(timer))
break; // we timed out
}
while ((rc = cycle(c, timer)) != packet_type);
return rc;
}
int32_t MQTTConnect(Client* c, MQTTPacket_connectData* options)
{
Timer connect_timer;
int32_t rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int32_t len = 0;
InitTimer(&connect_timer);
countdown_ms(&connect_timer, c->command_timeout_ms);
if (c->isconnected) // don't send connect packet again if we are already connected
goto exit;
if (options == 0)
options = &default_options; // set default options if none were supplied
c->keepAliveInterval = options->keepAliveInterval;
countdown(&c->ping_timer, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESSS) // send the connect packet
goto exit; // there was a problem
// this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
{
uint8_t connack_rc = 255;
int8_t sessionPresent = 0;
if (MQTTDeserialize_connack((uint8_t*)&sessionPresent, &connack_rc, c->readbuf, c->readbuf_size) == 1)
rc = connack_rc;
else
rc = FAILURE;
}
else
rc = FAILURE;
exit:
if (rc == SUCCESSS)
c->isconnected = 1;
return rc;
}
int32_t MQTTSubscribe(Client* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
int32_t rc = FAILURE;
Timer timer;
int32_t len = 0;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected)
goto exit;
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int32_t*)&qos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int32_t count = 0, grantedQoS = -1;
uint16_t mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80
if (rc != 0x80)
{
for (int32_t i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == 0)
{
c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler;
rc = 0;
break;
}
}
}
}
else
rc = FAILURE;
exit:
return rc;
}
int32_t MQTTUnsubscribe(Client* c, const char* topicFilter)
{
int32_t rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
int32_t len = 0;
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected)
goto exit;
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{
uint16_t mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
rc = 0;
}
else
rc = FAILURE;
exit:
return rc;
}
int32_t MQTTPublish(Client* c, const char* topicName, MQTTMessage* message)
{
int32_t rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int32_t len = 0;
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected)
goto exit;
if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (uint8_t*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESSS) // send the subscribe packet
goto exit; // there was a problem
if (message->qos == QOS1)
{
if (waitfor(c, PUBACK, &timer) == PUBACK)
{
uint16_t mypacketid;
uint8_t dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
else if (message->qos == QOS2)
{
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
uint16_t mypacketid;
uint8_t dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
exit:
return rc;
}
int32_t MQTTDisconnect(Client* c)
{
int32_t rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete
int32_t len = MQTTSerialize_disconnect(c->buf, c->buf_size);
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet
c->isconnected = 0;
return rc;
}

@ -0,0 +1,104 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#ifndef __MQTT_CLIENT_C_
#define __MQTT_CLIENT_C_
#include "MQTTPacket.h"
#include "mqtt_interface.h" //Platform specific implementation header file
#include "stdio.h"
#define MAX_PACKET_ID 65535
#define MAX_MESSAGE_HANDLERS 5
enum QoS
{
QOS0,
QOS1,
QOS2
};
// all failure return codes must be negative
enum returnCode
{
BUFFER_OVERFLOW = -2,
FAILURE = -1,
SUCCESSS = 0
};
void NewTimer(Timer*);
typedef struct MQTTMessage MQTTMessage;
typedef struct MessageData MessageData;
struct MQTTMessage
{
enum QoS qos;
char retained;
char dup;
uint16_t id;
void *payload;
size_t payloadlen;
};
struct MessageData
{
MQTTMessage* message;
MQTTString* topicName;
};
typedef void (*messageHandler)(MessageData*);
typedef struct Client Client;
int32_t MQTTConnect (Client*, MQTTPacket_connectData*);
int32_t MQTTPublish (Client*, const char*, MQTTMessage*);
int32_t MQTTSubscribe (Client*, const char*, enum QoS, messageHandler);
int32_t MQTTUnsubscribe (Client*, const char*);
int32_t MQTTDisconnect (Client*);
int32_t MQTTYield (Client*, int32_t);
void setDefaultMessageHandler(Client*, messageHandler);
void MQTTClient(Client*, Network*, uint32_t, unsigned char*, size_t, unsigned char*, size_t);
struct Client
{
uint32_t next_packetid;
uint32_t command_timeout_ms;
size_t buf_size, readbuf_size;
uint8_t *buf;
uint8_t *readbuf;
uint32_t keepAliveInterval;
int8_t ping_outstanding;
int32_t isconnected;
struct MessageHandlers
{
const char* topicFilter;
void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
void (*defaultMessageHandler) (MessageData*);
Network* ipstack;
Timer ping_timer;
};
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
#endif

@ -0,0 +1,129 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTCONNECT_H_
#define MQTTCONNECT_H_
#include <stdint.h>
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
typedef union
{
uint8_t all; /**< all connect flags */
#if defined(REVERSED)
struct
{
uint8_t username : 1; /**< 3.1 user name */
uint8_t password : 1; /**< 3.1 password */
uint8_t willRetain : 1; /**< will retain setting */
uint8_t willQoS : 2; /**< will QoS value */
uint8_t will : 1; /**< will flag */
uint8_t cleansession: 1; /**< clean session flag */
uint8_t : 1; /**< unused */
} bits;
#else
struct
{
uint8_t : 1; /**< unused */
uint8_t cleansession: 1; /**< cleansession flag */
uint8_t will : 1; /**< will flag */
uint8_t willQoS : 2; /**< will QoS value */
uint8_t willRetain : 1; /**< will retain setting */
uint8_t password : 1; /**< 3.1 password */
uint8_t username : 1; /**< 3.1 user name */
} bits;
#endif
} MQTTConnectFlags; /**< connect flags byte */
/** Defines the MQTT "Last Will and Testament" (LWT) settings for the connect packet. */
typedef struct
{
/** The eyecatcher for this structure. must be MQTW. */
int8_t struct_id[4];
/** The version number of this structure. Must be 0 */
int16_t struct_version;
/** The LWT topic to which the LWT message will be published. */
MQTTString topicName;
/** The LWT payload. */
MQTTString message;
/** The retained flag for the LWT message (see MQTTAsync_message.retained). */
uint8_t retained;
/** The quality of service setting for the LWT message (see MQTTAsync_message.qos and @ref qos). */
int8_t qos;
} MQTTPacket_willOptions;
#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 }
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
int8_t struct_id[4];
/** The version number of this structure. Must be 0 */
uint16_t struct_version;
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 */
uint8_t MQTTVersion;
MQTTString clientID;
uint16_t keepAliveInterval;
uint8_t cleansession;
uint8_t willFlag;
MQTTPacket_willOptions will;
MQTTString username;
MQTTString password;
} MQTTPacket_connectData;
typedef union
{
unsigned char all; /**< all connack flags */
#if defined(REVERSED)
struct
{
uint8_t sessionpresent : 1; /**< session present flag */
uint8_t : 7; /**< unused */
} bits;
#else
struct
{
uint8_t : 7; /**< unused */
uint8_t sessionpresent : 1; /**< session present flag */
} bits;
#endif
} MQTTConnackFlags; /**< connack flags byte */
#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} }
DLLExport int32_t MQTTSerialize_connect(uint8_t* buf, int32_t buflen, MQTTPacket_connectData* options);
DLLExport int32_t MQTTDeserialize_connect(MQTTPacket_connectData* data, uint8_t* buf, int32_t len);
DLLExport int32_t MQTTSerialize_connack(uint8_t* buf, int32_t buflen, uint8_t connack_rc, uint8_t sessionPresent);
DLLExport int32_t MQTTDeserialize_connack(uint8_t* sessionPresent, uint8_t* connack_rc, uint8_t* buf, int32_t buflen);
DLLExport int32_t MQTTSerialize_disconnect(uint8_t* buf, int32_t buflen);
DLLExport int32_t MQTTSerialize_pingreq(uint8_t* buf, int32_t buflen);
#endif /* MQTTCONNECT_H_ */

@ -0,0 +1,217 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT connect packet that would be produced using the supplied connect options.
* @param options the options to be used to build the connect packet
* @return the length of buffer needed to contain the serialized version of the packet
*/
int32_t MQTTSerialize_connectLength(MQTTPacket_connectData* options)
{
int32_t len = 0;
FUNC_ENTRY;
if (options->MQTTVersion == 3)
len = 12; /* variable depending on MQTT or MQIsdp */
else if (options->MQTTVersion == 4)
len = 10;
len += MQTTstrlen(options->clientID) + 2;
if (options->willFlag)
len += MQTTstrlen(options->will.topicName) + 2 + MQTTstrlen(options->will.message) + 2;
if (options->username.cstring || options->username.lenstring.data)
len += MQTTstrlen(options->username)+2;
if (options->password.cstring || options->password.lenstring.data)
len += MQTTstrlen(options->password)+2;
FUNC_EXIT_RC(len);
return len;
}
/**
* Serializes the connect options into the buffer.
* @param buf the buffer into which the packet will be serialized
* @param len the length in bytes of the supplied buffer
* @param options the options to be used to build the connect packet
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_connect(uint8_t* buf, int32_t buflen, MQTTPacket_connectData* options)
{
uint8_t *ptr = buf;
MQTTHeader header = {0};
MQTTConnectFlags flags = {0};
int32_t len = 0;
int32_t rc = -1;
FUNC_ENTRY;
if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = CONNECT;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, len); /* write remaining length */
if (options->MQTTVersion == 4)
{
writeCString(&ptr, "MQTT");
writeChar(&ptr, (char) 4);
}
else
{
writeCString(&ptr, "MQIsdp");
writeChar(&ptr, (char) 3);
}
flags.all = 0;
flags.bits.cleansession = options->cleansession;
flags.bits.will = (options->willFlag) ? 1 : 0;
if (flags.bits.will)
{
flags.bits.willQoS = options->will.qos;
flags.bits.willRetain = options->will.retained;
}
if (options->username.cstring || options->username.lenstring.data)
flags.bits.username = 1;
if (options->password.cstring || options->password.lenstring.data)
flags.bits.password = 1;
writeChar(&ptr, flags.all);
writeInt(&ptr, options->keepAliveInterval);
writeMQTTString(&ptr, options->clientID);
if (options->willFlag)
{
writeMQTTString(&ptr, options->will.topicName);
writeMQTTString(&ptr, options->will.message);
}
if (flags.bits.username)
writeMQTTString(&ptr, options->username);
if (flags.bits.password)
writeMQTTString(&ptr, options->password);
rc = ptr - buf;
exit: FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into connack data - return code
* @param sessionPresent the session present flag returned (only for MQTT 3.1.1)
* @param connack_rc returned integer value of the connack return code
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int32_t MQTTDeserialize_connack(uint8_t* sessionPresent, uint8_t* connack_rc, uint8_t* buf, int32_t buflen)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = 0;
int32_t mylen;
MQTTConnackFlags flags = {0};
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != CONNACK)
goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
flags.all = readChar(&curdata);
*sessionPresent = flags.bits.sessionpresent;
*connack_rc = readChar(&curdata);
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a 0-length packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @param packettype the message type
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_zero(uint8_t* buf, int32_t buflen, uint8_t packettype)
{
MQTTHeader header = {0};
int32_t rc = -1;
uint8_t *ptr = buf;
FUNC_ENTRY;
if (buflen < 2)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = packettype;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 0); /* write remaining length */
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a disconnect packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_disconnect(uint8_t* buf, int32_t buflen)
{
return MQTTSerialize_zero(buf, buflen, DISCONNECT);
}
/**
* Serializes a disconnect packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer, to avoid overruns
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_pingreq(uint8_t* buf, int32_t buflen)
{
return MQTTSerialize_zero(buf, buflen, PINGREQ);
}

@ -0,0 +1,153 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
#define min(a, b) ((a < b) ? a : b)
/**
* Validates MQTT protocol name and version combinations
* @param protocol the MQTT protocol name as an MQTTString
* @param version the MQTT protocol version number, as in the connect packet
* @return correct MQTT combination? 1 is true, 0 is false
*/
int32_t MQTTPacket_checkVersion(MQTTString* protocol, int32_t version)
{
int32_t rc = 0;
if (version == 3 && memcmp(protocol->lenstring.data, "MQIsdp", min(6, protocol->lenstring.len)) == 0)
rc = 1;
else if (version == 4 && memcmp(protocol->lenstring.data, "MQTT", min(4, protocol->lenstring.len)) == 0)
rc = 1;
return rc;
}
/**
* Deserializes the supplied (wire) buffer into connect data structure
* @param data the connect data structure to be filled out
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int32_t MQTTDeserialize_connect(MQTTPacket_connectData* data, uint8_t* buf, int32_t len)
{
MQTTHeader header = {0};
MQTTConnectFlags flags = {0};
uint8_t* curdata = buf;
uint8_t* enddata = &buf[len];
int32_t rc = 0;
MQTTString Protocol;
int32_t version;
int32_t mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != CONNECT)
goto exit;
curdata += MQTTPacket_decodeBuf(curdata, &mylen); /* read remaining length */
/* do we have enough data to read the protocol version byte? */
if (!readMQTTLenString(&Protocol, &curdata, enddata) || enddata - curdata < 0)
goto exit;
version = (int)readChar(&curdata); /* Protocol version */
/* If we don't recognize the protocol version, we don't parse the connect packet on the
* basis that we don't know what the format will be. */
if (MQTTPacket_checkVersion(&Protocol, version))
{
flags.all = readChar(&curdata);
data->cleansession = flags.bits.cleansession;
data->keepAliveInterval = readInt(&curdata);
if (!readMQTTLenString(&data->clientID, &curdata, enddata))
goto exit;
data->willFlag = flags.bits.will;
if (flags.bits.will)
{
data->will.qos = flags.bits.willQoS;
data->will.retained = flags.bits.willRetain;
if (!readMQTTLenString(&data->will.topicName, &curdata, enddata) ||
!readMQTTLenString(&data->will.message, &curdata, enddata))
goto exit;
}
if (flags.bits.username)
{
if (enddata - curdata < 3 || !readMQTTLenString(&data->username, &curdata, enddata))
goto exit; /* username flag set, but no username supplied - invalid */
if (flags.bits.password &&
(enddata - curdata < 3 || !readMQTTLenString(&data->password, &curdata, enddata)))
goto exit; /* password flag set, but no password supplied - invalid */
}
else if (flags.bits.password)
goto exit; /* password flag set without username - invalid */
rc = 1;
}
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the connack packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param connack_rc the integer connack return code to be used
* @param sessionPresent the MQTT 3.1.1 sessionPresent flag
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_connack(uint8_t* buf, int32_t buflen, uint8_t connack_rc, uint8_t sessionPresent)
{
MQTTHeader header = {0};
int32_t rc = 0;
uint8_t *ptr = buf;
MQTTConnackFlags flags = {0};
FUNC_ENTRY;
if (buflen < 2)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = CONNACK;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
flags.all = 0;
flags.bits.sessionpresent = sessionPresent;
writeChar(&ptr, flags.all);
writeChar(&ptr, connack_rc);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,109 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
#define min(a, b) ((a < b) ? 1 : 0)
/**
* Deserializes the supplied (wire) buffer into publish data
* @param dup returned integer - the MQTT dup flag
* @param qos returned integer - the MQTT QoS value
* @param retained returned integer - the MQTT retained flag
* @param packetid returned integer - the MQTT packet identifier
* @param topicName returned MQTTString - the MQTT topic in the publish
* @param payload returned byte buffer - the MQTT publish payload
* @param payloadlen returned integer - the length of the MQTT payload
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success
*/
int32_t MQTTDeserialize_publish(uint8_t* dup, uint8_t* qos, uint8_t* retained, uint16_t* packetid, MQTTString* topicName,
uint8_t** payload, int32_t* payloadlen, uint8_t* buf, int32_t buflen)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = 0;
int32_t mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != PUBLISH)
goto exit;
*dup = header.bits.dup;
*qos = header.bits.qos;
*retained = header.bits.retain;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
/* do we have enough data to read the protocol version byte? */
if (!readMQTTLenString(topicName, &curdata, enddata) || enddata - curdata < 0)
goto exit;
if (*qos > 0)
*packetid = readInt(&curdata);
*payloadlen = enddata - curdata;
*payload = curdata;
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into an ack
* @param packettype returned integer - the MQTT packet type
* @param dup returned integer - the MQTT dup flag
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int32_t MQTTDeserialize_ack(uint8_t* packettype, uint8_t* dup, uint16_t* packetid, uint8_t* buf, int32_t buflen)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = 0;
int32_t mylen;
FUNC_ENTRY;
header.byte = readChar(&curdata);
*dup = header.bits.dup;
*packettype = header.bits.type;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
*packetid = readInt(&curdata);
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,265 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
const char* MQTTPacket_names[] =
{
"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ",
"PINGRESP", "DISCONNECT"
};
const char* MQTTPacket_getName(uint16_t packetid)
{
return MQTTPacket_names[packetid];
}
int32_t MQTTStringFormat_connect(char* strbuf, int32_t strbuflen, MQTTPacket_connectData* data)
{
int32_t strindex = 0;
strindex = snprintf(strbuf, strbuflen,
"CONNECT MQTT version %d, client id %.*s, clean session %d, keep alive %d",
(int32_t)data->MQTTVersion, data->clientID.lenstring.len, data->clientID.lenstring.data,
(int32_t)data->cleansession, data->keepAliveInterval);
if (data->willFlag)
strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
", will QoS %d, will retain %d, will topic %.*s, will message %.*s",
data->will.qos, data->will.retained,
data->will.topicName.lenstring.len, data->will.topicName.lenstring.data,
data->will.message.lenstring.len, data->will.message.lenstring.data);
if (data->username.lenstring.data && data->username.lenstring.len > 0)
strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
", user name %.*s", data->username.lenstring.len, data->username.lenstring.data);
if (data->password.lenstring.data && data->password.lenstring.len > 0)
strindex += snprintf(&strbuf[strindex], strbuflen - strindex,
", password %.*s", data->password.lenstring.len, data->password.lenstring.data);
return strindex;
}
int32_t MQTTStringFormat_connack(char* strbuf, int32_t strbuflen, uint8_t connack_rc, uint8_t sessionPresent)
{
int32_t strindex = snprintf(strbuf, strbuflen, "CONNACK session present %d, rc %d", sessionPresent, connack_rc);
return strindex;
}
int32_t MQTTStringFormat_publish(char* strbuf, int32_t strbuflen, uint8_t dup, uint8_t qos, uint8_t retained,
uint16_t packetid, MQTTString topicName, uint8_t* payload, int32_t payloadlen)
{
int32_t strindex = snprintf(strbuf, strbuflen,
"PUBLISH dup %d, QoS %d, retained %d, packet id %d, topic %.*s, payload length %d, payload %.*s",
dup, qos, retained, packetid,
(topicName.lenstring.len < 20) ? topicName.lenstring.len : 20, topicName.lenstring.data,
payloadlen, (payloadlen < 20) ? payloadlen : 20, payload);
return strindex;
}
int32_t MQTTStringFormat_ack(char* strbuf, int32_t strbuflen, uint8_t packettype, uint8_t dup, uint16_t packetid)
{
int32_t strindex = snprintf(strbuf, strbuflen, "%s, packet id %d", MQTTPacket_names[packettype], packetid);
if (dup)
strindex += snprintf(strbuf + strindex, strbuflen - strindex, ", dup %d", dup);
return strindex;
}
int32_t MQTTStringFormat_subscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, int32_t count,
MQTTString topicFilters[], int32_t requestedQoSs[])
{
return snprintf(strbuf, strbuflen, "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d",
dup, packetid, count, topicFilters[0].lenstring.len, topicFilters[0].lenstring.data,
requestedQoSs[0]);
}
int32_t MQTTStringFormat_suback(char* strbuf, int32_t strbuflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs)
{
return snprintf(strbuf, strbuflen,
"SUBACK packet id %d count %d granted qos %d", packetid, count, grantedQoSs[0]);
}
int32_t MQTTStringFormat_unsubscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid,
int32_t count, MQTTString topicFilters[])
{
return snprintf(strbuf, strbuflen, "UNSUBSCRIBE dup %d, packet id %d count %d topic %.*s",
dup, packetid, count, topicFilters[0].lenstring.len, topicFilters[0].lenstring.data);
}
char* MQTTFormat_toClientString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen)
{
int32_t index = 0;
int32_t rem_length = 0;
MQTTHeader header = {0};
int32_t strindex = 0;
header.byte = buf[index++];
index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
switch (header.bits.type)
{
case CONNACK:
{
uint8_t sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) == 1)
strindex = MQTTStringFormat_connack(strbuf, strbuflen, connack_rc, sessionPresent);
}
break;
case PUBLISH:
{
uint8_t dup, retained, *payload;
uint16_t packetid;
uint8_t qos;
int32_t payloadlen;
MQTTString topicName = MQTTString_initializer;
if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
&payload, &payloadlen, buf, buflen) == 1)
strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained,
packetid, topicName, payload, payloadlen);
}
break;
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
{
uint8_t packettype, dup;
uint16_t packetid;
if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid);
}
break;
case SUBACK:
{
uint16_t packetid;
int32_t maxcount = 1, count = 0;
int32_t grantedQoSs[1];
if (MQTTDeserialize_suback(&packetid, maxcount, &count, grantedQoSs, buf, buflen) == 1)
strindex = MQTTStringFormat_suback(strbuf, strbuflen, packetid, count, grantedQoSs);
}
break;
case UNSUBACK:
{
uint16_t packetid;
if (MQTTDeserialize_unsuback(&packetid, buf, buflen) == 1)
strindex = MQTTStringFormat_ack(strbuf, strbuflen, UNSUBACK, 0, packetid);
}
break;
case PINGREQ:
case PINGRESP:
case DISCONNECT:
strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
break;
}
return strbuf;
}
char* MQTTFormat_toServerString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen)
{
int32_t index = 0;
int32_t rem_length = 0;
MQTTHeader header = {0};
int32_t strindex = 0;
header.byte = buf[index++];
index += MQTTPacket_decodeBuf(&buf[index], &rem_length);
switch (header.bits.type)
{
case CONNECT:
{
MQTTPacket_connectData data;
int32_t rc;
if ((rc = MQTTDeserialize_connect(&data, buf, buflen)) == 1)
strindex = MQTTStringFormat_connect(strbuf, strbuflen, &data);
}
break;
case PUBLISH:
{
uint8_t dup, retained, *payload;
uint16_t packetid;
uint8_t qos;
int32_t payloadlen;
MQTTString topicName = MQTTString_initializer;
if (MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName,
&payload, &payloadlen, buf, buflen) == 1)
strindex = MQTTStringFormat_publish(strbuf, strbuflen, dup, qos, retained,
packetid, topicName, payload, payloadlen);
}
break;
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
{
uint8_t packettype, dup;
uint16_t packetid;
if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf, buflen) == 1)
strindex = MQTTStringFormat_ack(strbuf, strbuflen, packettype, dup, packetid);
}
break;
case SUBSCRIBE:
{
uint8_t dup;
uint16_t packetid;
int32_t maxcount = 1, count = 0;
MQTTString topicFilters[1];
int32_t requestedQoSs[1];
if (MQTTDeserialize_subscribe(&dup, &packetid, maxcount, &count,
topicFilters, requestedQoSs, buf, buflen) == 1)
strindex = MQTTStringFormat_subscribe(strbuf, strbuflen, dup, packetid, count, topicFilters, requestedQoSs);;
}
break;
case UNSUBSCRIBE:
{
uint8_t dup;
uint16_t packetid;
int32_t maxcount = 1, count = 0;
MQTTString topicFilters[1];
if (MQTTDeserialize_unsubscribe(&dup, &packetid, maxcount, &count, topicFilters, buf, buflen) == 1)
strindex = MQTTStringFormat_unsubscribe(strbuf, strbuflen, dup, packetid, count, topicFilters);
}
break;
case PINGREQ:
case PINGRESP:
case DISCONNECT:
strindex = snprintf(strbuf, strbuflen, "%s", MQTTPacket_names[header.bits.type]);
break;
}
strbuf[strbuflen] = '\0';
return strbuf;
}

@ -0,0 +1,37 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTFORMAT_H)
#define MQTTFORMAT_H
#include "StackTrace.h"
#include "MQTTPacket.h"
const char* MQTTPacket_getName(uint16_t packetid);
int32_t MQTTStringFormat_connect(char* strbuf, int32_t strbuflen, MQTTPacket_connectData* data);
int32_t MQTTStringFormat_connack(char* strbuf, int32_t strbuflen, uint8_t connack_rc, uint8_t sessionPresent);
int32_t MQTTStringFormat_publish(char* strbuf, int32_t strbuflen, uint8_t dup, uint8_t qos, uint8_t retained,
uint16_t packetid, MQTTString topicName, uint8_t* payload, int32_t payloadlen);
int32_t MQTTStringFormat_ack(char* strbuf, int32_t strbuflen, uint8_t packettype, uint8_t dup, uint16_t packetid);
int32_t MQTTStringFormat_subscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid, int32_t count,
MQTTString topicFilters[], int32_t requestedQoSs[]);
int32_t MQTTStringFormat_suback(char* strbuf, int32_t strbuflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs);
int32_t MQTTStringFormat_unsubscribe(char* strbuf, int32_t strbuflen, uint8_t dup, uint16_t packetid,
int32_t count, MQTTString topicFilters[]);
char* MQTTFormat_toClientString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen);
char* MQTTFormat_toServerString(char* strbuf, int32_t strbuflen, uint8_t* buf, int32_t buflen);
#endif

@ -0,0 +1,443 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Sergio R. Caprile - non-blocking packet read functions for stream transport
*******************************************************************************/
#include "StackTrace.h"
#include "MQTTPacket.h"
#include <string.h>
/**
* Encodes the message length according to the MQTT algorithm
* @param buf the buffer into which the encoded data is written
* @param length the length to be encoded
* @return the number of bytes written to buffer
*/
int32_t MQTTPacket_encode(uint8_t* buf, int32_t length)
{
int32_t rc = 0;
FUNC_ENTRY;
do
{
uint8_t d = length % 128;
length /= 128;
/* if there are more digits to encode, set the top bit of this digit */
if (length > 0)
d |= 0x80;
buf[rc++] = d;
} while (length > 0);
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm
* @param getcharfn pointer to function to read the next character from the data source
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
int32_t MQTTPacket_decode(int32_t (*getcharfn)(uint8_t*, int32_t), int32_t* value)
{
uint8_t c;
int32_t multiplier = 1;
int32_t len = 0;
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
FUNC_ENTRY;
*value = 0;
do
{
int32_t rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = (*getcharfn)(&c, 1);
if (rc != 1)
goto exit;
*value += (c & 127) * multiplier;
multiplier *= 128;
} while ((c & 128) != 0);
exit:
FUNC_EXIT_RC(len);
return len;
}
int32_t MQTTPacket_len(int32_t rem_len)
{
rem_len += 1; /* header byte */
/* now remaining_length field */
if (rem_len < 128)
rem_len += 1;
else if (rem_len < 16384)
rem_len += 2;
else if (rem_len < 2097151)
rem_len += 3;
else
rem_len += 4;
return rem_len;
}
static uint8_t* bufptr;
int32_t bufchar(uint8_t* c, int32_t count)
{
for (int32_t i = 0; i < count; ++i)
*c = *bufptr++;
return count;
}
int32_t MQTTPacket_decodeBuf(uint8_t* buf, int32_t* value)
{
bufptr = buf;
return MQTTPacket_decode(bufchar, value);
}
/**
* Calculates an integer from two bytes read from the input buffer
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the integer value calculated
*/
int32_t readInt(uint8_t** pptr)
{
uint8_t* ptr = *pptr;
int32_t len = 256*(*ptr) + (*(ptr+1));
*pptr += 2;
return len;
}
/**
* Reads one character from the input buffer.
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the character read
*/
char readChar(uint8_t** pptr)
{
char c = **pptr;
(*pptr)++;
return c;
}
/**
* Writes one character to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param c the character to write
*/
void writeChar(uint8_t** pptr, char c)
{
**pptr = c;
(*pptr)++;
}
/**
* Writes an integer as 2 bytes to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write
*/
void writeInt(uint8_t** pptr, int32_t anInt)
{
**pptr = (uint8_t)(anInt / 256);
(*pptr)++;
**pptr = (uint8_t)(anInt % 256);
(*pptr)++;
}
/**
* Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param string the C string to write
*/
void writeCString(uint8_t** pptr, const char* string)
{
int32_t len = strlen(string);
writeInt(pptr, len);
memcpy(*pptr, string, len);
*pptr += len;
}
int32_t getLenStringLen(char* ptr)
{
int32_t len = 256*((uint8_t)(*ptr)) + (uint8_t)(*(ptr+1));
return len;
}
void writeMQTTString(uint8_t** pptr, MQTTString mqttstring)
{
if (mqttstring.lenstring.len > 0)
{
writeInt(pptr, mqttstring.lenstring.len);
memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len);
*pptr += mqttstring.lenstring.len;
}
else if (mqttstring.cstring)
writeCString(pptr, mqttstring.cstring);
else
writeInt(pptr, 0);
}
/**
* @param mqttstring the MQTTString structure into which the data is to be read
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param enddata pointer to the end of the data: do not read beyond
* @return 1 if successful, 0 if not
*/
int32_t readMQTTLenString(MQTTString* mqttstring, uint8_t** pptr, uint8_t* enddata)
{
int32_t rc = 0;
FUNC_ENTRY;
/* the first two bytes are the length of the string */
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
{
mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */
if (&(*pptr)[mqttstring->lenstring.len] <= enddata)
{
mqttstring->lenstring.data = (char*)*pptr;
*pptr += mqttstring->lenstring.len;
rc = 1;
}
}
mqttstring->cstring = NULL;
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string
* @param mqttstring the string to return the length of
* @return the length of the string
*/
int32_t MQTTstrlen(MQTTString mqttstring)
{
int rc = 0;
if (mqttstring.cstring)
rc = strlen(mqttstring.cstring);
else
rc = mqttstring.lenstring.len;
return rc;
}
/**
* Compares an MQTTString to a C string
* @param a the MQTTString to compare
* @param bptr the C string to compare
* @return boolean - equal or not
*/
int32_t MQTTPacket_equals(MQTTString* a, char* bptr)
{
int32_t alen = 0, blen = 0;
char *aptr;
if (a->cstring)
{
aptr = a->cstring;
alen = strlen(a->cstring);
}
else
{
aptr = a->lenstring.data;
alen = a->lenstring.len;
}
blen = strlen(bptr);
return (alen == blen) && (strncmp(aptr, bptr, alen) == 0);
}
/**
* Helper function to read packet data from some source into a buffer
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param getfn pointer to a function which will read any number of bytes from the needed source
* @return integer MQTT packet type, or -1 on error
* @note the whole message must fit into the caller's buffer
*/
int32_t MQTTPacket_read(uint8_t* buf, int32_t buflen, int32_t (*getfn)(uint8_t*, int32_t))
{
int32_t rc = -1;
MQTTHeader header = {0};
int32_t len = 0;
int32_t rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
if ((*getfn)(buf, 1) != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
MQTTPacket_decode(getfn, &rem_len);
len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if((rem_len + len) > buflen)
goto exit;
if ((*getfn)(buf + len, rem_len) != rem_len)
goto exit;
header.byte = buf[0];
rc = header.bits.type;
exit:
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm, non-blocking
* @param trp pointer to a transport structure holding what is needed to solve getting data from it
* @param value the decoded length returned
* @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
*/
static int32_t MQTTPacket_decodenb(MQTTTransport *trp)
{
uint8_t c;
int32_t rc = MQTTPACKET_READ_ERROR;
FUNC_ENTRY;
if (trp->len == 0)
{ /* initialize on first call */
trp->multiplier = 1;
trp->rem_len = 0;
}
do
{
int32_t frc;
if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES)
goto exit;
if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1)
goto exit;
if (frc == 0)
{
rc = 0;
goto exit;
}
++(trp->len);
trp->rem_len += (c & 127) * trp->multiplier;
trp->multiplier *= 128;
} while ((c & 128) != 0);
rc = trp->len;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Helper function to read packet data from some source into a buffer, non-blocking
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param trp pointer to a transport structure holding what is needed to solve getting data from it
* @return integer MQTT packet type, 0 for call again, or -1 on error
* @note the whole message must fit into the caller's buffer
*/
int32_t MQTTPacket_readnb(uint8_t* buf, int32_t buflen, MQTTTransport *trp)
{
int32_t rc = -1, frc;
MQTTHeader header = {0};
switch (trp->state)
{
default:
trp->state = 0;
/*FALLTHROUGH*/
case 0:
/* read the header byte. This has the packet type in it */
if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
goto exit;
if (frc == 0)
return 0;
trp->len = 0;
++trp->state;
/*FALLTHROUGH*/
/* read the remaining length. This is variable in itself */
case 1:
if ((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
goto exit;
if (frc == 0)
return 0;
trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
if ((trp->rem_len + trp->len) > buflen)
goto exit;
++trp->state;
/*FALLTHROUGH*/
case 2:
/* read the rest of the buffer using a callback to supply the rest of the data */
if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
goto exit;
if (frc == 0)
return 0;
trp->rem_len -= frc;
trp->len += frc;
if (trp->rem_len)
return 0;
header.byte = buf[0];
rc = header.bits.type;
break;
}
exit:
trp->state = 0;
return rc;
}

@ -0,0 +1,136 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTPACKET_H_
#define MQTTPACKET_H_
#include <stdint.h>
#if defined(__cplusplus) /* If this is a C++ compiler, use C linkage */
extern "C" {
#endif
#if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#else
#define DLLImport
#define DLLExport
#endif
enum errors
{
MQTTPACKET_BUFFER_TOO_SHORT = -2,
MQTTPACKET_READ_ERROR = -1,
MQTTPACKET_READ_COMPLETE
};
enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE,
SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT
};
/**
* Bitfields for the MQTT header byte.
*/
typedef union
{
uint8_t byte; /**< the whole byte */
#if defined(REVERSED)
struct
{
uint8_t type : 4; /**< message type nibble */
uint8_t dup : 1; /**< DUP flag bit */
uint8_t qos : 2; /**< QoS value, 0, 1 or 2 */
uint8_t retain : 1; /**< retained flag bit */
} bits;
#else
struct
{
uint8_t retain : 1; /**< retained flag bit */
uint8_t qos : 2; /**< QoS value, 0, 1 or 2 */
uint8_t dup : 1; /**< DUP flag bit */
uint8_t type : 4; /**< message type nibble */
} bits;
#endif
} MQTTHeader;
typedef struct
{
int32_t len;
char* data;
} MQTTLenString;
typedef struct
{
char* cstring;
MQTTLenString lenstring;
} MQTTString;
#define MQTTString_initializer {NULL, {0, NULL}}
int32_t MQTTstrlen(MQTTString mqttstring);
#include "MQTTConnect.h"
#include "MQTTPublish.h"
#include "MQTTSubscribe.h"
#include "MQTTUnsubscribe.h"
#include "MQTTFormat.h"
int32_t MQTTSerialize_ack(uint8_t* buf, int32_t buflen, uint8_t type, uint8_t dup, uint16_t packetid);
int32_t MQTTDeserialize_ack(uint8_t* packettype, uint8_t* dup, uint16_t* packetid, uint8_t* buf, int32_t buflen);
int32_t MQTTPacket_len(int32_t rem_len);
int32_t MQTTPacket_equals(MQTTString* a, char* b);
int32_t MQTTPacket_encode(uint8_t* buf, int32_t length);
int32_t MQTTPacket_decode(int32_t (*getcharfn)(uint8_t*, int32_t), int32_t* value);
int32_t MQTTPacket_decodeBuf(uint8_t* buf, int32_t* value);
int32_t readInt(uint8_t** pptr);
char readChar(uint8_t** pptr);
void writeChar(uint8_t** pptr, char c);
void writeInt(uint8_t** pptr, int32_t anInt);
int32_t readMQTTLenString(MQTTString* mqttstring, uint8_t** pptr, uint8_t* enddata);
void writeCString(uint8_t** pptr, const char* string);
void writeMQTTString(uint8_t** pptr, MQTTString mqttstring);
DLLExport int32_t MQTTPacket_read(uint8_t* buf, int32_t buflen, int32_t (*getfn)(uint8_t*, int32_t));
typedef struct
{
int32_t (*getfn)(void *, uint8_t*, int32_t); /* must return -1 for error, 0 for call again, or the number of bytes read */
void *sck; /* pointer to whatever the system may use to identify the transport */
int32_t multiplier;
int32_t rem_len;
int32_t len;
char state;
}MQTTTransport;
int32_t MQTTPacket_readnb(uint8_t* buf, int32_t buflen, MQTTTransport *trp);
#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
}
#endif
#endif /* MQTTPACKET_H_ */

@ -0,0 +1,38 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTPUBLISH_H_
#define MQTTPUBLISH_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int32_t MQTTSerialize_publish(uint8_t* buf, int32_t buflen, uint8_t dup, uint8_t qos, uint8_t retained, uint16_t packetid,
MQTTString topicName, uint8_t* payload, int32_t payloadlen);
DLLExport int32_t MQTTDeserialize_publish(uint8_t* dup, uint8_t* qos, uint8_t* retained, uint16_t* packetid, MQTTString* topicName,
uint8_t** payload, int32_t* payloadlen, uint8_t* buf, int32_t len);
DLLExport int32_t MQTTSerialize_puback(uint8_t* buf, int32_t buflen, uint16_t packetid);
DLLExport int32_t MQTTSerialize_pubrel(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid);
DLLExport int32_t MQTTSerialize_pubcomp(uint8_t* buf, int32_t buflen, uint16_t packetid);
#endif /* MQTTPUBLISH_H_ */

@ -0,0 +1,171 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT publish packet that would be produced using the supplied parameters
* @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0)
* @param topicName the topic name to be used in the publish
* @param payloadlen the length of the payload to be sent
* @return the length of buffer needed to contain the serialized version of the packet
*/
int32_t MQTTSerialize_publishLength(uint8_t qos, MQTTString topicName, int32_t payloadlen)
{
int32_t len = 2 + MQTTstrlen(topicName) + payloadlen;
if (qos > 0)
len += 2; /* packetid */
return len;
}
/**
* Serializes the supplied publish data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param qos integer - the MQTT QoS value
* @param retained integer - the MQTT retained flag
* @param packetid integer - the MQTT packet identifier
* @param topicName MQTTString - the MQTT topic in the publish
* @param payload byte buffer - the MQTT publish payload
* @param payloadlen integer - the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTSerialize_publish(uint8_t* buf, int32_t buflen, uint8_t dup, uint8_t qos, uint8_t retained, uint16_t packetid,
MQTTString topicName, uint8_t* payload, int32_t payloadlen)
{
uint8_t *ptr = buf;
MQTTHeader header = {0};
int32_t rem_len = 0;
int32_t rc = 0;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_publishLength(qos, topicName, payloadlen)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.bits.type = PUBLISH;
header.bits.dup = dup;
header.bits.qos = qos;
header.bits.retain = retained;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeMQTTString(&ptr, topicName);
if (qos > 0)
writeInt(&ptr, packetid);
memcpy(ptr, payload, payloadlen);
ptr += payloadlen;
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the ack packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param type the MQTT packet type
* @param dup the MQTT dup flag
* @param packetid the MQTT packet identifier
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_ack(uint8_t* buf, int32_t buflen, uint8_t packettype, uint8_t dup, uint16_t packetid)
{
MQTTHeader header = {0};
int32_t rc = 0;
uint8_t *ptr = buf;
FUNC_ENTRY;
if (buflen < 4)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.bits.type = packettype;
header.bits.dup = dup;
header.bits.qos = (packettype == PUBREL) ? 1 : 0;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
writeInt(&ptr, packetid);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes a puback packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_puback(uint8_t* buf, int32_t buflen, uint16_t packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBACK, 0, packetid);
}
/**
* Serializes a pubrel packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_pubrel(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBREL, dup, packetid);
}
/**
* Serializes a pubrel packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int32_t MQTTSerialize_pubcomp(uint8_t* buf, int32_t buflen, uint16_t packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBCOMP, 0, packetid);
}

@ -0,0 +1,38 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTSUBSCRIBE_H_
#define MQTTSUBSCRIBE_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int32_t MQTTSerialize_subscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid,
int32_t count, MQTTString topicFilters[], int32_t requestedQoSs[]);
DLLExport int32_t MQTTDeserialize_subscribe(uint8_t* dup, uint16_t* packetid,
int32_t maxcount, int32_t* count, MQTTString topicFilters[], int32_t requestedQoSs[], uint8_t* buf, int32_t len);
DLLExport int32_t MQTTSerialize_suback(uint8_t* buf, int32_t buflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs);
DLLExport int32_t MQTTDeserialize_suback(uint16_t* packetid, int32_t maxcount, int32_t* count, int32_t grantedQoSs[], uint8_t* buf, int32_t len);
#endif /* MQTTSUBSCRIBE_H_ */

@ -0,0 +1,140 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
int32_t MQTTSerialize_subscribeLength(int32_t count, MQTTString topicFilters[])
{
int32_t len = 2; /* packetid */
for (int32_t i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */
return len;
}
/**
* Serializes the supplied subscribe data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied bufferr
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @param count - number of members in the topicFilters and reqQos arrays
* @param topicFilters - array of topic filter names
* @param requestedQoSs - array of requested QoS
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTSerialize_subscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid, int32_t count,
MQTTString topicFilters[], int32_t requestedQoSs[])
{
uint8_t *ptr = buf;
MQTTHeader header = {0};
int32_t rem_len = 0;
int32_t rc = 0;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_subscribeLength(count, topicFilters)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = SUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid);
for (int32_t i = 0; i < count; ++i)
{
writeMQTTString(&ptr, topicFilters[i]);
writeChar(&ptr, requestedQoSs[i]);
}
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into suback data
* @param packetid returned integer - the MQTT packet identifier
* @param maxcount - the maximum number of members allowed in the grantedQoSs array
* @param count returned integer - number of members in the grantedQoSs array
* @param grantedQoSs returned array of integers - the granted qualities of service
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int32_t MQTTDeserialize_suback(uint16_t* packetid, int32_t maxcount, int32_t* count, int32_t grantedQoSs[], uint8_t* buf, int32_t buflen)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = 0;
int32_t mylen;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != SUBACK)
goto exit;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
goto exit;
*packetid = readInt(&curdata);
*count = 0;
while (curdata < enddata)
{
if (*count > maxcount)
{
rc = -1;
goto exit;
}
grantedQoSs[(*count)++] = readChar(&curdata);
}
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,114 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Deserializes the supplied (wire) buffer into subscribe data
* @param dup integer returned - the MQTT dup flag
* @param packetid integer returned - the MQTT packet identifier
* @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays
* @param count - number of members in the topicFilters and requestedQoSs arrays
* @param topicFilters - array of topic filter names
* @param requestedQoSs - array of requested QoS
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTDeserialize_subscribe(uint8_t* dup, uint16_t* packetid, int32_t maxcount, int32_t* count, MQTTString topicFilters[],
int32_t requestedQoSs[], uint8_t* buf, int32_t buflen)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = -1;
int32_t mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != SUBSCRIBE)
goto exit;
*dup = header.bits.dup;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
*packetid = readInt(&curdata);
*count = 0;
while (curdata < enddata)
{
if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata))
goto exit;
if (curdata >= enddata) /* do we have enough data to read the req_qos version byte? */
goto exit;
requestedQoSs[*count] = readChar(&curdata);
(*count)++;
}
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the supplied suback data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @param count - number of members in the grantedQoSs array
* @param grantedQoSs - array of granted QoS
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTSerialize_suback(uint8_t* buf, int32_t buflen, uint16_t packetid, int32_t count, int32_t* grantedQoSs)
{
MQTTHeader header = {0};
int32_t rc = -1;
uint8_t *ptr = buf;
FUNC_ENTRY;
if (buflen < 2 + count)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = SUBACK;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2 + count); /* write remaining length */
writeInt(&ptr, packetid);
for (int32_t i = 0; i < count; ++i)
writeChar(&ptr, grantedQoSs[i]);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,37 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Xiang Rong - 442039 Add makefile to Embedded C client
*******************************************************************************/
#ifndef MQTTUNSUBSCRIBE_H_
#define MQTTUNSUBSCRIBE_H_
#if !defined(DLLImport)
#define DLLImport
#endif
#if !defined(DLLExport)
#define DLLExport
#endif
DLLExport int32_t MQTTSerialize_unsubscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid,
int32_t count, MQTTString topicFilters[]);
DLLExport int32_t MQTTDeserialize_unsubscribe(uint8_t* dup, uint16_t* packetid, int32_t max_count, int32_t* count, MQTTString topicFilters[],
uint8_t* buf, int32_t len);
DLLExport int32_t MQTTSerialize_unsuback(uint8_t* buf, int32_t buflen, uint16_t packetid);
DLLExport int32_t MQTTDeserialize_unsuback(uint16_t* packetid, uint8_t* buf, int32_t len);
#endif /* MQTTUNSUBSCRIBE_H_ */

@ -0,0 +1,107 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
int32_t MQTTSerialize_unsubscribeLength(int32_t count, MQTTString topicFilters[])
{
int32_t len = 2; /* packetid */
for (int32_t i = 0; i < count; ++i)
len += 2 + MQTTstrlen(topicFilters[i]); /* length + topic*/
return len;
}
/**
* Serializes the supplied unsubscribe data into the supplied buffer, ready for sending
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @param count - number of members in the topicFilters array
* @param topicFilters - array of topic filter names
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTSerialize_unsubscribe(uint8_t* buf, int32_t buflen, uint8_t dup, uint16_t packetid,
int32_t count, MQTTString topicFilters[])
{
uint8_t *ptr = buf;
MQTTHeader header = {0};
int32_t rem_len = 0;
int32_t rc = -1;
FUNC_ENTRY;
if (MQTTPacket_len(rem_len = MQTTSerialize_unsubscribeLength(count, topicFilters)) > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = UNSUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = 1;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, rem_len); /* write remaining length */;
writeInt(&ptr, packetid);
for (int32_t i = 0; i < count; ++i)
writeMQTTString(&ptr, topicFilters[i]);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Deserializes the supplied (wire) buffer into unsuback data
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int32_t MQTTDeserialize_unsuback(uint16_t* packetid, uint8_t* buf, int32_t buflen)
{
uint8_t type = 0;
uint8_t dup = 0;
int32_t rc = 0;
FUNC_ENTRY;
rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen);
if (type == UNSUBACK)
rc = 1;
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,100 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTPacket.h"
#include "StackTrace.h"
#include <string.h>
/**
* Deserializes the supplied (wire) buffer into unsubscribe data
* @param dup integer returned - the MQTT dup flag
* @param packetid integer returned - the MQTT packet identifier
* @param maxcount - the maximum number of members allowed in the topicFilters and requestedQoSs arrays
* @param count - number of members in the topicFilters and requestedQoSs arrays
* @param topicFilters - array of topic filter names
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTDeserialize_unsubscribe(uint8_t* dup, uint16_t* packetid, int32_t maxcount, int32_t* count, MQTTString topicFilters[],
uint8_t* buf, int32_t len)
{
MQTTHeader header = {0};
uint8_t* curdata = buf;
uint8_t* enddata = NULL;
int32_t rc = 0;
int32_t mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
if (header.bits.type != UNSUBSCRIBE)
goto exit;
*dup = header.bits.dup;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
*packetid = readInt(&curdata);
*count = 0;
while (curdata < enddata)
{
if (!readMQTTLenString(&topicFilters[*count], &curdata, enddata))
goto exit;
(*count)++;
}
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the supplied unsuback data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return the length of the serialized data. <= 0 indicates error
*/
int32_t MQTTSerialize_unsuback(uint8_t* buf, int32_t buflen, uint16_t packetid)
{
MQTTHeader header = {0};
int32_t rc = 0;
uint8_t *ptr = buf;
FUNC_ENTRY;
if (buflen < 2)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.byte = 0;
header.bits.type = UNSUBACK;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
writeInt(&ptr, packetid);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

@ -0,0 +1,78 @@
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for bug #434081
*******************************************************************************/
#ifndef STACKTRACE_H_
#define STACKTRACE_H_
#include <stdio.h>
#define NOSTACKTRACE 1
#if defined(NOSTACKTRACE)
#define FUNC_ENTRY
#define FUNC_ENTRY_NOLOG
#define FUNC_ENTRY_MED
#define FUNC_ENTRY_MAX
#define FUNC_EXIT
#define FUNC_EXIT_NOLOG
#define FUNC_EXIT_MED
#define FUNC_EXIT_MAX
#define FUNC_EXIT_RC(x)
#define FUNC_EXIT_MED_RC(x)
#define FUNC_EXIT_MAX_RC(x)
#else
#if defined(WIN32)
#define inline __inline
#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM)
#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1)
#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM)
#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM)
#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM)
#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1)
#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM)
#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM)
#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM)
#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM)
#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM)
#else
#define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM)
#define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1)
#define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM)
#define FUNC_ENTRY_MAX StackTrace_entry(__func__, __LINE__, TRACE_MAXIMUM)
#define FUNC_EXIT StackTrace_exit(__func__, __LINE__, NULL, TRACE_MINIMUM)
#define FUNC_EXIT_NOLOG StackTrace_exit(__func__, __LINE__, NULL, -1)
#define FUNC_EXIT_MED StackTrace_exit(__func__, __LINE__, NULL, TRACE_MEDIUM)
#define FUNC_EXIT_MAX StackTrace_exit(__func__, __LINE__, NULL, TRACE_MAXIMUM)
#define FUNC_EXIT_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MINIMUM)
#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM)
#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM)
void StackTrace_entry(const char* name, int32_t line, int32_t trace);
void StackTrace_exit(const char* name, int32_t line, void* return_value, int32_t trace);
void StackTrace_printStack(FILE* dest);
char* StackTrace_get(unsigned long);
#endif
#endif
#endif /* STACKTRACE_H_ */

@ -0,0 +1,79 @@
#include "mqtt_interface.h"
#include "wizchip_conf.h"
#include "socket.h"
//#include <terminal_io.h>
#include <stdint.h>
/*
uint32_t MilliTimer;
void MilliTimer_Handler(void)
{
MilliTimer++;
}
*/
int8_t expired(Timer* timer)
{
int32_t left = (timer->end_time) - millis();
return (left < 0);
}
void countdown_ms(Timer* timer, uint32_t timeout)
{
timer->end_time = millis() + timeout;
}
void countdown(Timer* timer, uint32_t timeout)
{
timer->end_time = millis() + (timeout * 1000UL);
}
int32_t left_ms(Timer* timer)
{
int32_t left = timer->end_time - millis();
return (left < 0) ? 0 : left;
}
void InitTimer(Timer* timer)
{
timer->end_time = 0;
}
void NewNetwork(Network* n)
{
//n->my_socket = 0; //initialized outside actually..
n->mqttread = w5500_read;
n->mqttwrite = w5500_write;
n->disconnect = w5500_disconnect;
}
int32_t w5500_read(Network* n, uint8_t* buffer, int32_t len, int32_t timeout_ms)
{
if ((getSn_SR(n->my_socket) == SOCK_ESTABLISHED) && (getSn_RX_RSR(n->my_socket) > 0))
return recv(n->my_socket, buffer, len);
return 0;
}
int32_t w5500_write(Network* n, uint8_t* buffer, int32_t len, int32_t timeout_ms)
{
if (getSn_SR(n->my_socket) == SOCK_ESTABLISHED)
return send(n->my_socket, buffer, len);
return 0;
}
void w5500_disconnect(Network* n)
{
disconnect(n->my_socket);
}
int32_t ConnectNetwork(Network* n, uint8_t* ip, uint16_t port)
{
socket(n->my_socket, Sn_MR_TCP, 12345, 0);
connect(n->my_socket, ip, port);
return 0;
}

@ -0,0 +1,42 @@
#ifndef __MQTT_INTERFACE_H_
#define __MQTT_INTERFACE_H_
#include <stdint.h>
#include "../../globals.h"
typedef struct Timer Timer;
struct Timer
{
uint32_t systick_period;
uint32_t end_time;
};
typedef struct Network Network;
struct Network
{
int32_t my_socket;
int32_t (*mqttread) (Network*, uint8_t*, int32_t, int32_t);
int32_t (*mqttwrite) (Network*, uint8_t*, int32_t, int32_t);
void (*disconnect) (Network*);
};
void InitTimer(Timer*);
/*
void MilliTimer_Handler(void);
*/
int8_t expired(Timer*);
void countdown_ms(Timer*, uint32_t);
void countdown(Timer*, uint32_t);
int32_t left_ms(Timer*);
int32_t w5500_read(Network*, uint8_t*, int32_t, int32_t);
int32_t w5500_write(Network*, uint8_t*, int32_t, int32_t);
void w5500_disconnect(Network*);
void NewNetwork(Network*);
int32_t ConnectNetwork(Network*, uint8_t*, uint16_t);
#endif

@ -14,6 +14,8 @@ wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac add
.dns = {8,8,8,8}, // DNS address (google dns)
.gw = {192, 168, 0, 1}, // Gateway address
.dhcp = NETINFO_STATIC}; //Static IP configuration
uint8_t MQTT_targetIP[4] = {192, 168, 0, 100}; // IP брокера MQTT
#else
//NIC metrics for another PC (second IP configuration)
wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac address
@ -22,5 +24,6 @@ wiz_NetInfo netInfo = { .mac = {0x00, 0x08, 0xdc, 0xab, 0xcd, 0xef}, // Mac add
.dns = {8,8,8,8}, // DNS address (google dns)
.gw = {192, 168, 1, 1}, // Gateway address
.dhcp = NETINFO_STATIC}; //Static IP configuration
uint8_t MQTT_targetIP[4] = {192, 168, 1, 81}; // IP брокера MQTT
#endif

@ -41,6 +41,8 @@ static FATFS Fatfs; //File system object for each logical drive. >= 2
#define PRINTF(...)
#endif
#define SPRINTF(__S, FORMAT, args...) sprintf_P(__S, PSTR(FORMAT),##args)
#define IP_WORK
//SPI CLOCK 4 or 8Mhz
@ -67,6 +69,7 @@ extern const char compile_time[] PROGMEM;
extern const char str_prog_name[] PROGMEM;
extern wiz_NetInfo netInfo;
extern uint8_t MQTT_targetIP[4];
#define CHK_RAM_LEAKAGE
#define CHK_UPTIME

@ -21,12 +21,15 @@
#include "Ethernet/wizchip_conf.h"
#include "Application/loopback/loopback.h"
#include "Internet/MQTT/mqtt_interface.h"
#include "Internet/MQTT/MQTTClient.h"
#define _MAIN_DEBUG_
/*
* 22. MQTT + Mosquitto in LAN test
*
* Used code from:
* Used as base code from:
* Nadyrshin Ruslan - MQTTPacket (MQTT client/server v3.1.1 adapted for AVR MCU).
* YouTube-channel: https://www.youtube.com/channel/UChButpZaL5kUUl_zTyIDFkQ
*
@ -56,7 +59,7 @@ volatile unsigned long _millis; // for millis tick !! Overflow every ~49.7 days
//*********Program metrics
const char compile_date[] PROGMEM = __DATE__; // Mmm dd yyyy - Дата компиляции
const char compile_time[] PROGMEM = __TIME__; // hh:mm:ss - Время компиляции
const char str_prog_name[] PROGMEM = "\r\nAtMega1284p v1.0alpha Static IP MQTT && Loop-back WIZNET_5500 ETHERNET 05/04/2019\r\n"; // Program name
const char str_prog_name[] PROGMEM = "\r\nAtMega1284p v1.1 Static IP MQTT && Loop-back WIZNET_5500 ETHERNET 06/04/2019\r\n"; // Program name
#if defined(__AVR_ATmega128__)
const char PROGMEM str_mcu[] = "ATmega128"; //CPU is m128
@ -76,6 +79,37 @@ const char PROGMEM str_mcu[] = "ATmega1284p"; //CPU is m1284p
const char PROGMEM str_mcu[] = "Unknown CPU"; //CPU is unknown
#endif
//******************* MQTT: BEGIN
#define SOCK_MQTT 2
// Receive Buffer
#define MQTT_BUFFER_SIZE 512 // 2048
uint8_t mqtt_readBuffer[MQTT_BUFFER_SIZE];
volatile uint16_t mes_id;
//MQTT subscribe call-back is here
void messageArrived(MessageData* md)
{
char _topic_name[64] = "\0";
char _message[128] = "\0";
MQTTMessage* message = md->message;
MQTTString* topic = md->topicName;
strncpy(_topic_name, topic->lenstring.data, topic->lenstring.len);
strncpy(_message, message->payload, message->payloadlen);
PRINTF("<<MQTT Sub: [%s] %s", _topic_name , _message);
//md->topicName->
/*
for (uint8_t i = 0; i < md->topicName->lenstring.len; i++)
putchar(*(md->topicName->lenstring.data + i));
printf(" (%.*s)\r\n", (int32_t)message->payloadlen, (char*)message->payload);
*/
}
//******************* MQTT: END
//FUNC headers
static void avr_init(void);
@ -263,35 +297,6 @@ void IO_LIBRARY_Init(void) {
}
//***************** WIZCHIP INIT: END
/*
void spi_speed_tst(void)
{
// Here on SPI pins: MOSI 400Khz freq out, on SCLK 3.2MhzOUT
while(1)
{
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
SPI_WRITE(0xF0);
}
}
*/
int main()
{
//uint8_t prev_sw1 = 1; // VAR for sw1 pressing detect
@ -324,11 +329,52 @@ int main()
wdt_reset();
}
//****************MQTT client initialize
//Find MQTT broker and connect with it
uint8_t mqtt_buf[100];
int32_t mqtt_rc = 0;
Network mqtt_network;
Client mqtt_client;
mqtt_network.my_socket = SOCK_MQTT;
// Можно определить IP узла по DNS-имени, IP узла будет в массиве targetIP
//DNS_init(1, tempBuffer);
//DNS_run(gWIZNETINFO.dns, "test.mosquitto.org", targetIP);
PRINTF(">>Trying connect to MQTT broker: %d.%d.%d.%d ..\r\n", MQTT_targetIP[0], MQTT_targetIP[1], MQTT_targetIP[2], MQTT_targetIP[3]);
NewNetwork(&mqtt_network);
ConnectNetwork(&mqtt_network, MQTT_targetIP, 1883);
MQTTClient(&mqtt_client, &mqtt_network, 1000, mqtt_buf, 100, mqtt_readBuffer, MQTT_BUFFER_SIZE);
//Connection to MQTT broker
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.willFlag = 0;
data.MQTTVersion = 4;//3;
data.clientID.cstring = (char*)"w5500_avr_client";
data.username.cstring = (char*)"user1234";
data.password.cstring = (char*)"\0";
data.keepAliveInterval = 60;
data.cleansession = 1;
mqtt_rc = MQTTConnect(&mqtt_client, &data);
if (mqtt_rc == SUCCESSS)
{
PRINTF("++MQTT Connected SUCCESS: %ld\r\n", mqtt_rc);
}
else
{
PRINTF("--MQTT Connected ERROR: %ld\r\n", mqtt_rc);
while(1);//Reboot the board
}
// Subscribe to all topics
char SubString[] = "/#";
mqtt_rc = MQTTSubscribe(&mqtt_client, SubString, QOS0, messageArrived);
PRINTF("Subscribed (%s) %d\r\n", SubString, mqtt_rc);
/* Loopback Test: TCP Server and UDP */
// Test for Ethernet data transfer validation
uint32_t timer_link_1sec = millis();
uint32_t timer_uptime_60sec = millis();
uint32_t timer_mqtt_pub_10sec = millis();
while(1)
{
//Here at least every 1sec
@ -337,12 +383,36 @@ int main()
//Use Hercules Terminal to check loopback tcp:5000 and udp:3000
/*
* https://www.hw-group.com/software/hercules-setup-utility
* */
*
*/
loopback_tcps(SOCK_TCPS,ethBuf0,PORT_TCPS);
loopback_udps(SOCK_UDPS,ethBuf0,PORT_UDPS);
//loopback_ret = loopback_tcpc(SOCK_TCPS, gDATABUF, destip, destport);
//if(loopback_ret < 0) printf("loopback ret: %ld\r\n", loopback_ret); // TCP Socket Error code
// MQTT pub event every 10 sec
if((millis()-timer_mqtt_pub_10sec)> 10000)
{
static uint32_t mqtt_pub_count = 0;
//here every 10 sec
timer_mqtt_pub_10sec = millis();
char _msg[64];
//Every 10sec push message: "Uptime: xxx sec; Free RAM: xxxxx bytes", to BLYNK server (widget Terminal)
int len = SPRINTF(_msg, "Uptime: %lu sec; Free RAM: %d bytes\r\n", millis()/1000, freeRam());
if(len > 0)
{
PRINTF(">>MQTT pub msg №%lu\r\n", ++mqtt_pub_count);
MQTTMessage pubMessage;
pubMessage.qos = QOS0;
pubMessage.id = mes_id++;
pubMessage.payloadlen = (size_t)len;
pubMessage.payload = _msg;
MQTTPublish(&mqtt_client, "/w5500_avr_dbg", &pubMessage);
}
}
// MQTT broker connection and sub receive
MQTTYield(&mqtt_client, 100);//~100msec blocking here
if((millis()-timer_link_1sec)> 1000)
{