186 lines
7.0 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "ml307_mqtt.h"
#include <esp_log.h>
static const char *TAG = "Ml307Mqtt";
Ml307Mqtt::Ml307Mqtt(Ml307AtModem& modem, int mqtt_id) : modem_(modem), mqtt_id_(mqtt_id) {
event_group_handle_ = xEventGroupCreate();
command_callback_it_ = modem_.RegisterCommandResponseCallback([this](const std::string command, const std::vector<AtArgumentValue>& arguments) {
if (command == "MQTTURC" && arguments.size() >= 2) {
if (arguments[1].int_value == mqtt_id_) {
auto type = arguments[0].string_value;
if (type == "conn") {
if (arguments[2].int_value == 0) {
xEventGroupSetBits(event_group_handle_, MQTT_CONNECTED_EVENT);
} else {
if (connected_) {
connected_ = false;
if (on_disconnected_callback_) {
on_disconnected_callback_();
}
}
xEventGroupSetBits(event_group_handle_, MQTT_DISCONNECTED_EVENT);
}
ESP_LOGI(TAG, "MQTT connection state: %s", ErrorToString(arguments[2].int_value).c_str());
} else if (type == "suback") {
} else if (type == "publish" && arguments.size() >= 7) {
auto topic = arguments[3].string_value;
if (arguments[4].int_value == arguments[5].int_value) {
if (on_message_callback_) {
on_message_callback_(topic, modem_.DecodeHex(arguments[6].string_value));
}
} else {
message_payload_.append(modem_.DecodeHex(arguments[6].string_value));
if (message_payload_.size() >= arguments[4].int_value && on_message_callback_) {
on_message_callback_(topic, message_payload_);
message_payload_.clear();
}
}
} else {
ESP_LOGI(TAG, "unhandled MQTT event: %s", type.c_str());
}
}
} else if (command == "MQTTSTATE" && arguments.size() == 1) {
connected_ = arguments[0].int_value != 3;
xEventGroupSetBits(event_group_handle_, MQTT_INITIALIZED_EVENT);
}
});
}
Ml307Mqtt::~Ml307Mqtt() {
modem_.UnregisterCommandResponseCallback(command_callback_it_);
vEventGroupDelete(event_group_handle_);
}
bool Ml307Mqtt::Connect(const std::string broker_address, int broker_port, const std::string client_id, const std::string username, const std::string password) {
broker_address_ = broker_address;
broker_port_ = broker_port;
client_id_ = client_id;
username_ = username;
password_ = password;
EventBits_t bits;
if (IsConnected()) {
// 断开之前的连接
Disconnect();
bits = xEventGroupWaitBits(event_group_handle_, MQTT_DISCONNECTED_EVENT, pdTRUE, pdFALSE, pdMS_TO_TICKS(MQTT_CONNECT_TIMEOUT_MS));
if (!(bits & MQTT_DISCONNECTED_EVENT)) {
ESP_LOGE(TAG, "Failed to disconnect from previous connection");
return false;
}
}
if (broker_port_ == 8883) {
if (!modem_.Command(std::string("AT+MQTTCFG=\"ssl\",") + std::to_string(mqtt_id_) + ",1")) {
ESP_LOGE(TAG, "Failed to set MQTT to use SSL");
return false;
}
}
// Set clean session
if (!modem_.Command(std::string("AT+MQTTCFG=\"clean\",") + std::to_string(mqtt_id_) + ",1")) {
ESP_LOGE(TAG, "Failed to set MQTT clean session");
return false;
}
// Set keep alive
if (!modem_.Command(std::string("AT+MQTTCFG=\"pingreq\",") + std::to_string(mqtt_id_) + "," + std::to_string(keep_alive_seconds_))) {
ESP_LOGE(TAG, "Failed to set MQTT keep alive");
return false;
}
// Set HEX encoding
if (!modem_.Command("AT+MQTTCFG=\"encoding\"," + std::to_string(mqtt_id_) + ",1,1")) {
ESP_LOGE(TAG, "Failed to set MQTT to use HEX encoding");
return false;
}
// 创建MQTT连接
std::string command = "AT+MQTTCONN=" + std::to_string(mqtt_id_) + ",\"" + broker_address_ + "\"," + std::to_string(broker_port_) + ",\"" + client_id_ + "\",\"" + username_ + "\",\"" + password_ + "\"";
if (!modem_.Command(command)) {
ESP_LOGE(TAG, "Failed to create MQTT connection");
return false;
}
// 等待连接完成
bits = xEventGroupWaitBits(event_group_handle_, MQTT_CONNECTED_EVENT | MQTT_DISCONNECTED_EVENT, pdTRUE, pdFALSE, pdMS_TO_TICKS(MQTT_CONNECT_TIMEOUT_MS));
if (!(bits & MQTT_CONNECTED_EVENT)) {
ESP_LOGE(TAG, "Failed to connect to MQTT broker");
return false;
}
connected_ = true;
if (on_connected_callback_) {
on_connected_callback_();
}
return true;
}
bool Ml307Mqtt::IsConnected() {
// 检查这个 id 是否已经连接
modem_.Command(std::string("AT+MQTTSTATE=") + std::to_string(mqtt_id_));
auto bits = xEventGroupWaitBits(event_group_handle_, MQTT_INITIALIZED_EVENT, pdTRUE, pdFALSE, pdMS_TO_TICKS(MQTT_CONNECT_TIMEOUT_MS));
if (!(bits & MQTT_INITIALIZED_EVENT)) {
ESP_LOGE(TAG, "Failed to initialize MQTT connection");
return false;
}
return connected_;
}
void Ml307Mqtt::Disconnect() {
if (!connected_) {
return;
}
modem_.Command(std::string("AT+MQTTDISC=") + std::to_string(mqtt_id_));
}
bool Ml307Mqtt::Publish(const std::string topic, const std::string payload, int qos) {
if (!connected_) {
return false;
}
std::string command = "AT+MQTTPUB=" + std::to_string(mqtt_id_) + ",\"" + topic + "\",";
command += std::to_string(qos) + ",0,0,";
command += std::to_string(payload.size()) + "," + modem_.EncodeHex(payload);
return modem_.Command(command);
}
bool Ml307Mqtt::Subscribe(const std::string topic, int qos) {
if (!connected_) {
return false;
}
std::string command = "AT+MQTTSUB=" + std::to_string(mqtt_id_) + ",\"" + topic + "\"," + std::to_string(qos);
return modem_.Command(command);
}
bool Ml307Mqtt::Unsubscribe(const std::string topic) {
if (!connected_) {
return false;
}
std::string command = "AT+MQTTUNSUB=" + std::to_string(mqtt_id_) + ",\"" + topic + "\"";
return modem_.Command(command);
}
std::string Ml307Mqtt::ErrorToString(int error_code) {
switch (error_code) {
case 0:
return "连接成功";
case 1:
return "正在重连";
case 2:
return "断开:用户主动断开";
case 3:
return "断开:拒绝连接(协议版本、标识符、用户名或密码错误)";
case 4:
return "断开:服务器断开";
case 5:
return "断开Ping包超时断开";
case 6:
return "断开:网络异常断开";
case 255:
return "断开:未知错误";
default:
return "未知错误";
}
}