tic2mqtt/tic.py

211 lines
6.6 KiB
Python
Raw Permalink Normal View History

2024-05-19 18:23:53 +02:00
import logging
import time
import tic_device
import tic_standard
import tic_hist
log = logging.getLogger(__name__)
TIC_STX = b'\x02'
TIC_ETX = b'\x03'
TIC_SGR = b'\x0a'
TIC_EGR = b'\x0d'
TIC_MODE_STD = "standard"
TIC_MODE_HIS = "historique"
type_str = {
'61': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute",
'62': "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse",
'63': "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse",
'64': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse",
'70': "Compteur monophasé Linky 60 A mise au point G3",
'71': "Compteur triphasé Linky 60 A mise au point G3",
'75': "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse",
'76': "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse",
}
manufacturer_str = {
"01": "CROUZET / MONETEL",
"02": "SAGEM / SAGEMCOM",
"03": "SCHLUMBERGER / ACTARIS / ITRON",
"04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
"05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER",
"06": "ITRON",
"07": "MAEC",
"08": "MATRA-CHAUVIN ARNOUX / ENERDIS",
"09": "FAURE-HERMAN",
"10": "SEVME / SIS",
"11": "MAGNOL / ELSTER / HONEYWELL",
"12": "GAZ THERMIQUE",
"14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE",
"15": "MECELEC",
"16": "LEGRAND / BACO",
"17": "SERD-SCHLUMBERGER",
"18": "SCHNEIDER / MERLIN GERIN / GARDY",
"19": "GENERAL ELECTRIC / POWER CONTROL",
"20": "NUOVO PIGNONE / DRESSER",
"21": "SCLE",
"22": "EDF",
"23": "GDF / GDF-SUEZ",
"24": "HAGER GENERAL ELECTRIC",
"25": "DELTA-DORE",
"26": "RIZ",
"27": "ISKRAEMECO",
"28": "GMT",
"29": "ANALOG DEVICE",
"30": "MICHAUD",
"31": "HEXING ELECTRICAL CO. Ltd",
"32": "SIAME",
"33": "LARSEN & TOUBRO Limited",
"34": "ELSTER / HONEYWELL",
"35": "ELECTRONIC AFZAR AZMA",
"36": "ADVANCED ELECTRONIC COMPAGNY Ldt",
"37": "AEM",
"38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt",
"39": "ZIV",
"70": "LANDIS et GYR (export ou régie)",
"71": "STEPPER ENERGIE France (export ou régie)",
"81": "SAGEM / SAGEMCOM",
"82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
"83": "ELSTER / HONEYWELL",
"84": "SAGEM / SAGEMCOM",
"85": "ITRON",
}
class Tic:
def __init__(self, input):
self.input = input
self.device = tic_device.TicDevice(input)
self.ads = None
self.device_manufacturer = None
self.device_year = None
self.device_model = None
self.device_serial = None
self.mode = None
self._tic = None
self._cb_ads = None
self._cb_msg = None
def set_mode(self, mode):
if mode == TIC_MODE_STD:
self.mode = TIC_MODE_STD
self._tic = tic_standard.TicStandard()
self.device.set_baudrate(9600)
log.info(f"set_mode: {self._tic.name()}")
elif mode == TIC_MODE_HIS:
self.mode = TIC_MODE_HIS
self._tic = tic_hist.TicHist()
self.device.set_baudrate(1200)
log.info(f"set_mode: {self._tic.name()}")
else:
self.mode = None
self._tic = None
log.info(f"set_mode: unsupported")
def detect_mode(self):
for mode in ( TIC_MODE_STD, TIC_MODE_HIS ):
self.set_mode(mode)
f = self.get_frame()
if not f:
continue
groups = self.split_frame(f)
for group in groups:
if not self.check_sum(group):
continue
self.decode_group(group)
if self.ads:
log.info(f"detected {self._tic.name()}")
return mode
log.error("no valid frame detected")
return None
def get_frame(self):
frame = b''
while True:
c = self.device.get_char()
if not c:
return None
if c == TIC_STX:
break
while True:
c = self.device.get_char()
if not c:
return None
if c == TIC_ETX:
break
frame = frame + c
return frame
def split_frame(self, frame):
groups = []
start = 0
while start <= len(frame):
start = frame.find(TIC_SGR, start)
if start < 0:
return groups
end = frame.find(TIC_EGR, start)
if end < 0:
return groups
groups.append(frame[start+1:end])
start = end + 1
return groups
def check_sum(self, group):
return self._tic.check_sum(group)
def decode_group(self, group):
msg = self._tic.decode_group(group)
log.debug(f"msg: {msg}")
if msg and msg.get('label') in ( 'ADSC', 'ADCO' ):
self.parse_ads(msg['value'])
return msg
def parse_ads(self, ads):
if self.ads != ads:
self.ads = ads
self.device_manufacturer = ma = manufacturer_str.get(ads[0:2], "unknown")
try:
self.device_year = 2000 + int(ads[2:4])
except ValueError:
self.device_year = None
self.device_model = type_str.get(ads[4:6], "unknown")
self.device_serial = ads[6:]
log.info(f"device: {self.ads}")
log.info(f"manufacturer: {self.device_manufacturer}")
log.info(f"year: {self.device_year}")
log.info(f"model: {self.device_model}")
log.info(f"serial number: {self.device_serial}")
if self._cb_ads:
self._cb_ads(self.ads, self.device_manufacturer, self.device_year, self.device_model, self.device_serial)
def set_callback_ads(self, function):
self._cb_ads = function
if self.ads:
ads = self.ads
self.ads = None
self.parse_ads(self.ads)
def set_callback_msg(self, function):
self._cb_msg = function
def run(self):
while True:
frame = self.get_frame()
if not frame:
return
groups = self.split_frame(frame)
for group in groups:
if not self.check_sum(group):
continue
msg = self.decode_group(group)
log.info(msg)
if msg and self._cb_msg:
self._cb_msg(msg)
#if not self.device.is_serial():
# time.sleep(1)