advancedmqttbridge/own_modules/mqttManager.mjs
2021-08-02 17:04:26 +02:00

89 lines
3.3 KiB
JavaScript

import mqtt from 'mqtt'
import config from '../config/config.js'
import request from 'request';
/**
 * Bridges MQTT messages to outgoing web requests.
 *
 * A "message handler" is a plain object of the shape
 * `{ topic, message, requestUrl }`: whenever `message` is received on
 * `topic`, a web request is sent to `requestUrl`.
 *
 * Usage: construct, call `init()` once to create the MQTT client, then
 * register handlers with `addMessageHandler()`.
 */
export default class MqttManager{
    constructor(){
        // Declared up front so the instance always has a consistent shape;
        // the client itself is only created by init().
        this.messageHandlers = [];
        this.client = null;
        this.ready = false;
    }

    /**
     * Creates the MQTT client from `config.mqtt` and wires up its event
     * listeners. Resets the list of registered message handlers.
     *
     * `ready` is set as soon as the client object exists; it does NOT mean
     * that the broker connection has been established (that is reported by
     * the "Connected" log line emitted from the client's `connect` event).
     */
    init(){
        this.messageHandlers = [];
        this.client = mqtt.connect(config.mqtt.url, config.mqtt.options);
        this.ready = true;

        // Only report the connection once the client actually signals it.
        this.client.on('connect', () => {
            console.log(`[MQTT] Connected to MQTT broker [${config.mqtt.url}].`);
        });
        // Without a listener an 'error' event would be thrown as an
        // unhandled EventEmitter error and take the process down.
        this.client.on('error', (err) => {
            console.log(`[MQTT] Client error [${err && err.message ? err.message : err}].`);
        });
        this.client.on('message', (topic, message) => this.handleMessage(topic, message));
    }

    /**
     * Dispatches one incoming MQTT message to every handler registered for
     * exactly this topic and payload by sending a web request to the
     * handler's `requestUrl`. Intended for internal use by init().
     *
     * @param {string} topic - Topic the message arrived on.
     * @param {*} message - Raw payload as delivered by the client; it is
     *   converted to its string form before being compared.
     */
    handleMessage(topic, message){
        // Decode once; the original relied on implicit coercion for this.
        const payload = String(message);

        if(!this.getSubscribedTopics().includes(topic)){
            console.log(`[MQTT] Got message [${payload}] on unhandled topic [${topic}].`);
            return;
        }

        console.log(`[MQTT] Got message [${payload}] on handled topic [${topic}].`);
        // Loose equality on `message` is kept on purpose so that handlers
        // configured with a non-string payload (e.g. the number 1) still
        // match the textual payload "1".
        const suitingMessageHandlers = this.messageHandlers.filter(
            (handler) => handler.topic === topic && handler.message == payload
        );
        suitingMessageHandlers.forEach((handler) => {
            console.log(`[MQTT] Sending web request [${handler.requestUrl}].`);
            request(handler.requestUrl, { json: true }, (err) => {
                if(err){
                    console.log(`[MQTT] Error sending web request [${handler.requestUrl}].`);
                }else{
                    console.log(`[MQTT] Sent web request [${handler.requestUrl}].`);
                }
            });
        });
    }

    /**
     * Subscribes the client to a topic and logs the outcome.
     *
     * @param {string} topic - Topic to subscribe to.
     */
    subscribe(topic){
        this.client.subscribe(topic, (err) => {
            if(err != null){
                console.log(`[MQTT] Error subscribing to topic [${topic}].`);
            }else{
                console.log(`[MQTT] Subscribed to topic [${topic}].`);
            }
        });
    }

    /**
     * Unsubscribes the client from a topic and logs the outcome.
     *
     * @param {string} topic - Topic to unsubscribe from.
     */
    unsubscribe(topic){
        this.client.unsubscribe(topic, (err) => {
            if(err != null){
                console.log(`[MQTT] Error unsubscribing to topic [${topic}].`);
            }else{
                console.log(`[MQTT] Unsubscribed to topic [${topic}].`);
            }
        });
    }

    /**
     * Publishes a message on a topic. A failure reported by the client is
     * logged, matching the behaviour of subscribe()/unsubscribe().
     *
     * @param {string} topic - Topic to publish on.
     * @param {string|Buffer} message - Payload to publish.
     */
    publishMessage(topic, message){
        console.log(`[MQTT] Publishing message [${message}] in topic [${topic}].`);
        this.client.publish(topic, message, (err) => {
            if(err != null){
                console.log(`[MQTT] Error publishing message [${message}] in topic [${topic}].`);
            }
        });
    }

    /**
     * @returns {Array} The topic of every registered handler, in
     *   registration order. Contains duplicates when several handlers share
     *   a topic.
     */
    getSubscribedTopics(){
        return this.messageHandlers.map((handler) => handler.topic);
    }

    /**
     * Registers a message handler, subscribing to its topic first if no
     * other handler uses that topic yet.
     *
     * @param {{topic: string, message: *, requestUrl: string}} messageHandler
     */
    addMessageHandler(messageHandler){
        if(!this.getSubscribedTopics().includes(messageHandler.topic)){
            this.subscribe(messageHandler.topic);
        }
        this.messageHandlers.push(messageHandler);
        console.log(`[MQTT] Added new message handler [${JSON.stringify(messageHandler)}].`);
    }

    /**
     * Removes the first registered handler whose topic, message and
     * requestUrl match the given one, and unsubscribes from the topic when
     * no handler for it remains. If no handler matches, an error is logged
     * and nothing else happens.
     *
     * @param {{topic: string, message: *, requestUrl: string}} messageHandler
     */
    removeMessageHandler(messageHandler){
        // `message` is compared loosely for the same reason as in
        // handleMessage(): numeric and textual payload forms should match.
        const messageHandlerIndex = this.messageHandlers.findIndex(
            (handler) => handler.topic === messageHandler.topic
                && handler.message == messageHandler.message
                && handler.requestUrl === messageHandler.requestUrl
        );
        if(messageHandlerIndex === -1){
            console.log(`[MQTT] Error removing message handler [${JSON.stringify(messageHandler)}].`);
            return;
        }

        // splice() mutates in place and RETURNS the removed elements, so its
        // result must not be assigned back to the handler list.
        this.messageHandlers.splice(messageHandlerIndex, 1);
        console.log(`[MQTT] Removed message handler [${JSON.stringify(messageHandler)}].`);

        if(!this.getSubscribedTopics().includes(messageHandler.topic)){
            this.unsubscribe(messageHandler.topic);
        }
    }
}