# Source listing: tic2mqtt/tic_mqtt.py (132 lines, 4.1 KiB, Python)

import paho.mqtt.client as mqtt
import time
import json
import logging
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Home Assistant ``state_class`` published in the discovery config for each
# unit string carried by a reading.  Only real state classes belong here
# ('measurement', 'total', 'total_increasing'); units that are absent from
# this table get no ``state_class`` at all.
state_class_from_unit = {
    'Wh': 'total_increasing',
    # Was 'power', which is a device class and not a valid state class.
    # Treated like 'Wh' (an ever-growing counter) -- TODO confirm that every
    # VArh reading handled by this module is a cumulative index.
    'VArh': 'total_increasing',
    'A': 'measurement',
    'V': 'measurement',
    'kVA': 'measurement',
    'VA': 'measurement',
    'W': 'measurement',
}
# Home Assistant ``device_class`` published in the discovery config, looked up
# by the unit string carried by a reading.  Units missing from this table are
# announced without a device class.
#
# NOTE(review): 'VArh' is an energy-like unit but is mapped to 'power' --
# confirm against the units Home Assistant accepts for that device class.
device_class_from_unit = dict(
    Wh='energy',
    VArh='power',
    A='current',
    V='voltage',
    kVA='apparent_power',
    VA='apparent_power',
    W='power',
)
class TicPublisher:
    """Publish TIC readings to an MQTT broker, with Home Assistant discovery.

    Every reading is published as a JSON object on ``tic/<ads>/<label>``.
    The first time a label is published, a retained discovery message is
    also sent on ``homeassistant/sensor/tic/<unique_id>/config`` so that the
    sensor gets declared; later readings for that label reuse it.

    Nothing is published until :meth:`set_ads` has provided the meter
    address, and nothing at all is published when the instance was created
    without a hostname.
    """

    # Seconds during which an unchanged value for a label is not republished.
    MIN_REPEAT_INTERVAL = 60

    def __init__(self, hostname=None, port=1883):
        """Create the publisher and connect when *hostname* is given.

        Args:
            hostname: MQTT broker host; when falsy no client is created.
            port: MQTT broker TCP port.
        """
        self.client = None
        if hostname:
            self.client = mqtt.Client()
            self.client.connect(hostname, port)
            # The paho background network loop is not started here
            # (the call below is left disabled).
            #self.client.loop_start()
        self._ads = None
        self._manufacturer = None
        self._year = None
        self._model = None
        self._serial = None
        # label -> {'value': last published value, 'ts': monotonic time}
        self._last_ts = {}
        # label -> True once its discovery config has been published
        self._ha_config = {}

    def close(self):
        """Disconnect from the broker and forget the client.

        Safe to call several times and on an instance that never connected.
        """
        # Drop the reference first so the publisher is unusable even if the
        # disconnect itself fails.
        client, self.client = self.client, None
        if client:
            # Tell the broker we are leaving instead of abandoning the socket.
            client.disconnect()
            #client.loop_stop()

    def set_ads(self, ads, manufacturer, year, model, serial):
        """Record the meter identity used to build topics and device info.

        Args:
            ads: meter address; it is embedded in every topic and unique id.
            manufacturer: manufacturer name reported in the device info.
            year: stored as given (not used in the published payloads).
            model: model name reported in the device info.
            serial: stored as given (not used in the published payloads).
        """
        log.debug(
            "ads: %s manufacturer: %s year: %s model: %s serial: %s",
            ads, manufacturer, year, model, serial,
        )
        previous = (self._ads, self._manufacturer, self._model)
        if (ads, manufacturer, model) != previous:
            # Topics and discovery payloads embed these values: forget what
            # was announced/published under the previous identity.
            self._ha_config.clear()
            self._last_ts.clear()
        self._ads = ads
        self._manufacturer = manufacturer
        self._year = year
        self._model = model
        self._serial = serial

    @staticmethod
    def _safe_label(label):
        """Return *label* with '+' and '-' spelled out as PLUS / MOINS."""
        return label.replace('+', 'PLUS').replace('-', 'MOINS')

    def _publish(self, topic, payload, retain=False):
        """Publish *payload* as JSON on *topic*; return True on success."""
        result = self.client.publish(topic, json.dumps(payload), retain=retain)
        # paho reports success as rc == 0 (MQTT_ERR_SUCCESS).
        if result.rc != 0:
            log.warning("publish to %s failed (rc=%s)", topic, result.rc)
            return False
        return True

    def get_topic(self, label):
        """Return the state topic ``tic/<ads>/<label>`` for *label*."""
        return f"tic/{self._ads}/{self._safe_label(label)}"

    def send_ha_config(self, msg, topic):
        """Publish the retained Home Assistant discovery config for a reading.

        Args:
            msg: reading dict; ``label`` is required, ``unit`` is optional and
                selects the device class and state class.
            topic: state topic the declared sensor reads its value from.

        Returns:
            True when the config was handed to the client successfully,
            False when nothing was sent (no client, no meter address, no
            label) or the publish failed.
        """
        if not self.client or not self._ads:
            return False
        label = msg.get('label')
        if label is None:
            return False
        unit = msg.get('unit')

        unique_id = f"tic_{self._ads}_{self._safe_label(label)}"
        # Layout: homeassistant/sensor/<node_id>/<object_id>/config
        config_topic = f"homeassistant/sensor/tic/{unique_id}/config"

        device = {
            'identifiers': [f'tic_{self._ads}'],
            'manufacturer': self._manufacturer,
            'model': self._model,
            'name': 'Tic Enedis',
        }
        config = {
            'unique_id': unique_id,
            'name': label,
            'state_topic': topic,
            'value_template': "{% if 'value' in value_json %}{{ value_json.value }}{% endif %}",
            'device_class': device_class_from_unit.get(unit) if unit else None,
            'unit_of_measurement': unit,
            'state_class': state_class_from_unit.get(unit) if unit else None,
            # Unknown identity fields are left out rather than sent as null.
            'device': {k: v for k, v in device.items() if v is not None},
            'origin': {
                'name': 'tic2mqtt',
                'sw_version': "0.1",
                'support_url': "https://git.grandou.net/gilles/tic2mqtt",
            },
        }
        # Optional attributes are omitted when unknown instead of being
        # serialised as JSON null.
        config = {k: v for k, v in config.items() if v is not None}

        log.debug("HA config topic: %s", config_topic)
        log.debug("HA config: %s", config)
        sent = self._publish(config_topic, config, retain=True)
        if sent:
            # Remember the announcement so send_msg() does not repeat it.
            self._ha_config[label] = True
        return sent

    def send_msg(self, msg):
        """Publish one reading, announcing its sensor first if needed.

        The reading is ignored when there is no client, no meter address, or
        when ``label`` or ``value`` is missing.  A value identical to the last
        one published for the same label is skipped for
        ``MIN_REPEAT_INTERVAL`` seconds.

        Args:
            msg: reading dict with ``label`` and ``value``; ``unit`` and
                ``date`` are optional.
        """
        log.debug("msg: %s", msg)
        if not self.client or not self._ads:
            return
        label = msg.get('label')
        value = msg.get('value')
        if label is None or value is None:
            return

        # Monotonic clock: the dedup window must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        last = self._last_ts.get(label)
        if (
            last
            and value == last['value']
            and now < last['ts'] + self.MIN_REPEAT_INTERVAL
        ):
            log.debug("skip msg %s %s", label, value)
            return
        self._last_ts[label] = {'value': value, 'ts': now}

        topic = self.get_topic(label)
        if label not in self._ha_config:
            # Retried on the next reading if this publish fails.
            self.send_ha_config(msg, topic)

        payload = {'value': value}
        if msg.get('date'):
            payload['date'] = msg['date']
        log.debug("topic: %s msg: %s", topic, payload)
        self._publish(topic, payload)