First Implementation
This commit is contained in:
parent
19337f4ff1
commit
493dad5e4d
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
__pycache__
|
210
tic.py
Normal file
210
tic.py
Normal file
@ -0,0 +1,210 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
import tic_device
|
||||
import tic_standard
|
||||
import tic_hist
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
TIC_STX = b'\x02'
|
||||
TIC_ETX = b'\x03'
|
||||
TIC_SGR = b'\x0a'
|
||||
TIC_EGR = b'\x0d'
|
||||
|
||||
TIC_MODE_STD = "standard"
|
||||
TIC_MODE_HIS = "historique"
|
||||
|
||||
|
||||
type_str = {
|
||||
'61': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance haute",
|
||||
'62': "Compteur monophasé 90 A généralisation Linky G1 - arrivée puissance basse",
|
||||
'63': "Compteur triphasé 60 A généralisation Linky G1 - arrivée puissance basse",
|
||||
'64': "Compteur monophasé 60 A généralisation Linky G3 - arrivée puissance basse",
|
||||
'70': "Compteur monophasé Linky 60 A mise au point G3",
|
||||
'71': "Compteur triphasé Linky 60 A mise au point G3",
|
||||
'75': "Compteur monophasé 90 A généralisation Linky G3 - arrivée puissance basse",
|
||||
'76': "Compteur triphasé 60 A généralisation Linky G3 - arrivée puissance basse",
|
||||
}
|
||||
|
||||
manufacturer_str = {
|
||||
"01": "CROUZET / MONETEL",
|
||||
"02": "SAGEM / SAGEMCOM",
|
||||
"03": "SCHLUMBERGER / ACTARIS / ITRON",
|
||||
"04": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
|
||||
"05": "SAUTER / STEPPER ENERGIE France / ZELLWEGER",
|
||||
"06": "ITRON",
|
||||
"07": "MAEC",
|
||||
"08": "MATRA-CHAUVIN ARNOUX / ENERDIS",
|
||||
"09": "FAURE-HERMAN",
|
||||
"10": "SEVME / SIS",
|
||||
"11": "MAGNOL / ELSTER / HONEYWELL",
|
||||
"12": "GAZ THERMIQUE",
|
||||
"14": "GHIELMETTI / DIALOG E.S. / MICRONIQUE",
|
||||
"15": "MECELEC",
|
||||
"16": "LEGRAND / BACO",
|
||||
"17": "SERD-SCHLUMBERGER",
|
||||
"18": "SCHNEIDER / MERLIN GERIN / GARDY",
|
||||
"19": "GENERAL ELECTRIC / POWER CONTROL",
|
||||
"20": "NUOVO PIGNONE / DRESSER",
|
||||
"21": "SCLE",
|
||||
"22": "EDF",
|
||||
"23": "GDF / GDF-SUEZ",
|
||||
"24": "HAGER – GENERAL ELECTRIC",
|
||||
"25": "DELTA-DORE",
|
||||
"26": "RIZ",
|
||||
"27": "ISKRAEMECO",
|
||||
"28": "GMT",
|
||||
"29": "ANALOG DEVICE",
|
||||
"30": "MICHAUD",
|
||||
"31": "HEXING ELECTRICAL CO. Ltd",
|
||||
"32": "SIAME",
|
||||
"33": "LARSEN & TOUBRO Limited",
|
||||
"34": "ELSTER / HONEYWELL",
|
||||
"35": "ELECTRONIC AFZAR AZMA",
|
||||
"36": "ADVANCED ELECTRONIC COMPAGNY Ldt",
|
||||
"37": "AEM",
|
||||
"38": "ZHEJIANG CHINT INSTRUMENT & METER CO. Ldt",
|
||||
"39": "ZIV",
|
||||
"70": "LANDIS et GYR (export ou régie)",
|
||||
"71": "STEPPER ENERGIE France (export ou régie)",
|
||||
"81": "SAGEM / SAGEMCOM",
|
||||
"82": "LANDIS ET GYR / SIEMENS METERING / LANDIS+GYR",
|
||||
"83": "ELSTER / HONEYWELL",
|
||||
"84": "SAGEM / SAGEMCOM",
|
||||
"85": "ITRON",
|
||||
}
|
||||
|
||||
|
||||
class Tic:
|
||||
def __init__(self, input):
|
||||
self.input = input
|
||||
self.device = tic_device.TicDevice(input)
|
||||
self.ads = None
|
||||
self.device_manufacturer = None
|
||||
self.device_year = None
|
||||
self.device_model = None
|
||||
self.device_serial = None
|
||||
self.mode = None
|
||||
self._tic = None
|
||||
self._cb_ads = None
|
||||
self._cb_msg = None
|
||||
|
||||
def set_mode(self, mode):
|
||||
if mode == TIC_MODE_STD:
|
||||
self.mode = TIC_MODE_STD
|
||||
self._tic = tic_standard.TicStandard()
|
||||
self.device.set_baudrate(9600)
|
||||
log.info(f"set_mode: {self._tic.name()}")
|
||||
elif mode == TIC_MODE_HIS:
|
||||
self.mode = TIC_MODE_HIS
|
||||
self._tic = tic_hist.TicHist()
|
||||
self.device.set_baudrate(1200)
|
||||
log.info(f"set_mode: {self._tic.name()}")
|
||||
else:
|
||||
self.mode = None
|
||||
self._tic = None
|
||||
log.info(f"set_mode: unsupported")
|
||||
|
||||
def detect_mode(self):
|
||||
for mode in ( TIC_MODE_STD, TIC_MODE_HIS ):
|
||||
self.set_mode(mode)
|
||||
f = self.get_frame()
|
||||
if not f:
|
||||
continue
|
||||
groups = self.split_frame(f)
|
||||
for group in groups:
|
||||
if not self.check_sum(group):
|
||||
continue
|
||||
self.decode_group(group)
|
||||
if self.ads:
|
||||
log.info(f"detected {self._tic.name()}")
|
||||
return mode
|
||||
log.error("no valid frame detected")
|
||||
return None
|
||||
|
||||
def get_frame(self):
|
||||
frame = b''
|
||||
while True:
|
||||
c = self.device.get_char()
|
||||
if not c:
|
||||
return None
|
||||
if c == TIC_STX:
|
||||
break
|
||||
while True:
|
||||
c = self.device.get_char()
|
||||
if not c:
|
||||
return None
|
||||
if c == TIC_ETX:
|
||||
break
|
||||
frame = frame + c
|
||||
return frame
|
||||
|
||||
def split_frame(self, frame):
|
||||
groups = []
|
||||
start = 0
|
||||
while start <= len(frame):
|
||||
start = frame.find(TIC_SGR, start)
|
||||
if start < 0:
|
||||
return groups
|
||||
end = frame.find(TIC_EGR, start)
|
||||
if end < 0:
|
||||
return groups
|
||||
groups.append(frame[start+1:end])
|
||||
start = end + 1
|
||||
return groups
|
||||
|
||||
def check_sum(self, group):
|
||||
return self._tic.check_sum(group)
|
||||
|
||||
def decode_group(self, group):
|
||||
msg = self._tic.decode_group(group)
|
||||
log.debug(f"msg: {msg}")
|
||||
if msg and msg.get('label') in ( 'ADSC', 'ADCO' ):
|
||||
self.parse_ads(msg['value'])
|
||||
return msg
|
||||
|
||||
def parse_ads(self, ads):
|
||||
if self.ads != ads:
|
||||
self.ads = ads
|
||||
self.device_manufacturer = ma = manufacturer_str.get(ads[0:2], "unknown")
|
||||
try:
|
||||
self.device_year = 2000 + int(ads[2:4])
|
||||
except ValueError:
|
||||
self.device_year = None
|
||||
self.device_model = type_str.get(ads[4:6], "unknown")
|
||||
self.device_serial = ads[6:]
|
||||
log.info(f"device: {self.ads}")
|
||||
log.info(f"manufacturer: {self.device_manufacturer}")
|
||||
log.info(f"year: {self.device_year}")
|
||||
log.info(f"model: {self.device_model}")
|
||||
log.info(f"serial number: {self.device_serial}")
|
||||
if self._cb_ads:
|
||||
self._cb_ads(self.ads, self.device_manufacturer, self.device_year, self.device_model, self.device_serial)
|
||||
|
||||
def set_callback_ads(self, function):
|
||||
self._cb_ads = function
|
||||
if self.ads:
|
||||
ads = self.ads
|
||||
self.ads = None
|
||||
self.parse_ads(self.ads)
|
||||
|
||||
def set_callback_msg(self, function):
|
||||
self._cb_msg = function
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
frame = self.get_frame()
|
||||
if not frame:
|
||||
return
|
||||
groups = self.split_frame(frame)
|
||||
for group in groups:
|
||||
if not self.check_sum(group):
|
||||
continue
|
||||
msg = self.decode_group(group)
|
||||
log.info(msg)
|
||||
if msg and self._cb_msg:
|
||||
self._cb_msg(msg)
|
||||
#if not self.device.is_serial():
|
||||
# time.sleep(1)
|
||||
|
48
tic2mqtt
Executable file
48
tic2mqtt
Executable file
@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import tic
|
||||
import tic_mqtt
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(prog="tic2mqtt",
|
||||
description="MQTT client for Linky Electricity Meters installed by Enedis in France.")
|
||||
#parser.add_argument('input', nargs='?', type=argparse.FileType('rb', bufsize=0), default='-',
|
||||
# help="input file or device to read TIC messages (stdin by default)")
|
||||
parser.add_argument('input', nargs='?', default='/dev/stdin',
|
||||
help="input file or device to read TIC messages (stdin by default)")
|
||||
parser.add_argument('-m', '--mode', choices=['auto', 'historique', 'standard'], default='auto',
|
||||
help='select TIC mode')
|
||||
parser.add_argument('-l', '--log', choices=['debug', 'info', 'warning', 'error'], default='error',
|
||||
help='set the log level')
|
||||
parser.add_argument('-b', '--broker',
|
||||
help='MQTT broker hostname')
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=getattr(logging, args.log.upper()))
|
||||
|
||||
my_tic = tic.Tic(args.input)
|
||||
if args.mode == "auto":
|
||||
my_tic.detect_mode()
|
||||
else:
|
||||
my_tic.set_mode(args.mode)
|
||||
|
||||
mqtt = tic_mqtt.TicPublisher(args.broker)
|
||||
my_tic.set_callback_ads(mqtt.set_ads)
|
||||
my_tic.set_callback_msg(mqtt.send_msg)
|
||||
|
||||
my_tic.run()
|
||||
|
||||
if mqtt:
|
||||
mqtt.close()
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
41
tic_device.py
Normal file
41
tic_device.py
Normal file
@ -0,0 +1,41 @@
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TicDevice:
|
||||
def __init__(self, name):
|
||||
self._is_serial = False
|
||||
self.device = None
|
||||
try:
|
||||
self.device = serial.Serial(name,
|
||||
bytesize=serial.SEVENBITS,
|
||||
parity=serial.PARITY_EVEN,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
timeout=2.0)
|
||||
self.device.baudrate = 9600
|
||||
self._is_serial = True
|
||||
except:
|
||||
if self.device:
|
||||
self.device.close()
|
||||
self.device = None
|
||||
if not self.device:
|
||||
self.device = open(name, 'rb')
|
||||
self._is_serial = False
|
||||
|
||||
def set_baudrate(self, rate):
|
||||
if self._is_serial:
|
||||
self.device.baudrate = rate
|
||||
|
||||
def get_char(self):
|
||||
c = self.device.read(1)
|
||||
if not c:
|
||||
if self._is_serial:
|
||||
log.debug("RX Timeout")
|
||||
else:
|
||||
log.debug("EOF.")
|
||||
return c
|
||||
|
||||
def is_serial(self):
|
||||
return self._is_serial
|
||||
|
92
tic_hist.py
Normal file
92
tic_hist.py
Normal file
@ -0,0 +1,92 @@
|
||||
import re
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
TIC_SEP_HIS = b'\x20'
|
||||
|
||||
msgs = {
|
||||
'ADCO': { 'len': 12, },
|
||||
'OPTARIF': { 'len': 4 },
|
||||
'ISOUSC': { 'len': 2, 'unit': 'A' },
|
||||
'BASE': { 'len': 9, 'unit': 'Wh' },
|
||||
'HCHC': { 'len': 9, 'unit': 'Wh' },
|
||||
'HCHP': { 'len': 9, 'unit': 'Wh' },
|
||||
'EJPHN': { 'len': 9, 'unit': 'Wh' },
|
||||
'EJPHPM': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHCJB': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHPJB': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHCJW': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHPJW': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHCJR': { 'len': 9, 'unit': 'Wh' },
|
||||
'BBRHPJR': { 'len': 9, 'unit': 'Wh' },
|
||||
'PEJP': { 'len': 9, 'unit': 'min' },
|
||||
'PTEC': { 'len': 4 },
|
||||
'DEMAIN': { 'len': 9 },
|
||||
'IINST': { 'len': 3, 'unit': 'A' },
|
||||
'IINST1': { 'len': 3, 'unit': 'A' },
|
||||
'IINST2': { 'len': 3, 'unit': 'A' },
|
||||
'IINST3': { 'len': 3, 'unit': 'A' },
|
||||
'ADPS': { 'len': 3, 'unit': 'A' },
|
||||
'IMAX': { 'len': 3, 'unit': 'A' },
|
||||
'IMAX1': { 'len': 3, 'unit': 'A' },
|
||||
'IMAX2': { 'len': 3, 'unit': 'A' },
|
||||
'IMAX3': { 'len': 3, 'unit': 'A' },
|
||||
'PMAX': { 'len': 5, 'unit': 'W' },
|
||||
'PAPP': { 'len': 5, 'unit': 'VA' },
|
||||
'HHPHC': { 'len': 1 },
|
||||
'MOTDETAT': { 'len': 6 },
|
||||
'PPOT': { 'len': 2 },
|
||||
'ADIR1': { 'len': 3, 'unit': 'A' },
|
||||
'ADIR2': { 'len': 3, 'unit': 'A' },
|
||||
'ADIR3': { 'len': 3, 'unit': 'A' },
|
||||
}
|
||||
|
||||
class TicHist:
|
||||
def __init__(self):
|
||||
self.sep = TIC_SEP_HIS.decode('ascii')
|
||||
pass
|
||||
|
||||
def name(self):
|
||||
return "mode historique"
|
||||
|
||||
def check_sum(self, group):
|
||||
checksum = group[-1]
|
||||
sum = 0
|
||||
for c in group[:-2]:
|
||||
sum = sum + c
|
||||
sum = (sum & 0x3f) + 0x20
|
||||
return sum == checksum
|
||||
|
||||
def decode_group(self, group):
|
||||
group = group[:-2].decode('ascii')
|
||||
fields = group.split(self.sep)
|
||||
msg = {}
|
||||
if len(fields) < 2 or len(fields) > 4:
|
||||
log.warning(f'corrupted group: {group}')
|
||||
return None
|
||||
label = fields[0]
|
||||
value = fields[-1]
|
||||
|
||||
if not label in msgs:
|
||||
log.warning(f'unknown label: {group}')
|
||||
return None
|
||||
if len(value) != msgs[label]['len']:
|
||||
log.warning(f'bad value length: {group}')
|
||||
return None
|
||||
unit = msgs[label].get('unit')
|
||||
if unit:
|
||||
try:
|
||||
value = int(value)
|
||||
except:
|
||||
log.warning(f'bad numerical value: {group}')
|
||||
return None
|
||||
else:
|
||||
value = re.sub('\s\s+', ' ', value).strip()
|
||||
|
||||
msg['label'] = label
|
||||
msg['value'] = value
|
||||
if unit:
|
||||
msg['unit'] = unit
|
||||
return msg
|
||||
|
131
tic_mqtt.py
Normal file
131
tic_mqtt.py
Normal file
@ -0,0 +1,131 @@
|
||||
import paho.mqtt.client as mqtt
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
state_class_from_unit = {
|
||||
'Wh': 'total_increasing',
|
||||
'VArh': 'power',
|
||||
'A': 'measurement',
|
||||
'V': 'measurement',
|
||||
'kVA': 'measurement',
|
||||
'VA': 'measurement',
|
||||
'W': 'measurement',
|
||||
}
|
||||
|
||||
device_class_from_unit = {
|
||||
'Wh': 'energy',
|
||||
'VArh': 'power',
|
||||
'A': 'current',
|
||||
'V': 'voltage',
|
||||
'kVA': 'apparent_power',
|
||||
'VA': 'apparent_power',
|
||||
'W': 'power',
|
||||
}
|
||||
|
||||
class TicPublisher:
|
||||
def __init__(self, hostname=None, port=1883):
|
||||
self.client = None
|
||||
if hostname:
|
||||
self.client = mqtt.Client()
|
||||
self.client.connect(hostname, port)
|
||||
#self.client.loop_start()
|
||||
self._ads = None
|
||||
self._manufacturer = None
|
||||
self._year = None
|
||||
self._model = None
|
||||
self._serial = None
|
||||
self._last_ts = {}
|
||||
self._ha_config = {}
|
||||
|
||||
def close(self):
|
||||
if self.client:
|
||||
#self.client.loop_stop()
|
||||
self.client = None
|
||||
|
||||
def set_ads(self, ads, manufacturer, year, model, serial):
|
||||
log.debug(f"ads: {ads} manufacturer: {manufacturer} year: {year} model: {model} serial: {serial}")
|
||||
self._ads = ads
|
||||
self._manufacturer = manufacturer
|
||||
self._year = year
|
||||
self._model = model
|
||||
self._serial = serial
|
||||
|
||||
def get_topic(self, label):
|
||||
label = label.replace('+', 'PLUS').replace('-', 'MOINS')
|
||||
topic = f"tic/{self._ads}/{label}"
|
||||
return topic
|
||||
|
||||
def send_ha_config(self, msg, topic):
|
||||
if not self._ads:
|
||||
return
|
||||
label = msg.get('label')
|
||||
unit = msg.get('unit')
|
||||
device_class = device_class_from_unit.get(unit) if unit else None
|
||||
state_class = state_class_from_unit.get(unit) if unit else None
|
||||
unique_id = f"tic_{self._ads}_{label.replace('+','PLUS').replace('-','MOINS')}"
|
||||
node_id = f"tic"
|
||||
object_id = unique_id
|
||||
|
||||
config_topic = f"homeassistant/sensor/{node_id}/{object_id}/config"
|
||||
config = {
|
||||
'unique_id': unique_id,
|
||||
'name': label,
|
||||
'state_topic': topic,
|
||||
'value_template': "{% if 'value' in value_json %}{{ value_json.value }}{% endif %}",
|
||||
'device_class': device_class,
|
||||
'unit_of_measurement': unit,
|
||||
'device': {
|
||||
'identifiers': [ f'tic_{self._ads}' ],
|
||||
'manufacturer': self._manufacturer,
|
||||
'model': self._model,
|
||||
'name': 'Tic Enedis',
|
||||
},
|
||||
'origin': {
|
||||
'name': 'tic2mqtt',
|
||||
'sw_version': "0.1",
|
||||
'support_url': "https://git.grandou.net/gilles/tic2mqtt",
|
||||
}
|
||||
}
|
||||
if unit:
|
||||
config['unit_of_measurement'] = unit
|
||||
if device_class:
|
||||
config['device_class'] = device_class
|
||||
if state_class:
|
||||
config['state_class'] = state_class
|
||||
|
||||
log.debug(f"HA config topic: {config_topic}")
|
||||
log.debug(f"HA config: {config}")
|
||||
result = self.client.publish(config_topic, json.dumps(config), retain=True)
|
||||
|
||||
def send_msg(self, msg):
|
||||
log.debug(f"msg: {msg}")
|
||||
if not self.client:
|
||||
return
|
||||
if not self._ads:
|
||||
return
|
||||
if msg.get('label') is None:
|
||||
return
|
||||
if msg.get('value') is None:
|
||||
return
|
||||
ts = time.time()
|
||||
label = msg.get('label')
|
||||
value = msg.get('value')
|
||||
unit = msg.get('unit')
|
||||
if self._last_ts.get(label):
|
||||
if ts < self._last_ts[label]['ts'] + 60 and value == self._last_ts[label]['value']:
|
||||
log.debug(f"skip msg {label} {value}")
|
||||
return
|
||||
self._last_ts[label] = { 'value': value, 'ts': ts }
|
||||
topic = self.get_topic(label)
|
||||
if not self._ha_config.get('label'):
|
||||
self.send_ha_config(msg, topic)
|
||||
_msg = { 'value': msg['value'] }
|
||||
if msg.get('date'):
|
||||
_msg['date'] = msg['date']
|
||||
log.debug(f"topic: {topic} msg: {_msg}")
|
||||
result = self.client.publish(topic, json.dumps(_msg))
|
||||
|
||||
|
162
tic_standard.py
Normal file
162
tic_standard.py
Normal file
@ -0,0 +1,162 @@
|
||||
import logging
|
||||
import datetime
|
||||
import time
|
||||
import re
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
TIC_SEP_STD = b'\x09'
|
||||
|
||||
msgs = {
|
||||
'ADSC': { 'len': 12 },
|
||||
'VTIC': { 'len': 2 },
|
||||
'DATE': { 'len': 0 },
|
||||
'NGTF': { 'len': 16 },
|
||||
'LTARF': { 'len': 16 },
|
||||
'EAST': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF01': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF02': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF03': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF04': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF05': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF06': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF07': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF08': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF09': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASF10': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASD01': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASD02': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASD03': { 'len': 9, 'unit': 'Wh' },
|
||||
'EASD04': { 'len': 9, 'unit': 'Wh' },
|
||||
'EAIT': { 'len': 9, 'unit': 'Wh' },
|
||||
'ERQ1': { 'len': 9, 'unit': 'VArh' },
|
||||
'ERQ2': { 'len': 9, 'unit': 'VArh' },
|
||||
'ERQ3': { 'len': 9, 'unit': 'VArh' },
|
||||
'ERQ4': { 'len': 9, 'unit': 'VArh' },
|
||||
'IRMS1': { 'len': 3, 'unit': 'A' },
|
||||
'IRMS2': { 'len': 3, 'unit': 'A' },
|
||||
'IRMS3': { 'len': 3, 'unit': 'A' },
|
||||
'URMS1': { 'len': 3, 'unit': 'V' },
|
||||
'URMS2': { 'len': 3, 'unit': 'V' },
|
||||
'URMS3': { 'len': 3, 'unit': 'V' },
|
||||
'PREF': { 'len': 2, 'unit': 'kVA' },
|
||||
'PCOUP': { 'len': 2, 'unit': 'kVA' },
|
||||
'SINSTS': { 'len': 5, 'unit': 'VA' },
|
||||
'SINST1': { 'len': 5, 'unit': 'VA' },
|
||||
'SINST2': { 'len': 5, 'unit': 'VA' },
|
||||
'SINST3': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN1': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN2': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN3': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN-1': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN1-1': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN2-1': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXSN3-1': { 'len': 5, 'unit': 'VA' },
|
||||
'SINSTI': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXIN': { 'len': 5, 'unit': 'VA' },
|
||||
'SMAXIN-1': { 'len': 5, 'unit': 'VA' },
|
||||
'CCASN': { 'len': 5, 'unit': 'W' },
|
||||
'CCASN-1': { 'len': 5, 'unit': 'W' },
|
||||
'CCAIN': { 'len': 5, 'unit': 'W' },
|
||||
'CCAIN-1': { 'len': 5, 'unit': 'W' },
|
||||
'UMOY1': { 'len': 3, 'unit': 'V' },
|
||||
'UMOY2': { 'len': 3, 'unit': 'V' },
|
||||
'UMOY3': { 'len': 3, 'unit': 'V' },
|
||||
'STGE': { 'len': 8 },
|
||||
'DPM1': { 'len': 2 },
|
||||
'FPM1': { 'len': 2 },
|
||||
'DPM2': { 'len': 2 },
|
||||
'FPM2': { 'len': 2 },
|
||||
'DPM3': { 'len': 2 },
|
||||
'FPM3': { 'len': 2 },
|
||||
'MSG1': { 'len': 32 },
|
||||
'MSG2': { 'len': 16 },
|
||||
'PRM': { 'len': 14 },
|
||||
'RELAIS': { 'len': 3 },
|
||||
'NTARF': { 'len': 2 },
|
||||
'NJOURF': { 'len': 2 },
|
||||
'NJOURF+1': { 'len': 2 },
|
||||
'PJOURF+1': { 'len': 98 },
|
||||
'PPOINTE': { 'len': 98 },
|
||||
}
|
||||
|
||||
|
||||
class TicStandard:
|
||||
def __init__(self):
|
||||
self.sep = TIC_SEP_STD.decode('ascii')
|
||||
tz_winter = datetime.timezone(datetime.timedelta(seconds=-time.timezone), name=time.tzname[0])
|
||||
tz_summer = datetime.timezone(datetime.timedelta(seconds=-time.altzone), name=time.tzname[1])
|
||||
self.tz_info = {
|
||||
'H': tz_winter,
|
||||
'E': tz_summer,
|
||||
}
|
||||
pass
|
||||
|
||||
def name(self):
|
||||
return "mode standard"
|
||||
|
||||
def check_sum(self, group):
|
||||
checksum = group[-1]
|
||||
sum = 0
|
||||
for c in group[:-1]:
|
||||
sum = sum + c
|
||||
sum = (sum & 0x3f) + 0x20
|
||||
return sum == checksum
|
||||
|
||||
def decode_group(self, group):
|
||||
group = group[:-2].decode('ascii')
|
||||
fields = group.split(self.sep)
|
||||
msg = {}
|
||||
if len(fields) < 2 or len(fields) > 4:
|
||||
log.warning(f'corrupted group: {group}')
|
||||
return None
|
||||
label = fields[0]
|
||||
value = fields[-1]
|
||||
date = None
|
||||
if len(fields) == 3:
|
||||
date = self.parse_date(fields[1])
|
||||
|
||||
if not label in msgs:
|
||||
log.warning(f'unknown label: {group}')
|
||||
return None
|
||||
if len(value) != msgs[label]['len']:
|
||||
log.warning(f'bad value length: {group}')
|
||||
return None
|
||||
unit = msgs[label].get('unit')
|
||||
value_type = msgs[label].get('value', None)
|
||||
if unit:
|
||||
try:
|
||||
value = int(value)
|
||||
except:
|
||||
log.warning(f'bad numerical value: {group}')
|
||||
return None
|
||||
else:
|
||||
value = re.sub('\s\s+', ' ', value).strip()
|
||||
if not len(value):
|
||||
value = None
|
||||
|
||||
msg['label'] = label
|
||||
if value is not None:
|
||||
msg['value'] = value
|
||||
if date:
|
||||
msg['date'] = date
|
||||
if unit:
|
||||
msg['unit'] = unit
|
||||
return msg
|
||||
|
||||
def parse_date(self, str):
|
||||
try:
|
||||
tz = self.tz_info.get(str[0].upper())
|
||||
dy = int(str[1:3]) + 2000
|
||||
dm = int(str[3:5])
|
||||
dd = int(str[5:7])
|
||||
th = int(str[7:9])
|
||||
tm = int(str[9:11])
|
||||
ts = int(str[11:13])
|
||||
date = datetime.datetime(year=dy, month=dm, day=dd, hour=th, minute=tm, second=ts, tzinfo=tz)
|
||||
return date.isoformat()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
Loading…
x
Reference in New Issue
Block a user