158 lines
4.8 KiB
C++
Executable File
158 lines
4.8 KiB
C++
Executable File
#include "MQTT.h"
|
|
|
|
const int QOS = 1;
|
|
const int N_RETRY_ATTEMPTS = 5;
|
|
const std::string SERVER_ADDRESS{ "172.16.42.1" };//localhost:1883");
|
|
const std::string CLIENT_ID{ "gimbaltest" };
|
|
|
|
|
|
void MQTTCallback::on_failure(const mqtt::token& tok)
|
|
{
|
|
std::cout << "Connection failed" << std::endl;
|
|
if (++nretry_ > N_RETRY_ATTEMPTS)
|
|
std::cout << "Connection attempt failed already a few times" << std::endl;
|
|
reconnect();
|
|
}
|
|
|
|
|
|
|
|
|
|
// (Re)connection success
|
|
void MQTTCallback::connected(const std::string& cause) {
|
|
std::cout << "\nConnection success" << std::endl;
|
|
std::cout << "\nSubscribing to topics.." << std::endl;
|
|
|
|
cli_.subscribe(tpoic_target_hdg, QOS, nullptr, subListener_);
|
|
cli_.subscribe(topic_control_mode, QOS, nullptr, subListener_);
|
|
}
|
|
|
|
void MQTTCallback::message_arrived(mqtt::const_message_ptr msg) {
|
|
std::cout << "Message arrived" << std::endl;
|
|
std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
|
|
std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
|
|
if (msg->get_topic() == "GGS/FWT/" + fwt_name + "/target_HDG") {
|
|
try {
|
|
int value = static_cast<int>(std::stoi(msg->to_string()));
|
|
std::unique_lock<std::mutex> ul(mqtt_mut);
|
|
sub_data.set_target_heading(msg->to_string());
|
|
}
|
|
catch (const std::invalid_argument& e) {
|
|
std::cerr << "MQTT type convertion invalid argument: " << e.what() << std::endl;
|
|
}
|
|
catch (const std::out_of_range& e) {
|
|
std::cerr << "MQTT type convertion out of range: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
else if (msg->get_topic() == "GGS/FWT/" + fwt_name + "/ControlCode") {
|
|
try {
|
|
std::unique_lock<std::mutex> ul(mqtt_mut);
|
|
sub_data.set_control_code(static_cast<int>(std::stoi(msg->to_string())));
|
|
}
|
|
catch (const std::invalid_argument& e) {
|
|
std::cerr << "MQTT type convertion invalid argument: " << e.what() << std::endl;
|
|
}
|
|
catch (const std::out_of_range& e) {
|
|
std::cerr << "MQTT type convertion out of range: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////7x
|
|
|
|
MQTTClient::MQTTClient(const std::string& serverURI, const std::string& clientId, std::string tower_name)
|
|
: client(serverURI, clientId), callback(client, connOpts, tower_name)
|
|
{
|
|
connOpts.set_keep_alive_interval(20);
|
|
connOpts.set_clean_session(true);
|
|
std::cout << "MQTT object created" << std::endl;
|
|
client.set_callback(callback);
|
|
std::cout << "MQTT callback set" << std::endl;
|
|
}
|
|
|
|
MQTTClient::~MQTTClient() {
|
|
if (client.is_connected())
|
|
disconnect();
|
|
}
|
|
|
|
void MQTTClient::connect_client() {
|
|
std::cout << "connecting MQTT..." << std::endl;
|
|
running = true;
|
|
mqtt_thread = std::thread(&MQTTClient::run, this);
|
|
|
|
}
|
|
|
|
void MQTTClient::run() {
|
|
const auto TIMEOUT = std::chrono::seconds(5);
|
|
try {
|
|
conToken = client.connect(connOpts, nullptr, callback);
|
|
conToken->wait_for(TIMEOUT);
|
|
connected = true;
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << "Error: " << exc.what() << std::endl;
|
|
running = false;
|
|
connected = false;
|
|
return;
|
|
}
|
|
std::cout << "MQTT connected" << std::endl;
|
|
|
|
while (running)
|
|
;
|
|
}
|
|
|
|
void MQTTClient::disconnect() {
|
|
// Disconnect
|
|
try {
|
|
std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
|
|
client.disconnect()->wait();
|
|
std::cout << "OK" << std::endl;
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << exc.what() << std::endl;
|
|
}
|
|
running = false;
|
|
mqtt_thread.join();
|
|
}
|
|
|
|
void MQTTClient::publish(const std::string& topic, const std::string& payload) {
|
|
const auto TIMEOUT = std::chrono::seconds(3);
|
|
mqtt::delivery_token_ptr pubtok;
|
|
mqtt::message_ptr msg = mqtt::make_message(topic, payload);
|
|
msg->set_qos(1);
|
|
msg->set_retained(true);
|
|
try {
|
|
pubtok = client.publish(msg);
|
|
pubtok->wait_for(TIMEOUT);
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << "Error: " << exc.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
void MQTTClient::subscribe(const std::string& topic) {
|
|
try {
|
|
subToken = client.subscribe(topic, 1);
|
|
subToken->wait();
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << "Error: " << exc.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
void MQTTClient::receiveMessages() {
|
|
try {
|
|
client.start_consuming();
|
|
}
|
|
catch (const mqtt::exception& exc) {
|
|
std::cerr << "Error: " << exc.what() << std::endl;
|
|
}
|
|
while (true) {
|
|
|
|
}
|
|
}
|
|
|
|
bool MQTTClient::is_connected_to_server()
|
|
{
|
|
return client.is_connected();
|
|
}
|