158 lines
5.3 KiB
C++
Executable File
158 lines
5.3 KiB
C++
Executable File
#pragma once
|
|
|
|
#include <iostream>
|
|
#include <mqtt/async_client.h>
|
|
#include <thread>
|
|
#include <mutex>
|
|
struct mqtt_sub_data
|
|
{
|
|
bool ctl_avail = false;
|
|
bool hdg_avail = false;
|
|
std::string target_heading="";
|
|
int control_code = 0;
|
|
void set_control_code(int c) {
|
|
ctl_avail = true;
|
|
control_code = c;
|
|
}
|
|
void set_target_heading(std::string target) {
|
|
hdg_avail = true;
|
|
target_heading = target;
|
|
}
|
|
};
|
|
|
|
// Callbacks for the success or failures of requested actions.
|
|
// This could be used to initiate further action, but here we just log the
|
|
// results to the console.
|
|
class action_listener : public virtual mqtt::iaction_listener
|
|
{
|
|
std::string name_;
|
|
|
|
void on_failure(const mqtt::token& tok) override {
|
|
std::cout << name_ << " failure";
|
|
if (tok.get_message_id() != 0)
|
|
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
|
|
std::cout << std::endl;
|
|
}
|
|
|
|
void on_success(const mqtt::token& tok) override {
|
|
std::cout << name_ << " success";
|
|
if (tok.get_message_id() != 0)
|
|
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
|
|
auto top = tok.get_topics();
|
|
if (top && !top->empty())
|
|
std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
|
|
std::cout << std::endl;
|
|
}
|
|
|
|
public:
|
|
action_listener(const std::string& name) : name_(name) {}
|
|
};
|
|
|
|
/////////////////////////////////////////////////////////////////////////////
|
|
|
|
/**
|
|
* Local callback & listener class for use with the client connection.
|
|
* This is primarily intended to receive messages, but it will also monitor
|
|
* the connection to the broker. If the connection is lost, it will attempt
|
|
* to restore the connection and re-subscribe to the topic.
|
|
*/
|
|
class MQTTCallback : public virtual mqtt::callback,
|
|
public virtual mqtt::iaction_listener
|
|
|
|
{
|
|
const std::string tpoic_target_hdg;
|
|
const std::string topic_control_mode;
|
|
mqtt_sub_data sub_data;
|
|
std::mutex mqtt_mut;
|
|
std::string fwt_name;
|
|
// Counter for the number of connection retries
|
|
int nretry_;
|
|
// The MQTT client
|
|
mqtt::async_client& cli_;
|
|
// Options to use if we need to reconnect
|
|
mqtt::connect_options& connOpts_;
|
|
// An action listener to display the result of actions.
|
|
action_listener subListener_;
|
|
|
|
// This deomonstrates manually reconnecting to the broker by calling
|
|
// connect() again. This is a possibility for an application that keeps
|
|
// a copy of it's original connect_options, or if the app wants to
|
|
// reconnect with different options.
|
|
// Another way this can be done manually, if using the same options, is
|
|
// to just call the async_client::reconnect() method.
|
|
void reconnect() {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
|
try {
|
|
cli_.connect(connOpts_, nullptr, *this);
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << "Error: " << exc.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
|
|
// Re-connection failure
|
|
void on_failure(const mqtt::token& tok) override;
|
|
|
|
// (Re)connection success
|
|
// Either this or connected() can be used for callbacks.
|
|
void on_success(const mqtt::token& tok) override {}
|
|
// Re-connection success
|
|
void connected(const std::string& cause) override;
|
|
|
|
// Callback for when the connection is lost.
|
|
// This will initiate the attempt to manually reconnect.
|
|
void connection_lost(const std::string& cause) override {
|
|
std::cout << "\nConnection lost" << std::endl;
|
|
if (!cause.empty())
|
|
std::cout << "\tcause: " << cause << std::endl;
|
|
|
|
std::cout << "Reconnecting..." << std::endl;
|
|
nretry_ = 0;
|
|
reconnect();
|
|
}
|
|
|
|
// Callback for when a message arrives.
|
|
void message_arrived(mqtt::const_message_ptr msg) override;
|
|
|
|
void delivery_complete(mqtt::delivery_token_ptr token) override {
|
|
std::cout << "Message delivery complete" << std::endl;
|
|
}
|
|
|
|
public:
|
|
MQTTCallback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::string tower_name)
|
|
: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), fwt_name(tower_name), tpoic_target_hdg("GGS/FWT/" + tower_name + "/target_HDG"), topic_control_mode("GGS/FWT/" + tower_name + "/ControlCode") {}
|
|
mqtt_sub_data get_sub_data() {
|
|
std::unique_lock<std::mutex> ul(mqtt_mut);
|
|
mqtt_sub_data tmp = sub_data;
|
|
sub_data.hdg_avail = false;
|
|
sub_data.ctl_avail = false;
|
|
return tmp;
|
|
}
|
|
};
|
|
|
|
class MQTTClient {
|
|
public:
|
|
MQTTClient(const std::string& serverURI, const std::string& clientId, std::string tower_name);
|
|
~MQTTClient();
|
|
|
|
void connect_client();
|
|
void run();
|
|
void disconnect();
|
|
void publish(const std::string& topic, const std::string& payload);
|
|
void subscribe(const std::string& topic);
|
|
void receiveMessages();
|
|
bool is_connected_to_server();
|
|
|
|
bool running = false;
|
|
bool connected = false;
|
|
MQTTCallback callback;
|
|
private:
|
|
std::string fwt_name;
|
|
mqtt::async_client client;
|
|
mqtt::token_ptr subToken;
|
|
mqtt::token_ptr conToken;
|
|
mqtt::connect_options connOpts;
|
|
std::thread mqtt_thread;
|
|
};
|