#include "fgc/MqttControlChannel.h" #include "fgc/Logger.h" #include namespace fgc { namespace { constexpr int kQos = 1; const auto kConnectTimeout = std::chrono::seconds(5); const auto kPublishTimeout = std::chrono::seconds(3); } // namespace MqttControlChannel::MqttControlChannel(const std::string& broker_ip, const std::string& tower, const std::string& user, const std::string& password) : tower_(tower), topic_target_hdg_("GGS/FWT/" + tower + "/target_HDG"), topic_control_code_("GGS/FWT/" + tower + "/ControlCode"), topic_status_("GGS/FWT/" + tower + "/StatusCode"), topic_cam_event_("GGS/FWT/" + tower + "/CamEvent"), client_(broker_ip, tower) { conn_opts_.set_keep_alive_interval(20); conn_opts_.set_clean_session(true); conn_opts_.set_automatic_reconnect(true); // replaces the old manual reconnect loop if (!user.empty()) conn_opts_.set_user_name(user); if (!password.empty()) conn_opts_.set_password(password); client_.set_callback(*this); } MqttControlChannel::~MqttControlChannel() { disconnect(); } bool MqttControlChannel::connect() { try { client_.connect(conn_opts_, nullptr, *this)->wait_for(kConnectTimeout); connected_ = client_.is_connected(); } catch (const mqtt::exception& e) { LOG_ERROR << "MQTT connect failed: " << e.what(); connected_ = false; } if (connected_) LOG_INFO << "MQTT connected to broker as '" << tower_ << "'"; return connected_; } void MqttControlChannel::disconnect() { if (!client_.is_connected()) return; try { client_.disconnect()->wait(); } catch (const mqtt::exception& e) { LOG_WARN << "MQTT disconnect: " << e.what(); } connected_ = false; } bool MqttControlChannel::connected() const { return connected_; } void MqttControlChannel::connected(const std::string& /*cause*/) { LOG_INFO << "MQTT (re)connected; subscribing"; client_.subscribe(topic_target_hdg_, kQos); client_.subscribe(topic_control_code_, kQos); connected_ = true; } void MqttControlChannel::connection_lost(const std::string& cause) { LOG_WARN << "MQTT connection lost" << (cause.empty() ? "" : ": " + cause); connected_ = false; // Paho will auto-reconnect when configured; the broker callback re-subscribes. } void MqttControlChannel::on_failure(const mqtt::token& /*tok*/) { LOG_WARN << "MQTT action failed"; } void MqttControlChannel::message_arrived(mqtt::const_message_ptr msg) { const std::string topic = msg->get_topic(); const std::string payload = msg->to_string(); LOG_TRACE_CAT(LogCat::Mqtt) << "RX " << topic << ' ' << payload; std::lock_guard lock(mutex_); try { if (topic == topic_target_hdg_) { (void)std::stoi(payload); // validate it is an integer sub_data_.target_heading = payload; sub_data_.heading_available = true; } else if (topic == topic_control_code_) { sub_data_.control_code = std::stoi(payload); sub_data_.control_code_available = true; } } catch (const std::exception& e) { LOG_WARN << "MQTT payload conversion error on " << topic << ": " << e.what(); } } ControlCommand MqttControlChannel::poll() { std::lock_guard lock(mutex_); ControlCommand out = sub_data_; sub_data_.control_code_available = false; sub_data_.heading_available = false; return out; } void MqttControlChannel::publish(const std::string& topic, const std::string& payload) { if (!client_.is_connected()) return; LOG_TRACE_CAT(LogCat::Mqtt) << "PUB " << topic << ' ' << payload; try { auto msg = mqtt::make_message(topic, payload); msg->set_qos(kQos); msg->set_retained(true); client_.publish(msg)->wait_for(kPublishTimeout); } catch (const mqtt::exception& e) { LOG_WARN << "MQTT publish failed: " << e.what(); } } void MqttControlChannel::publishStatus(int code) { publish(topic_status_, std::to_string(code)); } void MqttControlChannel::publishCamEvent(const CamEvent& e) { std::string payload = "{ \"fwt\":\"" + e.tower + "\" ,\"cam\":\"" + e.camera + "\", \"hdg\":" + std::to_string(e.heading_decideg) + ", \"pit\":" + std::to_string(e.pitch_decideg) + ", \"time\":" + std::to_string(e.timestamp_ms) + " }"; publish(topic_cam_event_, payload); } } // namespace fgc