"""Frame reader and dispatcher for a Linky meter "TIC" byte stream.

The stream is made of frames delimited by STX/ETX; each frame holds groups
delimited by LF/CR.  Checksum verification and group decoding are delegated
to the mode-specific helpers (``tic_standard`` / ``tic_hist``).
"""

import logging
import time

import tic_device
import tic_standard
import tic_hist

log = logging.getLogger(__name__)

# Frame delimiters.
TIC_STX = b'\x02'
TIC_ETX = b'\x03'
# Group delimiters (start / end of one group inside a frame).
TIC_SGR = b'\x0a'
TIC_EGR = b'\x0d'

TIC_MODE_STD = "standard"
TIC_MODE_HIS = "historique"

# Meter type, keyed by characters 4-5 of the meter address (see parse_ads).
type_str = {
    '61': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute",
    '62': "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse",
    '63': "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse",
    '64': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse",
    '70': "Compteur monophasé Linky 60 A mise au point G3",
    '71': "Compteur triphasé Linky 60 A mise au point G3",
    '75': "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse",
    '76': "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse",
}

# Manufacturer, keyed by characters 0-1 of the meter address (see parse_ads).
manufacturer_str = {
    "01": "CROUZET / MONETEL",
    "02": "SAGEM / SAGEMCOM",
    "03": "SCHLUMBERGER / ACTARIS / ITRON",
    "04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER",
    "06": "ITRON",
    "07": "MAEC",
    "08": "MATRA-CHAUVIN ARNOUX / ENERDIS",
    "09": "FAURE-HERMAN",
    "10": "SEVME / SIS",
    "11": "MAGNOL / ELSTER / HONEYWELL",
    "12": "GAZ THERMIQUE",
    "14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE",
    "15": "MECELEC",
    "16": "LEGRAND / BACO",
    "17": "SERD-SCHLUMBERGER",
    "18": "SCHNEIDER / MERLIN GERIN / GARDY",
    "19": "GENERAL ELECTRIC / POWER CONTROL",
    "20": "NUOVO PIGNONE / DRESSER",
    "21": "SCLE",
    "22": "EDF",
    "23": "GDF / GDF-SUEZ",
    "24": "HAGER – GENERAL ELECTRIC",
    "25": "DELTA-DORE",
    "26": "RIZ",
    "27": "ISKRAEMECO",
    "28": "GMT",
    "29": "ANALOG DEVICE",
    "30": "MICHAUD",
    "31": "HEXING ELECTRICAL CO. Ltd",
    "32": "SIAME",
    "33": "LARSEN & TOUBRO Limited",
    "34": "ELSTER / HONEYWELL",
    "35": "ELECTRONIC AFZAR AZMA",
    "36": "ADVANCED ELECTRONIC COMPAGNY Ldt",
    "37": "AEM",
    "38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt",
    "39": "ZIV",
    "70": "LANDIS et GYR (export ou régie)",
    "71": "STEPPER ENERGIE France (export ou régie)",
    "81": "SAGEM / SAGEMCOM",
    "82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
    "83": "ELSTER / HONEYWELL",
    "84": "SAGEM / SAGEMCOM",
    "85": "ITRON",
}


