diff --git a/tic2mqtt b/tic2mqtt new file mode 100755 index 0000000..ec3851d --- /dev/null +++ b/tic2mqtt @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 + +import sys +import argparse +import time +import serial +from pprint import pprint + +TIC_STX = b'\x02' +TIC_ETX = b'\x03' +TIC_SGR = b'\x0a' +TIC_SEP_STD = b'\x09' +TIC_SEP_HIS = b'\x20' +TIC_EGR = b'\x0d' + + +class TicDevice: + def __init__(self, name): + self._is_serial = False + try: + self.device = serial.Serial(name, + baudrate=9600, + bytesize=serial.SEVENBITS, + parity=serial.PARITY_EVEN, + stopbits=serial.STOPBITS_ONE, + timeout=2.0) + self._is_serial = True + except serial.serialutil.SerialException: + self.device = open(name, 'rb') + + def set_baudrate(self, rate): + if self._is_serial: + self.device.baudrate = rate + + def get_char(self): + return self.device.read(1) + +class Tic: + def __init__(self, input): + self.input = input + self.device = TicDevice(input) + self.mode = None + + def detect_mode(self): + print("tring speed 9600bps") + self.device.set_baudrate(9600) + if self._detect_mode(): + return + print("tring speed 1200bps") + self.device.set_baudrate(1200) + if self._detect_mode(): + return + return None + + def _detect_mode(self): + ok_mask = 0x00 + tic_map = { TIC_STX: 0x01, + TIC_ETX: 0x02, + TIC_SGR: 0x04, + TIC_EGR: 0x08, + TIC_SEP_STD: 0x10 } + + nb_reset = 0 + while c := self.device.get_char(): + ok = tic_map.get(c, 0) + if ok_mask or ok == 0x01: + ok_mask = ok_mask | ok + if ok_mask == 0x1f: + print(f"Standard mode detected") + return 'std' + if ok_mask == 0x0f: + print(f"Historiqual mode detected") + return 'hist' + if not ok and (c < b' ' or c > b'\x7f'): + #print(f"bad char: {c}") + ok_mask = 0x00 + nb_reset = nb_reset + 1 + if nb_reset >= 20: + break + print("Timeout/EOF") + return None + +def main(): + parser = argparse.ArgumentParser(prog="tic2mqtt", + description="MQTT client for Linky Electricity Meters installed by Enedis in France.") + #parser.add_argument('input', nargs='?', type=argparse.FileType('rb', bufsize=0), default='-', + # help="input file or device to read TIC messages (stdin by default)") + parser.add_argument('input', nargs='?', default='/dev/stdin', + help="input file or device to read TIC messages (stdin by default)") + parser.add_argument('-m', '--mode', choices=['auto', 'historique', 'standard'], default='auto', + help='select TIC mode') + args = parser.parse_args() + + tic = Tic(args.input) + tic.detect_mode() + + + +if __name__ == '__main__': + main() +