tic2mqtt/tic_standard.py

163 lines
5.4 KiB
Python

import logging
import datetime
import time
import re
log = logging.getLogger(__name__)
TIC_SEP_STD = b'\x09'
msgs = {
'ADSC': { 'len': 12 },
'VTIC': { 'len': 2 },
'DATE': { 'len': 0 },
'NGTF': { 'len': 16 },
'LTARF': { 'len': 16 },
'EAST': { 'len': 9, 'unit': 'Wh' },
'EASF01': { 'len': 9, 'unit': 'Wh' },
'EASF02': { 'len': 9, 'unit': 'Wh' },
'EASF03': { 'len': 9, 'unit': 'Wh' },
'EASF04': { 'len': 9, 'unit': 'Wh' },
'EASF05': { 'len': 9, 'unit': 'Wh' },
'EASF06': { 'len': 9, 'unit': 'Wh' },
'EASF07': { 'len': 9, 'unit': 'Wh' },
'EASF08': { 'len': 9, 'unit': 'Wh' },
'EASF09': { 'len': 9, 'unit': 'Wh' },
'EASF10': { 'len': 9, 'unit': 'Wh' },
'EASD01': { 'len': 9, 'unit': 'Wh' },
'EASD02': { 'len': 9, 'unit': 'Wh' },
'EASD03': { 'len': 9, 'unit': 'Wh' },
'EASD04': { 'len': 9, 'unit': 'Wh' },
'EAIT': { 'len': 9, 'unit': 'Wh' },
'ERQ1': { 'len': 9, 'unit': 'VArh' },
'ERQ2': { 'len': 9, 'unit': 'VArh' },
'ERQ3': { 'len': 9, 'unit': 'VArh' },
'ERQ4': { 'len': 9, 'unit': 'VArh' },
'IRMS1': { 'len': 3, 'unit': 'A' },
'IRMS2': { 'len': 3, 'unit': 'A' },
'IRMS3': { 'len': 3, 'unit': 'A' },
'URMS1': { 'len': 3, 'unit': 'V' },
'URMS2': { 'len': 3, 'unit': 'V' },
'URMS3': { 'len': 3, 'unit': 'V' },
'PREF': { 'len': 2, 'unit': 'kVA' },
'PCOUP': { 'len': 2, 'unit': 'kVA' },
'SINSTS': { 'len': 5, 'unit': 'VA' },
'SINST1': { 'len': 5, 'unit': 'VA' },
'SINST2': { 'len': 5, 'unit': 'VA' },
'SINST3': { 'len': 5, 'unit': 'VA' },
'SMAXSN': { 'len': 5, 'unit': 'VA' },
'SMAXSN1': { 'len': 5, 'unit': 'VA' },
'SMAXSN2': { 'len': 5, 'unit': 'VA' },
'SMAXSN3': { 'len': 5, 'unit': 'VA' },
'SMAXSN-1': { 'len': 5, 'unit': 'VA' },
'SMAXSN1-1': { 'len': 5, 'unit': 'VA' },
'SMAXSN2-1': { 'len': 5, 'unit': 'VA' },
'SMAXSN3-1': { 'len': 5, 'unit': 'VA' },
'SINSTI': { 'len': 5, 'unit': 'VA' },
'SMAXIN': { 'len': 5, 'unit': 'VA' },
'SMAXIN-1': { 'len': 5, 'unit': 'VA' },
'CCASN': { 'len': 5, 'unit': 'W' },
'CCASN-1': { 'len': 5, 'unit': 'W' },
'CCAIN': { 'len': 5, 'unit': 'W' },
'CCAIN-1': { 'len': 5, 'unit': 'W' },
'UMOY1': { 'len': 3, 'unit': 'V' },
'UMOY2': { 'len': 3, 'unit': 'V' },
'UMOY3': { 'len': 3, 'unit': 'V' },
'STGE': { 'len': 8 },
'DPM1': { 'len': 2 },
'FPM1': { 'len': 2 },
'DPM2': { 'len': 2 },
'FPM2': { 'len': 2 },
'DPM3': { 'len': 2 },
'FPM3': { 'len': 2 },
'MSG1': { 'len': 32 },
'MSG2': { 'len': 16 },
'PRM': { 'len': 14 },
'RELAIS': { 'len': 3 },
'NTARF': { 'len': 2 },
'NJOURF': { 'len': 2 },
'NJOURF+1': { 'len': 2 },
'PJOURF+1': { 'len': 98 },
'PPOINTE': { 'len': 98 },
}
class TicStandard:
def __init__(self):
self.sep = TIC_SEP_STD.decode('ascii')
tz_winter = datetime.timezone(datetime.timedelta(seconds=-time.timezone), name=time.tzname[0])
tz_summer = datetime.timezone(datetime.timedelta(seconds=-time.altzone), name=time.tzname[1])
self.tz_info = {
'H': tz_winter,
'E': tz_summer,
}
pass
def name(self):
return "mode standard"
def check_sum(self, group):
checksum = group[-1]
sum = 0
for c in group[:-1]:
sum = sum + c
sum = (sum & 0x3f) + 0x20
return sum == checksum
def decode_group(self, group):
group = group[:-2].decode('ascii')
fields = group.split(self.sep)
msg = {}
if len(fields) < 2 or len(fields) > 4:
log.warning(f'corrupted group: {group}')
return None
label = fields[0]
value = fields[-1]
date = None
if len(fields) == 3:
date = self.parse_date(fields[1])
if not label in msgs:
log.warning(f'unknown label: {group}')
return None
if len(value) != msgs[label]['len']:
log.warning(f'bad value length: {group}')
return None
unit = msgs[label].get('unit')
value_type = msgs[label].get('value', None)
if unit:
try:
value = int(value)
except:
log.warning(f'bad numerical value: {group}')
return None
else:
value = re.sub('\s\s+', ' ', value).strip()
if not len(value):
value = None
msg['label'] = label
if value is not None:
msg['value'] = value
if date:
msg['date'] = date
if unit:
msg['unit'] = unit
return msg
def parse_date(self, str):
try:
tz = self.tz_info.get(str[0].upper())
dy = int(str[1:3]) + 2000
dm = int(str[3:5])
dd = int(str[5:7])
th = int(str[7:9])
tm = int(str[9:11])
ts = int(str[11:13])
date = datetime.datetime(year=dy, month=dm, day=dd, hour=th, minute=tm, second=ts, tzinfo=tz)
return date.isoformat()
except Exception as e:
print(e)
return None