From 9f810d28b0624ba40ef9e35036daa4390ca707ab Mon Sep 17 00:00:00 2001 From: Gilles Grandou Date: Sun, 26 May 2024 15:42:50 +0200 Subject: [PATCH] wip --- tic.py | 199 +++++++++++++++++++++++++++++++++ tic2mqtt | 291 +++--------------------------------------------- tic_device.py | 41 +++++++ tic_hist.py | 65 +++++++++++ tic_mqtt.py | 19 ++++ tic_standard.py | 153 +++++++++++++++++++++++++ 6 files changed, 491 insertions(+), 277 deletions(-) create mode 100644 tic.py create mode 100644 tic_device.py create mode 100644 tic_hist.py create mode 100644 tic_mqtt.py create mode 100644 tic_standard.py diff --git a/tic.py b/tic.py new file mode 100644 index 0000000..25a00b4 --- /dev/null +++ b/tic.py @@ -0,0 +1,199 @@ +import logging + +import tic_device +import tic_standard +import tic_hist + +log = logging.getLogger(__name__) + +TIC_STX = b'\x02' +TIC_ETX = b'\x03' +TIC_SGR = b'\x0a' +TIC_EGR = b'\x0d' + +TIC_MODE_STD = 1 +TIC_MODE_HIS = 2 + +type_str = { + '61': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute", + '62': "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse", + '63': "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse", + '64': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse", + '70': "Compteur monophasé Linky 60 A mise au point G3", + '71': "Compteur triphasé Linky 60 A mise au point G3", + '75': "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse", + '76': "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse", +} + +manufacturer_str = { + "01": "CROUZET / MONETEL", + "02": "SAGEM / SAGEMCOM", + "03": "SCHLUMBERGER / ACTARIS / ITRON", + "04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR", + "05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER", + "06": "ITRON", + "07": "MAEC", + "08": "MATRA-CHAUVIN ARNOUX / ENERDIS", + "09": "FAURE-HERMAN", + "10": "SEVME / SIS", + "11": "MAGNOL / ELSTER / HONEYWELL", + "12": "GAZ THERMIQUE", + "14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE", + "15": "MECELEC", + "16": "LEGRAND / BACO", + "17": "SERD-SCHLUMBERGER", + "18": "SCHNEIDER / MERLIN GERIN / GARDY", + "19": "GENERAL ELECTRIC / POWER CONTROL", + "20": "NUOVO PIGNONE / DRESSER", + "21": "SCLE", + "22": "EDF", + "23": "GDF / GDF-SUEZ", + "24": "HAGER – GENERAL ELECTRIC", + "25": "DELTA-DORE", + "26": "RIZ", + "27": "ISKRAEMECO", + "28": "GMT", + "29": "ANALOG DEVICE", + "30": "MICHAUD", + "31": "HEXING ELECTRICAL CO. Ltd", + "32": "SIAME", + "33": "LARSEN & TOUBRO Limited", + "34": "ELSTER / HONEYWELL", + "35": "ELECTRONIC AFZAR AZMA", + "36": "ADVANCED ELECTRONIC COMPAGNY Ldt", + "37": "AEM", + "38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt", + "39": "ZIV", + "70": "LANDIS et GYR (export ou régie)", + "71": "STEPPER ENERGIE France (export ou régie)", + "81": "SAGEM / SAGEMCOM", + "82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR", + "83": "ELSTER / HONEYWELL", + "84": "SAGEM / SAGEMCOM", + "85": "ITRON", +} + + +class Tic: + def __init__(self, input): + self.input = input + self.device = tic_device.TicDevice(input) + self.ads = None + self.device_manufacturer = None + self.device_year = None + self.device_model = None + self.device_serial = None + self.mode = None + self._tic = None + + def detect_mode(self): + if self._detect_mode(): + return + if self.device.is_serial(): + log.debug("tring speed 1200bps") + self.device.set_baudrate(1200) + if self._detect_mode(): + return + log.error("no TIC protocol detected") + return None + + def _detect_mode(self): + ok_mask = 0x00 + tic_map = { TIC_STX: 0x01, + TIC_ETX: 0x02, + TIC_SGR: 0x04, + TIC_EGR: 0x08, + b'\x09': 0x10 } # TODO + + nb_reset = 0 + while c := self.device.get_char(): + ok = tic_map.get(c, 0) + if ok_mask or ok == 0x01: + ok_mask = ok_mask | ok + if ok_mask == 0x1f: + log.info(f"Standard mode detected") + self.mode = TIC_MODE_STD + self._tic = tic_standard.TicStandard() + return self.mode + if ok_mask == 0x0f: + log.info(f"Historiqual mode detected") + self.mode = TIC_MODE_HIS + self._tic = tic_hist.TicHist() + return self.mode + if not ok and (c < b' ' or c > b'\x7f'): + log.debug(f"bad char: {c}") + ok_mask = 0x00 + nb_reset = nb_reset + 1 + if nb_reset >= 20: + break + log.debug("Timeout/EOF") + return None + + def get_frame(self): + frame = b'' + while True: + c = self.device.get_char() + if not c: + return None + if c == TIC_STX: + break + while True: + c = self.device.get_char() + if not c: + return None + if c == TIC_ETX: + break + frame = frame + c + return frame + + def split_frame(self, frame): + groups = [] + start = 0 + while start <= len(frame): + start = frame.find(TIC_SGR, start) + if start < 0: + return groups + end = frame.find(TIC_EGR, start) + if end < 0: + return groups + groups.append(frame[start+1:end]) + start = end + 1 + return groups + + def check_sum(self, group): + return self._tic.check_sum(group) + + def decode_group(self, group): + msg = self._tic.decode_group(group) + if msg['label'] == 'ADSC': + self.parse_ads(msg['value']) + return msg + + def parse_ads(self, ads): + if self.ads != ads: + self.ads = ads + self.device_manufacturer = ma = manufacturer_str.get(ads[0:2], "unknown") + try: + self.device_year = 2000 + int(ads[2:4]) + except ValueError: + self.device_year = None + self.device_model = type_str.get(ads[4:6], "unknown") + self.device_serial = ads[6:] + log.info(f"device: {self.ads}") + log.info(f"manufacturer: {self.device_manufacturer}") + log.info(f"year: {self.device_year}") + log.info(f"model: {self.device_model}") + log.info(f"serial number: {self.device_serial}") + + def run(self): + while True: + frame = self.get_frame() + if not frame: + return + groups = self.split_frame(frame) + for group in groups: + if not self.check_sum(group): + continue + msg = self.decode_group(group) + log.info(msg) + diff --git a/tic2mqtt b/tic2mqtt index 5af1fc9..93c3a22 100755 --- a/tic2mqtt +++ b/tic2mqtt @@ -1,284 +1,13 @@ #!/usr/bin/env python3 -import sys import argparse -import time -import serial import logging -import datetime -import re -from pprint import pprint -TIC_MODE_STD = 1 -TIC_MODE_HIS = 2 - -TIC_STX = b'\x02' -TIC_ETX = b'\x03' -TIC_SGR = b'\x0a' -TIC_SEP_STD = b'\x09' -TIC_SEP_HIS = b'\x20' -TIC_EGR = b'\x0d' +import tic +import tic_mqtt log = logging.getLogger(__name__) -tic_msg_std = { - 'ADSC': { 'len': 12, 'value': 'adsc' }, - 'VTIC': { 'len': 2, 'value': 'num' }, - 'DATE': { 'len': 0 }, - 'NGTF': { 'len': 16 }, - 'LTARF': { 'len': 16 }, - 'EAST': { 'len': 9, 'value': 'num' }, - 'EASF01': { 'len': 9, 'value': 'num' }, - 'EASF02': { 'len': 9, 'value': 'num' }, - 'EASF03': { 'len': 9, 'value': 'num' }, - 'EASF04': { 'len': 9, 'value': 'num' }, - 'EASF05': { 'len': 9, 'value': 'num' }, - 'EASF06': { 'len': 9, 'value': 'num' }, - 'EASF07': { 'len': 9, 'value': 'num' }, - 'EASF08': { 'len': 9, 'value': 'num' }, - 'EASF09': { 'len': 9, 'value': 'num' }, - 'EASF10': { 'len': 9, 'value': 'num' }, - 'EASD01': { 'len': 9, 'value': 'num' }, - 'EASD02': { 'len': 9, 'value': 'num' }, - 'EASD03': { 'len': 9, 'value': 'num' }, - 'EASD04': { 'len': 9, 'value': 'num' }, - 'EAIT': { 'len': 9, 'value': 'num' }, - 'ERQ1': { 'len': 9, 'value': 'num' }, - 'ERQ2': { 'len': 9, 'value': 'num' }, - 'ERQ3': { 'len': 9, 'value': 'num' }, - 'ERQ4': { 'len': 9, 'value': 'num' }, - 'IRMS1': { 'len': 3, 'value': 'num' }, - 'IRMS2': { 'len': 3, 'value': 'num' }, - 'IRMS3': { 'len': 3, 'value': 'num' }, - 'URMS1': { 'len': 3, 'value': 'num' }, - 'URMS2': { 'len': 3, 'value': 'num' }, - 'URMS3': { 'len': 3, 'value': 'num' }, - 'PREF': { 'len': 2, 'value': 'num' }, - 'PCOUP': { 'len': 2, 'value': 'num' }, - 'SINSTS': { 'len': 5, 'value': 'num' }, - 'SINST1': { 'len': 5, 'value': 'num' }, - 'SINST2': { 'len': 5, 'value': 'num' }, - 'SINST3': { 'len': 5, 'value': 'num' }, - 'SMAXSN': { 'len': 5, 'value': 'num' }, - 'SMAXSN1': { 'len': 5, 'value': 'num' }, - 'SMAXSN2': { 'len': 5, 'value': 'num' }, - 'SMAXSN3': { 'len': 5, 'value': 'num' }, - 'SMAXSN-1': { 'len': 5, 'value': 'num' }, - 'SMAXSN1-1': { 'len': 5, 'value': 'num' }, - 'SMAXSN2-1': { 'len': 5, 'value': 'num' }, - 'SMAXSN3-1': { 'len': 5, 'value': 'num' }, - 'SINSTI': { 'len': 5, 'value': 'num' }, - 'SMAXIN': { 'len': 5, 'value': 'num' }, - 'SMAXIN-1': { 'len': 5, 'value': 'num' }, - 'CCASN': { 'len': 5, 'value': 'num' }, - 'CCASN-1': { 'len': 5, 'value': 'num' }, - 'CCAIN': { 'len': 5, 'value': 'num' }, - 'CCAIN-1': { 'len': 5, 'value': 'num' }, - 'UMOY1': { 'len': 3, 'value': 'num' }, - 'UMOY2': { 'len': 3, 'value': 'num' }, - 'UMOY3': { 'len': 3, 'value': 'num' }, - 'STGE': { 'len': 8 }, - 'DPM1': { 'len': 2 }, - 'FPM1': { 'len': 2 }, - 'DPM2': { 'len': 2 }, - 'FPM2': { 'len': 2 }, - 'DPM3': { 'len': 2 }, - 'FPM3': { 'len': 2 }, - 'MSG1': { 'len': 32 }, - 'MSG2': { 'len': 16 }, - 'PRM': { 'len': 14 }, - 'RELAIS': { 'len': 3 }, - 'NTARF': { 'len': 2 }, - 'NJOURF': { 'len': 2 }, - 'NJOURF+1': { 'len': 2 }, - 'PJOURF+1': { 'len': 98 }, - 'PPOINTE': { 'len': 98 }, -} - -class TicDevice: - def __init__(self, name): - self._is_serial = False - self.device = None - try: - self.device = serial.Serial(name, - bytesize=serial.SEVENBITS, - parity=serial.PARITY_EVEN, - stopbits=serial.STOPBITS_ONE, - timeout=2.0) - self.device.baudrate = 9600 - self._is_serial = True - except: - if self.device: - self.device.close() - self.device = None - if not self.device: - self.device = open(name, 'rb') - self._is_serial = False - - def set_baudrate(self, rate): - if self._is_serial: - self.device.baudrate = rate - - def get_char(self): - c = self.device.read(1) - if not c: - if self._is_serial: - log.debug("RX Timeout") - else: - log.debug("EOF.") - return c - - def is_serial(self): - return self._is_serial - -class Tic: - def __init__(self, input): - self.input = input - self.device = TicDevice(input) - self.mode = None - - def detect_mode(self): - if self._detect_mode(): - return - if self.device.is_serial(): - log.debug("tring speed 1200bps") - self.device.set_baudrate(1200) - if self._detect_mode(): - return - log.error("no TIC protocol detected") - return None - - def _detect_mode(self): - ok_mask = 0x00 - tic_map = { TIC_STX: 0x01, - TIC_ETX: 0x02, - TIC_SGR: 0x04, - TIC_EGR: 0x08, - TIC_SEP_STD: 0x10 } - - nb_reset = 0 - while c := self.device.get_char(): - ok = tic_map.get(c, 0) - if ok_mask or ok == 0x01: - ok_mask = ok_mask | ok - if ok_mask == 0x1f: - log.info(f"Standard mode detected") - self.mode = TIC_MODE_STD - return self.mode - if ok_mask == 0x0f: - log.info(f"Historiqual mode detected") - self.mode = TIC_MODE_HIS - return self.mode - if not ok and (c < b' ' or c > b'\x7f'): - log.debug(f"bad char: {c}") - ok_mask = 0x00 - nb_reset = nb_reset + 1 - if nb_reset >= 20: - break - log.debug("Timeout/EOF") - return None - - def get_frame(self): - frame = b'' - while True: - c = self.device.get_char() - if not c: - return None - if c == TIC_STX: - break - while True: - c = self.device.get_char() - if not c: - return None - if c == TIC_ETX: - break - frame = frame + c - return frame - - def split_frame(self, frame): - groups = [] - start = 0 - while start <= len(frame): - start = frame.find(TIC_SGR, start) - if start < 0: - return groups - end = frame.find(TIC_EGR, start) - if end < 0: - return groups - groups.append(frame[start+1:end]) - start = end + 1 - return groups - - def check_sum(self, group): - checksum = group[-1] - end = -2 if self.mode == TIC_MODE_HIS else -1 - sum = 0 - for c in group[:end]: - sum = sum + c - sum = (sum & 0x3f) + 0x20 - return sum == checksum - - def decode_group(self, group): - sep = (TIC_SEP_HIS if self.mode == TIC_MODE_HIS else TIC_SEP_STD).decode('ascii') - group = group[:-2].decode('ascii') - fields = group.split(sep) - msg = {} - if len(fields) < 2 or len(fields) > 4: - log.warning(f'corrupted group: {group}') - return None - label = fields[0] - value = fields[-1] - date = None - if len(fields) == 3: - date = self.parse_date(fields[1]) - - if not label in tic_msg_std: - log.warning(f'unknown label: {group}') - return None - if len(value) != tic_msg_std[label]['len']: - log.warning(f'bad value length: {group}') - return None - value_type = tic_msg_std[label].get('value', None) - if value_type == 'num': - try: - value = int(value) - except: - log.warning(f'bad numerical value: {group}') - return None - else: - value = re.sub('\s\s+', ' ', value).strip() - - msg['label'] = label - msg['value'] = value - msg['date'] = date - return msg - - def parse_date(self, str): - try: - dy = int(str[1:3]) + 2000 - dm = int(str[3:5]) - dd = int(str[5:7]) - th = int(str[7:9]) - tm = int(str[9:11]) - ts = int(str[11:13]) - date = datetime.datetime(year=dy, month=dm, day=dd, hour=th, minute=tm, second=ts) - return date - except Exception as e: - print(e) - return None - - def run(self): - while True: - frame = self.get_frame() - if not frame: - return - groups = self.split_frame(frame) - for group in groups: - if not self.check_sum(group): - continue - msg = self.decode_group(group) - print(msg) - def main(): parser = argparse.ArgumentParser(prog="tic2mqtt", @@ -291,13 +20,21 @@ def main(): help='select TIC mode') parser.add_argument('-l', '--log', choices=['debug', 'info', 'warning', 'error'], default='error', help='set the log level') + parser.add_argument('-b', '--broker', + help='MQTT broker hostname') args = parser.parse_args() - logging.basicConfig(filename='/dev/stderr', level=getattr(logging, args.log.upper())) + logging.basicConfig(level=getattr(logging, args.log.upper())) - tic = Tic(args.input) - tic.detect_mode() - tic.run() + tc = tic.Tic(args.input) + tc.detect_mode() + + mqtt = tic_mqtt.TicPublisher(args.broker) + + tc.run() + + if mqtt: + mqtt.close() diff --git a/tic_device.py b/tic_device.py new file mode 100644 index 0000000..9238bdd --- /dev/null +++ b/tic_device.py @@ -0,0 +1,41 @@ +import logging + +log = logging.getLogger(__name__) + + +class TicDevice: + def __init__(self, name): + self._is_serial = False + self.device = None + try: + self.device = serial.Serial(name, + bytesize=serial.SEVENBITS, + parity=serial.PARITY_EVEN, + stopbits=serial.STOPBITS_ONE, + timeout=2.0) + self.device.baudrate = 9600 + self._is_serial = True + except: + if self.device: + self.device.close() + self.device = None + if not self.device: + self.device = open(name, 'rb') + self._is_serial = False + + def set_baudrate(self, rate): + if self._is_serial: + self.device.baudrate = rate + + def get_char(self): + c = self.device.read(1) + if not c: + if self._is_serial: + log.debug("RX Timeout") + else: + log.debug("EOF.") + return c + + def is_serial(self): + return self._is_serial + diff --git a/tic_hist.py b/tic_hist.py new file mode 100644 index 0000000..5036e94 --- /dev/null +++ b/tic_hist.py @@ -0,0 +1,65 @@ +import re +import logging + +log = logging.getLogger(__name__) + +TIC_SEP_HIS = b'\x20' + +msgs = { + 'ADCO': { 'len': 12, 'value': 'adsc' }, + 'OPTARIF': { 'len': 4 }, + 'ISOUSC': { 'len': 2, 'value': 'num' }, + 'HCHC': { 'len': 9, 'value': 'num' }, + 'HCHP': { 'len': 9, 'value': 'num' }, + 'PTEC': { 'len': 4 }, + 'IINST': { 'len': 3, 'value': 'num' }, + 'ADPS': { 'len': 3, 'value': 'num' }, + 'IMAX': { 'len': 3, 'value': 'num' }, + 'PAPP': { 'len': 5, 'value': 'num' }, + 'HHPHC': { 'len': 1 }, + 'MOTDETAT': { 'len': 6, 'value': 'num' }, +} + +class TicHist: + def __init__(self): + self.sep = TIC_SEP_HIS.decode('ascii') + pass + + def check_sum(self, group): + checksum = group[-1] + sum = 0 + for c in group[:-2]: + sum = sum + c + sum = (sum & 0x3f) + 0x20 + return sum == checksum + + def decode_group(self, group): + group = group[:-2].decode('ascii') + fields = group.split(self.sep) + msg = {} + if len(fields) < 2 or len(fields) > 4: + log.warning(f'corrupted group: {group}') + return None + label = fields[0] + value = fields[-1] + + if not label in msgs: + log.warning(f'unknown label: {group}') + return None + if len(value) != msgs[label]['len']: + log.warning(f'bad value length: {group}') + return None + value_type = msgs[label].get('value', None) + if value_type == 'num': + try: + value = int(value) + except: + log.warning(f'bad numerical value: {group}') + return None + else: + value = re.sub('\s\s+', ' ', value).strip() + + msg['label'] = label + msg['value'] = value + return msg + diff --git a/tic_mqtt.py b/tic_mqtt.py new file mode 100644 index 0000000..144eb99 --- /dev/null +++ b/tic_mqtt.py @@ -0,0 +1,19 @@ +import paho.mqtt.client as mqtt +import logging + +log = logging.getLogger(__name__) + + +class TicPublisher: + def __init__(self, hostname=None, port=1883): + self.client = None + if hostname: + self.client = mqtt.Client() + self.client.connect(hostname, port) + #self.client.loop_start() + + def close(self): + if self.client: + #self.client.loop_stop() + self.client = None + diff --git a/tic_standard.py b/tic_standard.py new file mode 100644 index 0000000..441a071 --- /dev/null +++ b/tic_standard.py @@ -0,0 +1,153 @@ +import logging +import datetime +import time +import re + +log = logging.getLogger(__name__) + +TIC_SEP_STD = b'\x09' + +msgs = { + 'ADSC': { 'len': 12, 'value': 'adsc' }, + 'VTIC': { 'len': 2, 'value': 'num' }, + 'DATE': { 'len': 0 }, + 'NGTF': { 'len': 16 }, + 'LTARF': { 'len': 16 }, + 'EAST': { 'len': 9, 'value': 'num' }, + 'EASF01': { 'len': 9, 'value': 'num' }, + 'EASF02': { 'len': 9, 'value': 'num' }, + 'EASF03': { 'len': 9, 'value': 'num' }, + 'EASF04': { 'len': 9, 'value': 'num' }, + 'EASF05': { 'len': 9, 'value': 'num' }, + 'EASF06': { 'len': 9, 'value': 'num' }, + 'EASF07': { 'len': 9, 'value': 'num' }, + 'EASF08': { 'len': 9, 'value': 'num' }, + 'EASF09': { 'len': 9, 'value': 'num' }, + 'EASF10': { 'len': 9, 'value': 'num' }, + 'EASD01': { 'len': 9, 'value': 'num' }, + 'EASD02': { 'len': 9, 'value': 'num' }, + 'EASD03': { 'len': 9, 'value': 'num' }, + 'EASD04': { 'len': 9, 'value': 'num' }, + 'EAIT': { 'len': 9, 'value': 'num' }, + 'ERQ1': { 'len': 9, 'value': 'num' }, + 'ERQ2': { 'len': 9, 'value': 'num' }, + 'ERQ3': { 'len': 9, 'value': 'num' }, + 'ERQ4': { 'len': 9, 'value': 'num' }, + 'IRMS1': { 'len': 3, 'value': 'num' }, + 'IRMS2': { 'len': 3, 'value': 'num' }, + 'IRMS3': { 'len': 3, 'value': 'num' }, + 'URMS1': { 'len': 3, 'value': 'num' }, + 'URMS2': { 'len': 3, 'value': 'num' }, + 'URMS3': { 'len': 3, 'value': 'num' }, + 'PREF': { 'len': 2, 'value': 'num' }, + 'PCOUP': { 'len': 2, 'value': 'num' }, + 'SINSTS': { 'len': 5, 'value': 'num' }, + 'SINST1': { 'len': 5, 'value': 'num' }, + 'SINST2': { 'len': 5, 'value': 'num' }, + 'SINST3': { 'len': 5, 'value': 'num' }, + 'SMAXSN': { 'len': 5, 'value': 'num' }, + 'SMAXSN1': { 'len': 5, 'value': 'num' }, + 'SMAXSN2': { 'len': 5, 'value': 'num' }, + 'SMAXSN3': { 'len': 5, 'value': 'num' }, + 'SMAXSN-1': { 'len': 5, 'value': 'num' }, + 'SMAXSN1-1': { 'len': 5, 'value': 'num' }, + 'SMAXSN2-1': { 'len': 5, 'value': 'num' }, + 'SMAXSN3-1': { 'len': 5, 'value': 'num' }, + 'SINSTI': { 'len': 5, 'value': 'num' }, + 'SMAXIN': { 'len': 5, 'value': 'num' }, + 'SMAXIN-1': { 'len': 5, 'value': 'num' }, + 'CCASN': { 'len': 5, 'value': 'num' }, + 'CCASN-1': { 'len': 5, 'value': 'num' }, + 'CCAIN': { 'len': 5, 'value': 'num' }, + 'CCAIN-1': { 'len': 5, 'value': 'num' }, + 'UMOY1': { 'len': 3, 'value': 'num' }, + 'UMOY2': { 'len': 3, 'value': 'num' }, + 'UMOY3': { 'len': 3, 'value': 'num' }, + 'STGE': { 'len': 8 }, + 'DPM1': { 'len': 2 }, + 'FPM1': { 'len': 2 }, + 'DPM2': { 'len': 2 }, + 'FPM2': { 'len': 2 }, + 'DPM3': { 'len': 2 }, + 'FPM3': { 'len': 2 }, + 'MSG1': { 'len': 32 }, + 'MSG2': { 'len': 16 }, + 'PRM': { 'len': 14 }, + 'RELAIS': { 'len': 3 }, + 'NTARF': { 'len': 2 }, + 'NJOURF': { 'len': 2 }, + 'NJOURF+1': { 'len': 2 }, + 'PJOURF+1': { 'len': 98 }, + 'PPOINTE': { 'len': 98 }, +} + + +class TicStandard: + def __init__(self): + self.sep = TIC_SEP_STD.decode('ascii') + tz_winter = datetime.timezone(datetime.timedelta(seconds=-time.timezone), name=time.tzname[0]) + tz_summer = datetime.timezone(datetime.timedelta(seconds=-time.altzone), name=time.tzname[1]) + self.tz_info = { + 'H': tz_winter, + 'E': tz_summer, + } + pass + + def check_sum(self, group): + checksum = group[-1] + sum = 0 + for c in group[:-1]: + sum = sum + c + sum = (sum & 0x3f) + 0x20 + return sum == checksum + + def decode_group(self, group): + group = group[:-2].decode('ascii') + fields = group.split(self.sep) + msg = {} + if len(fields) < 2 or len(fields) > 4: + log.warning(f'corrupted group: {group}') + return None + label = fields[0] + value = fields[-1] + date = None + if len(fields) == 3: + date = self.parse_date(fields[1]) + + if not label in msgs: + log.warning(f'unknown label: {group}') + return None + if len(value) != msgs[label]['len']: + log.warning(f'bad value length: {group}') + return None + value_type = msgs[label].get('value', None) + if value_type == 'num': + try: + value = int(value) + except: + log.warning(f'bad numerical value: {group}') + return None + else: + value = re.sub('\s\s+', ' ', value).strip() + + msg['label'] = label + msg['value'] = value + if date: + msg['date'] = date + return msg + + def parse_date(self, str): + try: + tz = self.tz_info.get(str[0].upper()) + dy = int(str[1:3]) + 2000 + dm = int(str[3:5]) + dd = int(str[5:7]) + th = int(str[7:9]) + tm = int(str[9:11]) + ts = int(str[11:13]) + date = datetime.datetime(year=dy, month=dm, day=dd, hour=th, minute=tm, second=ts, tzinfo=tz) + return date.isoformat() + except Exception as e: + print(e) + return None +