This repository has been archived on 2026-06-17. You can view files and clone it, but cannot push or open issues or pull requests.
FireWatchTower_2axis/host/MQTT.h

158 lines
5.3 KiB
C++
Executable File

#pragma once
#include <iostream>
#include <mqtt/async_client.h>
#include <thread>
#include <mutex>
struct mqtt_sub_data
{
bool ctl_avail = false;
bool hdg_avail = false;
std::string target_heading="";
int control_code = 0;
void set_control_code(int c) {
ctl_avail = true;
control_code = c;
}
void set_target_heading(std::string target) {
hdg_avail = true;
target_heading = target;
}
};
// Callbacks for the success or failures of requested actions.
// This could be used to initiate further action, but here we just log the
// results to the console.
class action_listener : public virtual mqtt::iaction_listener
{
std::string name_;
void on_failure(const mqtt::token& tok) override {
std::cout << name_ << " failure";
if (tok.get_message_id() != 0)
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
std::cout << std::endl;
}
void on_success(const mqtt::token& tok) override {
std::cout << name_ << " success";
if (tok.get_message_id() != 0)
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
auto top = tok.get_topics();
if (top && !top->empty())
std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
std::cout << std::endl;
}
public:
action_listener(const std::string& name) : name_(name) {}
};
/////////////////////////////////////////////////////////////////////////////
/**
* Local callback & listener class for use with the client connection.
* This is primarily intended to receive messages, but it will also monitor
* the connection to the broker. If the connection is lost, it will attempt
* to restore the connection and re-subscribe to the topic.
*/
class MQTTCallback : public virtual mqtt::callback,
public virtual mqtt::iaction_listener
{
const std::string tpoic_target_hdg;
const std::string topic_control_mode;
mqtt_sub_data sub_data;
std::mutex mqtt_mut;
std::string fwt_name;
// Counter for the number of connection retries
int nretry_;
// The MQTT client
mqtt::async_client& cli_;
// Options to use if we need to reconnect
mqtt::connect_options& connOpts_;
// An action listener to display the result of actions.
action_listener subListener_;
// This deomonstrates manually reconnecting to the broker by calling
// connect() again. This is a possibility for an application that keeps
// a copy of it's original connect_options, or if the app wants to
// reconnect with different options.
// Another way this can be done manually, if using the same options, is
// to just call the async_client::reconnect() method.
void reconnect() {
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
try {
cli_.connect(connOpts_, nullptr, *this);
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
}
}
// Re-connection failure
void on_failure(const mqtt::token& tok) override;
// (Re)connection success
// Either this or connected() can be used for callbacks.
void on_success(const mqtt::token& tok) override {}
// Re-connection success
void connected(const std::string& cause) override;
// Callback for when the connection is lost.
// This will initiate the attempt to manually reconnect.
void connection_lost(const std::string& cause) override {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;
std::cout << "Reconnecting..." << std::endl;
nretry_ = 0;
reconnect();
}
// Callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override;
void delivery_complete(mqtt::delivery_token_ptr token) override {
std::cout << "Message delivery complete" << std::endl;
}
public:
MQTTCallback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::string tower_name)
: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), fwt_name(tower_name), tpoic_target_hdg("GGS/FWT/" + tower_name + "/target_HDG"), topic_control_mode("GGS/FWT/" + tower_name + "/ControlCode") {}
mqtt_sub_data get_sub_data() {
std::unique_lock<std::mutex> ul(mqtt_mut);
mqtt_sub_data tmp = sub_data;
sub_data.hdg_avail = false;
sub_data.ctl_avail = false;
return tmp;
}
};
class MQTTClient {
public:
MQTTClient(const std::string& serverURI, const std::string& clientId, std::string tower_name);
~MQTTClient();
void connect_client();
void run();
void disconnect();
void publish(const std::string& topic, const std::string& payload);
void subscribe(const std::string& topic);
void receiveMessages();
bool is_connected_to_server();
bool running = false;
bool connected = false;
MQTTCallback callback;
private:
std::string fwt_name;
mqtt::async_client client;
mqtt::token_ptr subToken;
mqtt::token_ptr conToken;
mqtt::connect_options connOpts;
std::thread mqtt_thread;
};