tic2mqtt/tic.py

211 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import time
import tic_device
import tic_standard
import tic_hist
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Frame delimiters: get_frame() collects the bytes found between STX and ETX.
TIC_STX = b"\x02"  # ASCII STX (start of text)
TIC_ETX = b"\x03"  # ASCII ETX (end of text)

# Group delimiters inside a frame: split_frame() extracts the bytes found
# between an SGR byte and the next EGR byte.
TIC_SGR = b"\n"  # 0x0a, line feed
TIC_EGR = b"\r"  # 0x0d, carriage return

# Mode identifiers accepted by Tic.set_mode().
TIC_MODE_STD = "standard"
TIC_MODE_HIS = "historique"
# Meter model descriptions, keyed by the two-character type code that
# Tic.parse_ads() reads from characters 4-5 of the meter address.
type_str = {
    "61": "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute",
    "62": "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse",
    "63": "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse",
    "64": "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse",
    "70": "Compteur monophasé Linky 60 A mise au point G3",
    "71": "Compteur triphasé Linky 60 A mise au point G3",
    "75": "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse",
    "76": "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse",
}
# Manufacturer names, keyed by the two-character manufacturer code that
# Tic.parse_ads() reads from characters 0-1 of the meter address.
manufacturer_str = {
    "01": "CROUZET / MONETEL",
    "02": "SAGEM / SAGEMCOM",
    "03": "SCHLUMBERGER / ACTARIS / ITRON",
    "04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER",
    "06": "ITRON",
    "07": "MAEC",
    "08": "MATRA-CHAUVIN ARNOUX / ENERDIS",
    "09": "FAURE-HERMAN",
    "10": "SEVME / SIS",
    "11": "MAGNOL / ELSTER / HONEYWELL",
    "12": "GAZ THERMIQUE",
    "14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE",
    "15": "MECELEC",
    "16": "LEGRAND / BACO",
    "17": "SERD-SCHLUMBERGER",
    "18": "SCHNEIDER / MERLIN GERIN / GARDY",
    "19": "GENERAL ELECTRIC / POWER CONTROL",
    "20": "NUOVO PIGNONE / DRESSER",
    "21": "SCLE",
    "22": "EDF",
    "23": "GDF / GDF-SUEZ",
    "24": "HAGER GENERAL ELECTRIC",
    "25": "DELTA-DORE",
    "26": "RIZ",
    "27": "ISKRAEMECO",
    "28": "GMT",
    "29": "ANALOG DEVICE",
    "30": "MICHAUD",
    "31": "HEXING ELECTRICAL CO. Ltd",
    "32": "SIAME",
    "33": "LARSEN & TOUBRO Limited",
    "34": "ELSTER / HONEYWELL",
    "35": "ELECTRONIC AFZAR AZMA",
    "36": "ADVANCED ELECTRONIC COMPAGNY Ldt",
    "37": "AEM",
    "38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt",
    "39": "ZIV",
    "70": "LANDIS et GYR (export ou régie)",
    "71": "STEPPER ENERGIE France (export ou régie)",
    "81": "SAGEM / SAGEMCOM",
    "82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "83": "ELSTER / HONEYWELL",
    "84": "SAGEM / SAGEMCOM",
    "85": "ITRON",
}
class Tic:
    """Read TIC frames from a device, decode their groups and dispatch them.

    Framing (STX/ETX, group delimiters), meter identification and callbacks
    are handled here. Checksum validation and group decoding are delegated to
    the protocol helper selected by :meth:`set_mode`
    (``tic_standard.TicStandard`` or ``tic_hist.TicHist``).
    """

    def __init__(self, input):
        """Create a reader on top of ``tic_device.TicDevice(input)``.

        Args:
            input: Passed unchanged to ``tic_device.TicDevice``.
        """
        self.input = input
        self.device = tic_device.TicDevice(input)
        # Meter identification; populated by parse_ads().
        self.ads = None
        self.device_manufacturer = None
        self.device_year = None
        self.device_model = None
        self.device_serial = None
        # Current mode name and its protocol helper; set by set_mode().
        self.mode = None
        self._tic = None
        # Optional user callbacks.
        self._cb_ads = None
        self._cb_msg = None

    def set_mode(self, mode):
        """Select the protocol helper and the device baudrate for ``mode``.

        ``TIC_MODE_STD`` selects ``TicStandard`` at 9600 baud and
        ``TIC_MODE_HIS`` selects ``TicHist`` at 1200 baud. Any other value
        clears the current mode and helper.
        """
        if mode == TIC_MODE_STD:
            self.mode = TIC_MODE_STD
            self._tic = tic_standard.TicStandard()
            self.device.set_baudrate(9600)
        elif mode == TIC_MODE_HIS:
            self.mode = TIC_MODE_HIS
            self._tic = tic_hist.TicHist()
            self.device.set_baudrate(1200)
        else:
            self.mode = None
            self._tic = None
            log.info("set_mode: unsupported")
            return
        log.info("set_mode: %s", self._tic.name())

    def detect_mode(self):
        """Try each supported mode until a frame yields a meter address.

        For each mode, one frame is read and every group with a valid
        checksum is decoded; decoding an ADSC/ADCO group sets ``self.ads``.

        Returns:
            The detected mode name, or ``None`` when no mode produced a
            meter address.
        """
        for mode in (TIC_MODE_STD, TIC_MODE_HIS):
            self.set_mode(mode)
            frame = self.get_frame()
            if not frame:
                continue
            for group in self.split_frame(frame):
                if self.check_sum(group):
                    self.decode_group(group)
            if self.ads:
                log.info("detected %s", self._tic.name())
                return mode
        log.error("no valid frame detected")
        return None

    def get_frame(self):
        """Read one frame from the device.

        Bytes are discarded until ``TIC_STX`` is seen, then collected until
        ``TIC_ETX``.

        Returns:
            The bytes between STX and ETX (delimiters excluded), or ``None``
            if the device returned a falsy value before the frame completed.
        """
        get_char = self.device.get_char

        # Skip everything up to and including the start-of-frame marker.
        while True:
            c = get_char()
            if not c:
                return None
            if c == TIC_STX:
                break

        # Accumulate in a bytearray: repeated bytes concatenation copies the
        # whole frame for every received character.
        payload = bytearray()
        while True:
            c = get_char()
            if not c:
                return None
            if c == TIC_ETX:
                return bytes(payload)
            payload += c

    def split_frame(self, frame):
        """Split a frame into its groups.

        A group is the data found between a ``TIC_SGR`` byte and the next
        ``TIC_EGR`` byte; a trailing unterminated group is dropped.

        Returns:
            A list of ``bytes`` objects, delimiters excluded.
        """
        groups = []
        start = 0
        while start <= len(frame):
            start = frame.find(TIC_SGR, start)
            if start < 0:
                return groups
            end = frame.find(TIC_EGR, start)
            if end < 0:
                return groups
            groups.append(frame[start + 1:end])
            start = end + 1
        return groups

    def check_sum(self, group):
        """Return the current protocol helper's checksum verdict for ``group``."""
        return self._tic.check_sum(group)

    def decode_group(self, group):
        """Decode one group with the current protocol helper.

        When the decoded message carries the label ``ADSC`` or ``ADCO``, its
        value is forwarded to :meth:`parse_ads`.

        Returns:
            Whatever the helper's ``decode_group`` returned.
        """
        msg = self._tic.decode_group(group)
        log.debug("msg: %s", msg)
        if msg and msg.get('label') in ('ADSC', 'ADCO'):
            self.parse_ads(msg['value'])
        return msg

    def parse_ads(self, ads):
        """Record the meter address and derive the device details from it.

        Nothing happens when ``ads`` equals the address already stored.
        Otherwise the address is split as: characters 0-1 manufacturer code,
        2-3 two-digit year (offset by 2000), 4-5 meter type code, 6 onwards
        the serial number. Unknown codes map to ``"unknown"`` and a
        non-numeric year to ``None``. The ADS callback, if any, is then
        invoked with the address and the four derived fields.
        """
        if self.ads == ads:
            return
        self.ads = ads
        self.device_manufacturer = manufacturer_str.get(ads[0:2], "unknown")
        try:
            self.device_year = 2000 + int(ads[2:4])
        except ValueError:
            self.device_year = None
        self.device_model = type_str.get(ads[4:6], "unknown")
        self.device_serial = ads[6:]
        log.info("device: %s", self.ads)
        log.info("manufacturer: %s", self.device_manufacturer)
        log.info("year: %s", self.device_year)
        log.info("model: %s", self.device_model)
        log.info("serial number: %s", self.device_serial)
        if self._cb_ads:
            self._cb_ads(
                self.ads,
                self.device_manufacturer,
                self.device_year,
                self.device_model,
                self.device_serial,
            )

    def set_callback_ads(self, function):
        """Register the callback invoked when the meter address changes.

        If an address is already known, it is replayed immediately so the new
        callback receives the current device details.
        """
        self._cb_ads = function
        if self.ads:
            ads = self.ads
            # parse_ads() ignores an unchanged address, so forget the stored
            # one first and re-parse the saved value (not the cleared field).
            self.ads = None
            self.parse_ads(ads)

    def set_callback_msg(self, function):
        """Register the callback invoked by run() for each decoded message."""
        self._cb_msg = function

    def run(self):
        """Read and dispatch frames until the device stops delivering data.

        Every group with a valid checksum is decoded, logged and, when the
        decoded message is truthy, passed to the message callback. The loop
        ends when :meth:`get_frame` returns ``None``.
        """
        while True:
            frame = self.get_frame()
            # Only None signals end of input; an empty frame (STX directly
            # followed by ETX) simply contains no groups.
            if frame is None:
                return
            for group in self.split_frame(frame):
                if not self.check_sum(group):
                    continue
                msg = self.decode_group(group)
                log.info(msg)
                if msg and self._cb_msg:
                    self._cb_msg(msg)
            # NOTE: a 1 s pause for non-serial devices
            # (``not self.device.is_serial()``) was disabled here.