tic2mqtt/tic.py
2024-05-26 15:42:50 +02:00

200 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import tic_device
import tic_standard
import tic_hist
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Single-byte delimiters matched against the characters read from the device.
TIC_STX = b"\x02"  # opens a frame (see Tic.get_frame)
TIC_ETX = b"\x03"  # closes a frame (see Tic.get_frame)
TIC_SGR = b"\x0a"  # opens a group inside a frame (see Tic.split_frame)
TIC_EGR = b"\x0d"  # closes a group inside a frame (see Tic.split_frame)

# Values stored in Tic.mode once Tic._detect_mode has recognised the stream.
TIC_MODE_STD = 1  # decoded with tic_standard.TicStandard
TIC_MODE_HIS = 2  # decoded with tic_hist.TicHist
# Human-readable meter type descriptions, keyed by a two-character type code.
# Tic.parse_ads looks the code up from characters [4:6] of the meter address
# and falls back to "unknown" when the code is not listed here.
type_str = {
    "61": "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute",
    "62": "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse",
    "63": "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse",
    "64": "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse",
    "70": "Compteur monophasé Linky 60 A mise au point G3",
    "71": "Compteur triphasé Linky 60 A mise au point G3",
    "75": "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse",
    "76": "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse",
}
# Manufacturer names, keyed by a two-character manufacturer code.
# Tic.parse_ads looks the code up from characters [0:2] of the meter address
# and falls back to "unknown" when the code is not listed here.
manufacturer_str = {
    "01": "CROUZET / MONETEL",
    "02": "SAGEM / SAGEMCOM",
    "03": "SCHLUMBERGER / ACTARIS / ITRON",
    "04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER",
    "06": "ITRON",
    "07": "MAEC",
    "08": "MATRA-CHAUVIN ARNOUX / ENERDIS",
    "09": "FAURE-HERMAN",
    "10": "SEVME / SIS",
    "11": "MAGNOL / ELSTER / HONEYWELL",
    "12": "GAZ THERMIQUE",
    "14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE",
    "15": "MECELEC",
    "16": "LEGRAND / BACO",
    "17": "SERD-SCHLUMBERGER",
    "18": "SCHNEIDER / MERLIN GERIN / GARDY",
    "19": "GENERAL ELECTRIC / POWER CONTROL",
    "20": "NUOVO PIGNONE / DRESSER",
    "21": "SCLE",
    "22": "EDF",
    "23": "GDF / GDF-SUEZ",
    "24": "HAGER GENERAL ELECTRIC",
    "25": "DELTA-DORE",
    "26": "RIZ",
    "27": "ISKRAEMECO",
    "28": "GMT",
    "29": "ANALOG DEVICE",
    "30": "MICHAUD",
    "31": "HEXING ELECTRICAL CO. Ltd",
    "32": "SIAME",
    "33": "LARSEN & TOUBRO Limited",
    "34": "ELSTER / HONEYWELL",
    "35": "ELECTRONIC AFZAR AZMA",
    "36": "ADVANCED ELECTRONIC COMPAGNY Ldt",
    "37": "AEM",
    "38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt",
    "39": "ZIV",

    "70": "LANDIS et GYR (export ou régie)",
    "71": "STEPPER ENERGIE France (export ou régie)",

    "81": "SAGEM / SAGEMCOM",
    "82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "83": "ELSTER / HONEYWELL",
    "84": "SAGEM / SAGEMCOM",
    "85": "ITRON",
}
class Tic:
    """Reader and decoder for a TIC byte stream coming from a meter.

    The class wraps a ``tic_device.TicDevice`` built from *input*, detects
    which of the two supported protocol modes the stream uses, extracts
    frames (``TIC_STX`` ... ``TIC_ETX``), splits them into groups
    (``TIC_SGR`` ... ``TIC_EGR``) and delegates checksum verification and
    group decoding to the mode-specific decoder
    (``tic_standard.TicStandard`` or ``tic_hist.TicHist``).
    """

    def __init__(self, input):
        """Create the underlying device for *input*; no data is read yet.

        Args:
            input: Passed unchanged to ``tic_device.TicDevice``.
        """
        self.input = input
        self.device = tic_device.TicDevice(input)
        # Last meter address seen, and the fields parse_ads() derives from it.
        self.ads = None
        self.device_manufacturer = None
        self.device_year = None
        self.device_model = None
        self.device_serial = None
        # Set by _detect_mode(): TIC_MODE_STD / TIC_MODE_HIS and the decoder.
        self.mode = None
        self._tic = None

    def detect_mode(self):
        """Detect the protocol mode, retrying at 1200 bps on serial devices.

        Returns:
            The detected mode (``TIC_MODE_STD`` or ``TIC_MODE_HIS``), which is
            also stored in ``self.mode``, or ``None`` when no protocol could
            be recognised.
        """
        mode = self._detect_mode()
        if mode:
            return mode
        if self.device.is_serial():
            log.debug("tring speed 1200bps")
            self.device.set_baudrate(1200)
            mode = self._detect_mode()
            if mode:
                return mode
        log.error("no TIC protocol detected")
        return None

    def _detect_mode(self):
        """Scan incoming characters until a protocol mode is recognised.

        Once a ``TIC_STX`` has been seen, every recognised delimiter sets one
        bit in a mask. The mode is decided as soon as the mask is complete:
        all four framing bytes plus ``0x09`` means standard mode, the four
        framing bytes without ``0x09`` means historic mode.

        A character that is neither a known delimiter nor within
        ``0x20``-``0x7f`` resets the mask; after 20 such resets the scan is
        abandoned. The scan also ends when the device returns a falsy value.

        Returns:
            ``TIC_MODE_STD`` or ``TIC_MODE_HIS`` on success (``self.mode`` and
            ``self._tic`` are set accordingly), otherwise ``None``.
        """
        std_mask = 0x1f  # STX | ETX | SGR | EGR | 0x09
        his_mask = 0x0f  # STX | ETX | SGR | EGR
        max_resets = 20
        tic_map = {
            TIC_STX: 0x01,
            TIC_ETX: 0x02,
            TIC_SGR: 0x04,
            TIC_EGR: 0x08,
            b'\x09': 0x10,  # TODO
        }
        ok_mask = 0x00
        nb_reset = 0
        # assumes get_char() yields one-byte ``bytes`` objects and a falsy
        # value on timeout/EOF -- TODO confirm against tic_device
        while c := self.device.get_char():
            ok = tic_map.get(c, 0)
            # Nothing is accumulated until the first STX has been seen.
            if ok_mask or ok == 0x01:
                ok_mask |= ok
                if ok_mask == std_mask:
                    log.info("Standard mode detected")
                    self.mode = TIC_MODE_STD
                    self._tic = tic_standard.TicStandard()
                    return self.mode
                if ok_mask == his_mask:
                    log.info("Historiqual mode detected")
                    self.mode = TIC_MODE_HIS
                    self._tic = tic_hist.TicHist()
                    return self.mode
            if not ok and (c < b' ' or c > b'\x7f'):
                log.debug("bad char: %s", c)
                ok_mask = 0x00
                nb_reset += 1
                if nb_reset >= max_resets:
                    break
        log.debug("Timeout/EOF")
        return None

    def get_frame(self):
        """Read the next frame from the device.

        Characters are discarded until ``TIC_STX`` is seen, then collected
        until ``TIC_ETX``.

        Returns:
            The bytes between the two delimiters (delimiters excluded, possibly
            empty), or ``None`` if the device returned a falsy value before a
            complete frame was read.
        """
        get_char = self.device.get_char

        # Skip everything that precedes the start-of-frame marker.
        while True:
            c = get_char()
            if not c:
                return None
            if c == TIC_STX:
                break

        # Accumulate in a bytearray: repeated ``bytes + bytes`` is quadratic.
        frame = bytearray()
        while True:
            c = get_char()
            if not c:
                return None
            if c == TIC_ETX:
                return bytes(frame)
            frame += c

    def split_frame(self, frame):
        """Split *frame* into its groups.

        Args:
            frame: Frame content as returned by ``get_frame``.

        Returns:
            A list with the bytes found between each ``TIC_SGR`` and the next
            ``TIC_EGR`` (delimiters excluded). A trailing group with no
            ``TIC_EGR`` is dropped.
        """
        groups = []
        start = 0
        while True:
            start = frame.find(TIC_SGR, start)
            if start < 0:
                return groups
            end = frame.find(TIC_EGR, start)
            if end < 0:
                return groups
            groups.append(frame[start + 1:end])
            start = end + 1

    def check_sum(self, group):
        """Return the mode-specific decoder's checksum verdict for *group*."""
        return self._tic.check_sum(group)

    def decode_group(self, group):
        """Decode *group* with the mode-specific decoder.

        When the decoded label is ``'ADSC'`` its value is also passed to
        ``parse_ads`` to refresh the device information.

        Returns:
            Whatever the decoder returns; it is indexed with ``'label'`` and
            ``'value'`` here.
        """
        msg = self._tic.decode_group(group)
        # NOTE(review): only the 'ADSC' label triggers parsing -- confirm
        # whether historic-mode frames carry the meter address under another
        # label that should be handled as well.
        if msg['label'] == 'ADSC':
            self.parse_ads(msg['value'])
        return msg

    def parse_ads(self, ads):
        """Derive and log device information from the meter address *ads*.

        Nothing happens when *ads* equals the address already stored.
        Otherwise the address is sliced as follows:

        * ``ads[0:2]``: manufacturer code, looked up in ``manufacturer_str``
        * ``ads[2:4]``: two-digit year, added to 2000 (``None`` if not numeric)
        * ``ads[4:6]``: type code, looked up in ``type_str``
        * ``ads[6:]``: serial number
        """
        if self.ads == ads:
            return
        self.ads = ads
        self.device_manufacturer = manufacturer_str.get(ads[0:2], "unknown")
        try:
            self.device_year = 2000 + int(ads[2:4])
        except ValueError:
            self.device_year = None
        self.device_model = type_str.get(ads[4:6], "unknown")
        self.device_serial = ads[6:]
        log.info("device: %s", self.ads)
        log.info("manufacturer: %s", self.device_manufacturer)
        log.info("year: %s", self.device_year)
        log.info("model: %s", self.device_model)
        log.info("serial number: %s", self.device_serial)

    def run(self):
        """Read frames until the device is exhausted, logging decoded groups.

        Groups whose checksum is rejected are skipped. The loop ends when
        ``get_frame`` returns ``None``; an empty frame is not treated as end
        of input.
        """
        while True:
            frame = self.get_frame()
            # Only None means timeout/EOF; b'' is a valid (empty) frame.
            if frame is None:
                return
            for group in self.split_frame(frame):
                if not self.check_sum(group):
                    continue
                msg = self.decode_group(group)
                log.info(msg)