This commit is contained in:
Gilles Grandou 2024-05-19 18:23:53 +02:00
parent 19337f4ff1
commit d7e2587673

101
tic2mqtt Executable file
View File

@ -0,0 +1,101 @@
#!/usr/bin/env python3
import sys
import argparse
import time
import serial
from pprint import pprint
TIC_STX = b'\x02'
TIC_ETX = b'\x03'
TIC_SGR = b'\x0a'
TIC_SEP_STD = b'\x09'
TIC_SEP_HIS = b'\x20'
TIC_EGR = b'\x0d'
class TicDevice:
def __init__(self, name):
self._is_serial = False
try:
self.device = serial.Serial(name,
baudrate=9600,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
timeout=2.0)
self._is_serial = True
except serial.serialutil.SerialException:
self.device = open(name, 'rb')
def set_baudrate(self, rate):
if self._is_serial:
self.device.baudrate = rate
def get_char(self):
return self.device.read(1)
class Tic:
def __init__(self, input):
self.input = input
self.device = TicDevice(input)
self.mode = None
def detect_mode(self):
print("tring speed 9600bps")
self.device.set_baudrate(9600)
if self._detect_mode():
return
print("tring speed 1200bps")
self.device.set_baudrate(1200)
if self._detect_mode():
return
return None
def _detect_mode(self):
ok_mask = 0x00
tic_map = { TIC_STX: 0x01,
TIC_ETX: 0x02,
TIC_SGR: 0x04,
TIC_EGR: 0x08,
TIC_SEP_STD: 0x10 }
nb_reset = 0
while c := self.device.get_char():
ok = tic_map.get(c, 0)
if ok_mask or ok == 0x01:
ok_mask = ok_mask | ok
if ok_mask == 0x1f:
print(f"Standard mode detected")
return 'std'
if ok_mask == 0x0f:
print(f"Historiqual mode detected")
return 'hist'
if not ok and (c < b' ' or c > b'\x7f'):
#print(f"bad char: {c}")
ok_mask = 0x00
nb_reset = nb_reset + 1
if nb_reset >= 20:
break
print("Timeout/EOF")
return None
def main():
parser = argparse.ArgumentParser(prog="tic2mqtt",
description="MQTT client for Linky Electricity Meters installed by Enedis in France.")
#parser.add_argument('input', nargs='?', type=argparse.FileType('rb', bufsize=0), default='-',
# help="input file or device to read TIC messages (stdin by default)")
parser.add_argument('input', nargs='?', default='/dev/stdin',
help="input file or device to read TIC messages (stdin by default)")
parser.add_argument('-m', '--mode', choices=['auto', 'historique', 'standard'], default='auto',
help='select TIC mode')
args = parser.parse_args()
tic = Tic(args.input)
tic.detect_mode()
if __name__ == '__main__':
main()