#include "mqtt.h"
#include "logger.h"

#include <errno.h>
#include <mosquitto.h>
#include <stdio.h>
#include <string.h>

/*
 * Log callback registered with libmosquitto.
 *
 * Forwards warning and error messages from the library to the project
 * logger; every other log level is dropped.
 */
static void mosq_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    /* Required by the callback signature but not needed here. */
    (void)mosq;
    (void)userdata;

    switch (level) {
    case MOSQ_LOG_WARNING:
        log_warning("%s", str);
        break;
    case MOSQ_LOG_ERR:
        log_error("%s", str);
        break;
    default:
        /* Other levels are intentionally not forwarded. */
        break;
    }
}

/*
 * Create an MQTT client, connect it to a broker and start its network loop.
 *
 * host      - broker host name or address
 * port      - broker TCP port
 * keepalive - keepalive value handed to mosquitto_connect()
 *
 * Returns the client handle on success; release it with mqtt_close().
 * Returns NULL on failure, after logging the reason. On failure everything
 * acquired so far (client handle, library initialisation) has already been
 * released, so the caller has nothing to clean up.
 */
struct mosquitto *mqtt_open(const char *host, int port, int keepalive)
{
    struct mosquitto *mosq;
    int res;

    mosquitto_lib_init();

    /* Create MQTT client. */
    mosq = mosquitto_new(NULL, 1, NULL);
    if (mosq == NULL) {
        log_error("Cannot create mosquitto client: %s", strerror(errno));
        goto fail_lib;
    }
    mosquitto_log_callback_set(mosq, mosq_log_callback);

    /* Connect to broker. */
    res = mosquitto_connect(mosq, host, port, keepalive);
    if (res != MOSQ_ERR_SUCCESS) {
        /* Never hand a NULL pointer to "%s". */
        log_error("Unable to connect to MQTT broker %s:%d: %s",
                  host != NULL ? host : "(null)", port, mosquitto_strerror(res));
        goto fail_client;
    }

    /* Start network loop. */
    res = mosquitto_loop_start(mosq);
    if (res != MOSQ_ERR_SUCCESS) {
        log_error("Unable to start loop: %s", mosquitto_strerror(res));
        mosquitto_disconnect(mosq);
        goto fail_client;
    }

    return mosq;

fail_client:
    mosquitto_destroy(mosq);
fail_lib:
    /* Balance the mosquitto_lib_init() call made above. */
    mosquitto_lib_cleanup();
    return NULL;
}

/*
 * Shut down a client obtained from mqtt_open().
 *
 * Stops the network loop (forced), disconnects from the broker, destroys
 * the client handle and releases the library. Passing NULL is a no-op,
 * because a failed mqtt_open() has already cleaned up after itself.
 */
void mqtt_close(struct mosquitto *mosq)
{
    if (mosq == NULL)
        return;

    mosquitto_loop_stop(mosq, 1);
    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
}

/*
 * Publish a NUL-terminated string payload as a retained message.
 *
 * The topic is the concatenation of topic_prefix and topic_suffix; either
 * one may be NULL, but not both.
 *
 * mosq         - client handle from mqtt_open()
 * topic_prefix - first part of the topic, or NULL
 * topic_suffix - second part of the topic, or NULL
 * payload      - NUL-terminated string to send; NULL sends an empty payload
 * qos          - quality-of-service level handed to mosquitto_publish()
 *
 * Returns 0 on success, -1 if both topic parts are NULL, MOSQ_ERR_INVAL if
 * the combined topic does not fit in TOPIC_MAXLEN characters, or the error
 * code reported by mosquitto_publish().
 */
int mqtt_publish(struct mosquitto *mosq, const char *topic_prefix, const char *topic_suffix, const void *payload, int qos)
{
    char topic[TOPIC_MAXLEN + 1];
    const char *prefix = topic_prefix != NULL ? topic_prefix : "";
    const char *suffix = topic_suffix != NULL ? topic_suffix : "";
    int payload_len = 0;
    int written;
    int res;

    if (topic_prefix == NULL && topic_suffix == NULL)
        return -1;

    /* Bounded formatting: refuse to publish rather than overflow or truncate. */
    written = snprintf(topic, sizeof topic, "%s%s", prefix, suffix);
    if (written < 0 || (size_t)written >= sizeof topic) {
        log_error("Cannot publish: topic is longer than %zu characters", sizeof topic - 1);
        return MOSQ_ERR_INVAL;
    }

    /* strlen(NULL) is undefined; a NULL payload is sent as zero-length. */
    if (payload != NULL)
        payload_len = (int)strlen((const char *)payload);

    res = mosquitto_publish(mosq, NULL, topic, payload_len, payload, qos, 1);
    if (res != 0)
        log_error("Cannot publish topic %s: %s\n", topic, mosquitto_strerror(res));

    return res;
}

/*
 * Return the human-readable description of an error code, as provided by
 * mosquitto_strerror().
 */
const char *mqtt_strerror(int err)
{
    return mosquitto_strerror(err);
}