Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Azure IoT Hub #11906

Merged
merged 1 commit into from
Apr 27, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions tasmota/xdrv_02_mqtt.ino
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
#define MQTT_WIFI_CLIENT_TIMEOUT 200 // Wifi TCP connection timeout (default is 5000 mSec)
#endif

#if defined(USE_MQTT_AZURE_IOT)
#include <JsonParser.h>
#undef MQTT_PORT
#define MQTT_PORT 8883
#endif //USE_MQTT_AZURE_IOT

#define USE_MQTT_NEW_PUBSUBCLIENT

// #define DEBUG_DUMP_TLS // allow dumping of TLS Flash keys
Expand Down Expand Up @@ -225,7 +231,14 @@ void MqttDisconnect(void) {
}

void MqttSubscribeLib(const char *topic) {
MqttClient.subscribe(topic);
#if defined(USE_MQTT_AZURE_IOT)
// Azure IoT Hub currently does not support custom topics: https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support
String realTopicString = "devices/" + String(SettingsText(SET_MQTT_CLIENT));
realTopicString += "/messages/devicebound/#";
MqttClient.subscribe(realTopicString.c_str());
#else //USE_MQTT_AZURE_IOT
Comment on lines +234 to +239
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with the code, however identation does not follow the Tasmota convention. The ifdefs start at the beginning of the line, and the code is identated as usual.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I will fix this once merged.

MqttClient.subscribe(topic);
#endif //USE_MQTT_AZURE_IOT
MqttClient.loop(); // Solve LmacRxBlk:1 messages
}

Expand All @@ -244,7 +257,25 @@ bool MqttPublishLib(const char* topic, bool retained) {
}
}

bool result = MqttClient.publish(topic, TasmotaGlobal.mqtt_data, retained);
bool result;
#if defined(USE_MQTT_AZURE_IOT)
String sourceTopicString = String(topic);
sourceTopicString.replace("/", "%2F");
String topicString = "devices/" + String(SettingsText(SET_MQTT_CLIENT));
topicString+= "/messages/events/topic=" + sourceTopicString;

JsonParser mqtt_message((char*) String(TasmotaGlobal.mqtt_data).c_str());
JsonParserObject message_object = mqtt_message.getRootObject();
if (message_object.isValid()) { // only sending valid JSON, yet this is optional
result = MqttClient.publish(topicString.c_str(), TasmotaGlobal.mqtt_data, retained);
AddLog(LOG_LEVEL_INFO, PSTR(D_LOG_MQTT "Sending '%s'"), TasmotaGlobal.mqtt_data);
} else {
AddLog(LOG_LEVEL_INFO, PSTR(D_LOG_MQTT "Invalid JSON, '%s' for topic '%s', not sending to Azure IoT Hub"), TasmotaGlobal.mqtt_data, topic);
result = true;
}
#else //USE_MQTT_AZURE_IOT
result = MqttClient.publish(topic, TasmotaGlobal.mqtt_data, retained);
#endif //USE_MQTT_AZURE_IOT
yield(); // #3313
return result;
}
Expand Down Expand Up @@ -276,7 +307,24 @@ void MqttDataHandler(char* mqtt_topic, uint8_t* mqtt_data, unsigned int data_len

// Save MQTT data ASAP as it's data is discarded by PubSubClient with next publish as used in MQTTlog
char topic[TOPSZ];
strlcpy(topic, mqtt_topic, sizeof(topic));
#if defined(USE_MQTT_AZURE_IOT)
// for Azure, we read the topic from the property of the message
String fullTopicString = String(mqtt_topic);
int startOfTopic = fullTopicString.indexOf("TOPIC=");
if (startOfTopic == -1){
AddLog(LOG_LEVEL_ERROR, PSTR(D_LOG_MQTT "Azure IoT message without the property TOPIC, case sensitive."));
return;
}
String newTopic = fullTopicString.substring(startOfTopic + 6);
newTopic.replace("%2F", "/");
if (newTopic.indexOf("/") == -1){
AddLog(LOG_LEVEL_ERROR, PSTR(D_LOG_MQTT "Invalid Topic %s"), newTopic.c_str());
return;
}
strlcpy(topic, newTopic.c_str(), sizeof(topic));
#else //USE_MQTT_AZURE_IOT
strlcpy(topic, mqtt_topic, sizeof(topic));
#endif //USE_MQTT_AZURE_IOT
mqtt_data[data_len] = 0;
char data[data_len +1];
memcpy(data, mqtt_data, sizeof(data));
Expand Down Expand Up @@ -698,7 +746,17 @@ void MqttReconnect(void) {
}
#endif

#ifdef USE_MQTT_AZURE_IOT
if (String(mqtt_pwd).indexOf("SharedAccessSignature") == -1) {
AddLog(LOG_LEVEL_ERROR, PSTR(D_LOG_MQTT "Azure IoT incorrect SAS Token format. Refer to https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-mqtt-support"));
return;
}
String azureMqtt_userString = String(SettingsText(SET_MQTT_HOST)) + "/" + String(SettingsText(SET_MQTT_CLIENT)); + "/?api-version=2018-06-30";
if (MqttClient.connect(TasmotaGlobal.mqtt_client, azureMqtt_userString.c_str(), mqtt_pwd, stopic, 1, lwt_retain, TasmotaGlobal.mqtt_data, MQTT_CLEAN_SESSION)) {
#else //USE_MQTT_AZURE_IOT
if (MqttClient.connect(TasmotaGlobal.mqtt_client, mqtt_user, mqtt_pwd, stopic, 1, lwt_retain, TasmotaGlobal.mqtt_data, MQTT_CLEAN_SESSION)) {
#endif //USE_MQTT_AZURE_IOT

#ifdef USE_MQTT_TLS
if (Mqtt.mqtt_tls) {
#ifdef ESP8266
Expand Down