132 lines
4.1 KiB
Python
132 lines
4.1 KiB
Python
import paho.mqtt.client as mqtt
|
|
import time
|
|
import json
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
state_class_from_unit = {
|
|
'Wh': 'total_increasing',
|
|
'VArh': 'power',
|
|
'A': 'measurement',
|
|
'V': 'measurement',
|
|
'kVA': 'measurement',
|
|
'VA': 'measurement',
|
|
'W': 'measurement',
|
|
}
|
|
|
|
device_class_from_unit = {
|
|
'Wh': 'energy',
|
|
'VArh': 'power',
|
|
'A': 'current',
|
|
'V': 'voltage',
|
|
'kVA': 'apparent_power',
|
|
'VA': 'apparent_power',
|
|
'W': 'power',
|
|
}
|
|
|
|
class TicPublisher:
|
|
def __init__(self, hostname=None, port=1883):
|
|
self.client = None
|
|
if hostname:
|
|
self.client = mqtt.Client()
|
|
self.client.connect(hostname, port)
|
|
#self.client.loop_start()
|
|
self._ads = None
|
|
self._manufacturer = None
|
|
self._year = None
|
|
self._model = None
|
|
self._serial = None
|
|
self._last_ts = {}
|
|
self._ha_config = {}
|
|
|
|
def close(self):
|
|
if self.client:
|
|
#self.client.loop_stop()
|
|
self.client = None
|
|
|
|
def set_ads(self, ads, manufacturer, year, model, serial):
|
|
log.debug(f"ads: {ads} manufacturer: {manufacturer} year: {year} model: {model} serial: {serial}")
|
|
self._ads = ads
|
|
self._manufacturer = manufacturer
|
|
self._year = year
|
|
self._model = model
|
|
self._serial = serial
|
|
|
|
def get_topic(self, label):
|
|
label = label.replace('+', 'PLUS').replace('-', 'MOINS')
|
|
topic = f"tic/{self._ads}/{label}"
|
|
return topic
|
|
|
|
def send_ha_config(self, msg, topic):
|
|
if not self._ads:
|
|
return
|
|
label = msg.get('label')
|
|
unit = msg.get('unit')
|
|
device_class = device_class_from_unit.get(unit) if unit else None
|
|
state_class = state_class_from_unit.get(unit) if unit else None
|
|
unique_id = f"tic_{self._ads}_{label.replace('+','PLUS').replace('-','MOINS')}"
|
|
node_id = f"tic"
|
|
object_id = unique_id
|
|
|
|
config_topic = f"homeassistant/sensor/{node_id}/{object_id}/config"
|
|
config = {
|
|
'unique_id': unique_id,
|
|
'name': label,
|
|
'state_topic': topic,
|
|
'value_template': "{% if 'value' in value_json %}{{ value_json.value }}{% endif %}",
|
|
'device_class': device_class,
|
|
'unit_of_measurement': unit,
|
|
'device': {
|
|
'identifiers': [ f'tic_{self._ads}' ],
|
|
'manufacturer': self._manufacturer,
|
|
'model': self._model,
|
|
'name': 'Tic Enedis',
|
|
},
|
|
'origin': {
|
|
'name': 'tic2mqtt',
|
|
'sw_version': "0.1",
|
|
'support_url': "https://git.grandou.net/gilles/tic2mqtt",
|
|
}
|
|
}
|
|
if unit:
|
|
config['unit_of_measurement'] = unit
|
|
if device_class:
|
|
config['device_class'] = device_class
|
|
if state_class:
|
|
config['state_class'] = state_class
|
|
|
|
log.debug(f"HA config topic: {config_topic}")
|
|
log.debug(f"HA config: {config}")
|
|
result = self.client.publish(config_topic, json.dumps(config), retain=True)
|
|
|
|
def send_msg(self, msg):
|
|
log.debug(f"msg: {msg}")
|
|
if not self.client:
|
|
return
|
|
if not self._ads:
|
|
return
|
|
if msg.get('label') is None:
|
|
return
|
|
if msg.get('value') is None:
|
|
return
|
|
ts = time.time()
|
|
label = msg.get('label')
|
|
value = msg.get('value')
|
|
unit = msg.get('unit')
|
|
if self._last_ts.get(label):
|
|
if ts < self._last_ts[label]['ts'] + 60 and value == self._last_ts[label]['value']:
|
|
log.debug(f"skip msg {label} {value}")
|
|
return
|
|
self._last_ts[label] = { 'value': value, 'ts': ts }
|
|
topic = self.get_topic(label)
|
|
if not self._ha_config.get('label'):
|
|
self.send_ha_config(msg, topic)
|
|
_msg = { 'value': msg['value'] }
|
|
if msg.get('date'):
|
|
_msg['date'] = msg['date']
|
|
log.debug(f"topic: {topic} msg: {_msg}")
|
|
result = self.client.publish(topic, json.dumps(_msg))
|
|
|
|
|