class Tic:
    """Read TIC frames from a device, decode their groups and notify callbacks.

    Attributes set from the meter address once it has been seen:
    ``ads``, ``device_manufacturer``, ``device_year``, ``device_model`` and
    ``device_serial``.  ``mode`` holds the active TIC mode or ``None``.
    """

    def __init__(self, input):
        """Wrap *input* in a ``tic_device.TicDevice``; no mode is selected yet."""
        self.input = input
        self.device = tic_device.TicDevice(input)
        self.ads = None
        self.device_manufacturer = None
        self.device_year = None
        self.device_model = None
        self.device_serial = None
        self.mode = None
        self._tic = None      # mode-specific decoder (TicStandard / TicHist)
        self._cb_ads = None   # called when the meter address changes
        self._cb_msg = None   # called for every decoded message in run()

    def set_mode(self, mode):
        """Select the decoder and baud rate for *mode*.

        ``TIC_MODE_STD`` uses 9600 baud, ``TIC_MODE_HIS`` uses 1200 baud.
        Any other value clears the current mode and decoder; the device
        baud rate is left untouched in that case.
        """
        if mode == TIC_MODE_STD:
            self.mode = TIC_MODE_STD
            self._tic = tic_standard.TicStandard()
            self.device.set_baudrate(9600)
            log.info(f"set_mode: {self._tic.name()}")
        elif mode == TIC_MODE_HIS:
            self.mode = TIC_MODE_HIS
            self._tic = tic_hist.TicHist()
            self.device.set_baudrate(1200)
            log.info(f"set_mode: {self._tic.name()}")
        else:
            self.mode = None
            self._tic = None
            log.info("set_mode: unsupported")

    def detect_mode(self):
        """Try each supported mode in turn and return the first that works.

        A mode is accepted once one frame has been read with it and a meter
        address (``self.ads``) is known after decoding that frame's valid
        groups.  Returns the mode constant, or ``None`` when no mode matched
        (the last mode tried stays selected).
        """
        for mode in (TIC_MODE_STD, TIC_MODE_HIS):
            self.set_mode(mode)
            frame = self.get_frame()
            if not frame:
                continue
            for group in self.split_frame(frame):
                if not self.check_sum(group):
                    continue
                self.decode_group(group)
            if self.ads:
                log.info(f"detected {self._tic.name()}")
                return mode
        log.error("no valid frame detected")
        return None

    def get_frame(self):
        """Return the payload of the next frame, without its STX/ETX markers.

        Bytes before the first STX are discarded.  Returns ``None`` as soon
        as the device yields a falsy value (no more data), even if a frame
        was partially read.  An empty frame is returned as ``b''``.
        """
        # Skip everything up to and including the start-of-frame marker.
        while True:
            c = self.device.get_char()
            if not c:
                return None
            if c == TIC_STX:
                break

        # Accumulate in a bytearray: repeated bytes concatenation is quadratic.
        frame = bytearray()
        while True:
            c = self.device.get_char()
            if not c:
                return None
            if c == TIC_ETX:
                break
            frame += c
        return bytes(frame)

    def split_frame(self, frame):
        """Return the list of groups found in *frame*.

        A group is the byte string between a ``TIC_SGR`` and the next
        ``TIC_EGR``, delimiters excluded.  A trailing group with no end
        delimiter is dropped.
        """
        groups = []
        pos = 0
        while True:
            start = frame.find(TIC_SGR, pos)
            if start < 0:
                return groups
            end = frame.find(TIC_EGR, start)
            if end < 0:
                return groups
            groups.append(frame[start + 1:end])
            pos = end + 1

    def check_sum(self, group):
        """Delegate checksum verification of *group* to the active decoder."""
        return self._tic.check_sum(group)

    def decode_group(self, group):
        """Decode *group* with the active decoder and return its message.

        When the message label is ``ADSC`` or ``ADCO`` its value is also
        fed to :meth:`parse_ads`.
        """
        msg = self._tic.decode_group(group)
        log.debug(f"msg: {msg}")
        if msg and msg.get('label') in ('ADSC', 'ADCO'):
            self.parse_ads(msg['value'])
        return msg

    def parse_ads(self, ads):
        """Record a new meter address and derive the device details from it.

        Nothing happens when *ads* equals the address already stored.
        Otherwise the address is sliced as: ``[0:2]`` manufacturer code,
        ``[2:4]`` two-digit year (added to 2000), ``[4:6]`` meter type code,
        ``[6:]`` serial number.  Unknown codes map to ``"unknown"`` and a
        non-numeric year to ``None``.  The ADS callback, if any, is then
        invoked with ``(ads, manufacturer, year, model, serial)``.
        """
        if self.ads == ads:
            return

        self.ads = ads
        self.device_manufacturer = manufacturer_str.get(ads[0:2], "unknown")
        try:
            self.device_year = 2000 + int(ads[2:4])
        except ValueError:
            self.device_year = None
        self.device_model = type_str.get(ads[4:6], "unknown")
        self.device_serial = ads[6:]

        log.info(f"device: {self.ads}")
        log.info(f"manufacturer: {self.device_manufacturer}")
        log.info(f"year: {self.device_year}")
        log.info(f"model: {self.device_model}")
        log.info(f"serial number: {self.device_serial}")

        if self._cb_ads:
            self._cb_ads(self.ads,
                         self.device_manufacturer,
                         self.device_year,
                         self.device_model,
                         self.device_serial)

    def set_callback_ads(self, function):
        """Register *function* as the meter-address callback.

        If an address is already known, the callback is invoked immediately
        with the current device details.
        """
        self._cb_ads = function
        if self.ads:
            ads = self.ads
            # parse_ads ignores an unchanged address, so clear the stored one
            # and replay the saved value to trigger the new callback.
            self.ads = None
            self.parse_ads(ads)

    def set_callback_msg(self, function):
        """Register *function* to receive every decoded message in run()."""
        self._cb_msg = function

    def run(self):
        """Read and dispatch frames until the device has no more data.

        Groups failing the checksum are skipped.  Every decoded message is
        logged, and truthy messages are passed to the message callback.
        """
        while True:
            frame = self.get_frame()
            # Only None means the input is exhausted; an empty frame (b'')
            # simply has no groups and must not stop the loop.
            if frame is None:
                return
            for group in self.split_frame(frame):
                if not self.check_sum(group):
                    continue
                msg = self.decode_group(group)
                log.info(msg)
                if msg and self._cb_msg:
                    self._cb_msg(msg)
            #if not self.device.is_serial():
            #    time.sleep(1)