#pragma once #include #include #include #include struct mqtt_sub_data { bool ctl_avail = false; bool hdg_avail = false; std::string target_heading=""; int control_code = 0; void set_control_code(int c) { ctl_avail = true; control_code = c; } void set_target_heading(std::string target) { hdg_avail = true; target_heading = target; } }; // Callbacks for the success or failures of requested actions. // This could be used to initiate further action, but here we just log the // results to the console. class action_listener : public virtual mqtt::iaction_listener { std::string name_; void on_failure(const mqtt::token& tok) override { std::cout << name_ << " failure"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; std::cout << std::endl; } void on_success(const mqtt::token& tok) override { std::cout << name_ << " success"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; auto top = tok.get_topics(); if (top && !top->empty()) std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl; std::cout << std::endl; } public: action_listener(const std::string& name) : name_(name) {} }; ///////////////////////////////////////////////////////////////////////////// /** * Local callback & listener class for use with the client connection. * This is primarily intended to receive messages, but it will also monitor * the connection to the broker. If the connection is lost, it will attempt * to restore the connection and re-subscribe to the topic. */ class MQTTCallback : public virtual mqtt::callback, public virtual mqtt::iaction_listener { const std::string tpoic_target_hdg; const std::string topic_control_mode; mqtt_sub_data sub_data; std::mutex mqtt_mut; std::string fwt_name; // Counter for the number of connection retries int nretry_; // The MQTT client mqtt::async_client& cli_; // Options to use if we need to reconnect mqtt::connect_options& connOpts_; // An action listener to display the result of actions. action_listener subListener_; // This deomonstrates manually reconnecting to the broker by calling // connect() again. This is a possibility for an application that keeps // a copy of it's original connect_options, or if the app wants to // reconnect with different options. // Another way this can be done manually, if using the same options, is // to just call the async_client::reconnect() method. void reconnect() { std::this_thread::sleep_for(std::chrono::milliseconds(2500)); try { cli_.connect(connOpts_, nullptr, *this); } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; } } // Re-connection failure void on_failure(const mqtt::token& tok) override; // (Re)connection success // Either this or connected() can be used for callbacks. void on_success(const mqtt::token& tok) override {} // Re-connection success void connected(const std::string& cause) override; // Callback for when the connection is lost. // This will initiate the attempt to manually reconnect. void connection_lost(const std::string& cause) override { std::cout << "\nConnection lost" << std::endl; if (!cause.empty()) std::cout << "\tcause: " << cause << std::endl; std::cout << "Reconnecting..." << std::endl; nretry_ = 0; reconnect(); } // Callback for when a message arrives. void message_arrived(mqtt::const_message_ptr msg) override; void delivery_complete(mqtt::delivery_token_ptr token) override { std::cout << "MQTT delivery complete" << std::endl; } public: MQTTCallback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::string tower_name) : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), fwt_name(tower_name), tpoic_target_hdg("GGS/FWT/" + tower_name + "/target_HDG"), topic_control_mode("GGS/FWT/" + tower_name + "/ControlCode") {} mqtt_sub_data get_sub_data() { std::unique_lock ul(mqtt_mut); mqtt_sub_data tmp = sub_data; sub_data.hdg_avail = false; sub_data.ctl_avail = false; return tmp; } }; class MQTTClient { public: MQTTClient(const std::string& serverURI, const std::string& clientId, std::string tower_name, std::string login_user, std::string login_pw); ~MQTTClient(); void connect_client(); void run(); void disconnect(); void publish(const std::string& topic, const std::string& payload); void subscribe(const std::string& topic); void receiveMessages(); bool is_connected_to_server(); bool running = false; bool connected = false; MQTTCallback callback; private: std::string fwt_name; mqtt::async_client client; mqtt::token_ptr subToken; mqtt::token_ptr conToken; mqtt::connect_options connOpts; std::thread mqtt_thread; };