import logging import datetime import time import re log = logging.getLogger(__name__) TIC_SEP_STD = b'\x09' msgs = { 'ADSC': { 'len': 12 }, 'VTIC': { 'len': 2 }, 'DATE': { 'len': 0 }, 'NGTF': { 'len': 16 }, 'LTARF': { 'len': 16 }, 'EAST': { 'len': 9, 'unit': 'Wh' }, 'EASF01': { 'len': 9, 'unit': 'Wh' }, 'EASF02': { 'len': 9, 'unit': 'Wh' }, 'EASF03': { 'len': 9, 'unit': 'Wh' }, 'EASF04': { 'len': 9, 'unit': 'Wh' }, 'EASF05': { 'len': 9, 'unit': 'Wh' }, 'EASF06': { 'len': 9, 'unit': 'Wh' }, 'EASF07': { 'len': 9, 'unit': 'Wh' }, 'EASF08': { 'len': 9, 'unit': 'Wh' }, 'EASF09': { 'len': 9, 'unit': 'Wh' }, 'EASF10': { 'len': 9, 'unit': 'Wh' }, 'EASD01': { 'len': 9, 'unit': 'Wh' }, 'EASD02': { 'len': 9, 'unit': 'Wh' }, 'EASD03': { 'len': 9, 'unit': 'Wh' }, 'EASD04': { 'len': 9, 'unit': 'Wh' }, 'EAIT': { 'len': 9, 'unit': 'Wh' }, 'ERQ1': { 'len': 9, 'unit': 'VArh' }, 'ERQ2': { 'len': 9, 'unit': 'VArh' }, 'ERQ3': { 'len': 9, 'unit': 'VArh' }, 'ERQ4': { 'len': 9, 'unit': 'VArh' }, 'IRMS1': { 'len': 3, 'unit': 'A' }, 'IRMS2': { 'len': 3, 'unit': 'A' }, 'IRMS3': { 'len': 3, 'unit': 'A' }, 'URMS1': { 'len': 3, 'unit': 'V' }, 'URMS2': { 'len': 3, 'unit': 'V' }, 'URMS3': { 'len': 3, 'unit': 'V' }, 'PREF': { 'len': 2, 'unit': 'kVA' }, 'PCOUP': { 'len': 2, 'unit': 'kVA' }, 'SINSTS': { 'len': 5, 'unit': 'VA' }, 'SINST1': { 'len': 5, 'unit': 'VA' }, 'SINST2': { 'len': 5, 'unit': 'VA' }, 'SINST3': { 'len': 5, 'unit': 'VA' }, 'SMAXSN': { 'len': 5, 'unit': 'VA' }, 'SMAXSN1': { 'len': 5, 'unit': 'VA' }, 'SMAXSN2': { 'len': 5, 'unit': 'VA' }, 'SMAXSN3': { 'len': 5, 'unit': 'VA' }, 'SMAXSN-1': { 'len': 5, 'unit': 'VA' }, 'SMAXSN1-1': { 'len': 5, 'unit': 'VA' }, 'SMAXSN2-1': { 'len': 5, 'unit': 'VA' }, 'SMAXSN3-1': { 'len': 5, 'unit': 'VA' }, 'SINSTI': { 'len': 5, 'unit': 'VA' }, 'SMAXIN': { 'len': 5, 'unit': 'VA' }, 'SMAXIN-1': { 'len': 5, 'unit': 'VA' }, 'CCASN': { 'len': 5, 'unit': 'W' }, 'CCASN-1': { 'len': 5, 'unit': 'W' }, 'CCAIN': { 'len': 5, 'unit': 'W' }, 'CCAIN-1': { 'len': 5, 'unit': 'W' }, 'UMOY1': { 'len': 3, 'unit': 'V' }, 'UMOY2': { 'len': 3, 'unit': 'V' }, 'UMOY3': { 'len': 3, 'unit': 'V' }, 'STGE': { 'len': 8 }, 'DPM1': { 'len': 2 }, 'FPM1': { 'len': 2 }, 'DPM2': { 'len': 2 }, 'FPM2': { 'len': 2 }, 'DPM3': { 'len': 2 }, 'FPM3': { 'len': 2 }, 'MSG1': { 'len': 32 }, 'MSG2': { 'len': 16 }, 'PRM': { 'len': 14 }, 'RELAIS': { 'len': 3 }, 'NTARF': { 'len': 2 }, 'NJOURF': { 'len': 2 }, 'NJOURF+1': { 'len': 2 }, 'PJOURF+1': { 'len': 98 }, 'PPOINTE': { 'len': 98 }, } class TicStandard: def __init__(self): self.sep = TIC_SEP_STD.decode('ascii') tz_winter = datetime.timezone(datetime.timedelta(seconds=-time.timezone), name=time.tzname[0]) tz_summer = datetime.timezone(datetime.timedelta(seconds=-time.altzone), name=time.tzname[1]) self.tz_info = { 'H': tz_winter, 'E': tz_summer, } pass def name(self): return "mode standard" def check_sum(self, group): checksum = group[-1] sum = 0 for c in group[:-1]: sum = sum + c sum = (sum & 0x3f) + 0x20 return sum == checksum def decode_group(self, group): group = group[:-2].decode('ascii') fields = group.split(self.sep) msg = {} if len(fields) < 2 or len(fields) > 4: log.warning(f'corrupted group: {group}') return None label = fields[0] value = fields[-1] date = None if len(fields) == 3: date = self.parse_date(fields[1]) if not label in msgs: log.warning(f'unknown label: {group}') return None if len(value) != msgs[label]['len']: log.warning(f'bad value length: {group}') return None unit = msgs[label].get('unit') value_type = msgs[label].get('value', None) if unit: try: value = int(value) except: log.warning(f'bad numerical value: {group}') return None else: value = re.sub('\s\s+', ' ', value).strip() if not len(value): value = None msg['label'] = label if value is not None: msg['value'] = value if date: msg['date'] = date if unit: msg['unit'] = unit return msg def parse_date(self, str): try: tz = self.tz_info.get(str[0].upper()) dy = int(str[1:3]) + 2000 dm = int(str[3:5]) dd = int(str[5:7]) th = int(str[7:9]) tm = int(str[9:11]) ts = int(str[11:13]) date = datetime.datetime(year=dy, month=dm, day=dd, hour=th, minute=tm, second=ts, tzinfo=tz) return date.isoformat() except Exception as e: print(e) return None