12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- # import vedirect from https://github.com/karioja/vedirect
- # description of channels:https://beta.ivc.no/wiki/index.php/Victron_VE_Direct_DIY_Cable
- import serial
- class vedirect:
- def __init__(self, serialport, timeout):
- self.serialport = serialport
- self.ser = serial.Serial(serialport, 19200, timeout=timeout)
- self.header1 = '\r'
- self.header2 = '\n'
- self.hexmarker = ':'
- self.delimiter = '\t'
- self.key = ''
- self.value = ''
- self.bytes_sum = 0;
- self.state = self.WAIT_HEADER
- self.dict = {}
- (HEX, WAIT_HEADER, IN_KEY, IN_VALUE, IN_CHECKSUM) = range(5)
- def input(self, byte):
- if byte == self.hexmarker and self.state != self.IN_CHECKSUM:
- self.state = self.HEX
- if self.state == self.WAIT_HEADER:
- self.bytes_sum += ord(byte)
- if byte == self.header1:
- self.state = self.WAIT_HEADER
- elif byte == self.header2:
- self.state = self.IN_KEY
- return None
- elif self.state == self.IN_KEY:
- self.bytes_sum += ord(byte)
- if byte == self.delimiter:
- if (self.key == 'Checksum'):
- self.state = self.IN_CHECKSUM
- else:
- self.state = self.IN_VALUE
- else:
- self.key += byte
- return None
- elif self.state == self.IN_VALUE:
- self.bytes_sum += ord(byte)
- if byte == self.header1:
- self.state = self.WAIT_HEADER
- self.dict[self.key] = self.value;
- self.key = '';
- self.value = '';
- else:
- self.value += byte
- return None
- elif self.state == self.IN_CHECKSUM:
- self.bytes_sum += ord(byte)
- self.key = ''
- self.value = ''
- self.state = self.WAIT_HEADER
- if (self.bytes_sum % 256 == 0):
- self.bytes_sum = 0
- return self.dict
- else:
- print('Malformed packet')
- self.bytes_sum = 0
- elif self.state == self.HEX:
- self.bytes_sum = 0
- if byte == self.header2:
- self.state = self.WAIT_HEADER
- else:
- raise AssertionError()
- def read_data(self):
- while True:
- byte = self.ser.read(1)
- packet = self.input(byte)
- def read_data_single(self):
- while True:
- byte = self.ser.read(1)
- packet = self.input(byte)
- if (packet != None):
- return packet
- def read_data_callback(self, callbackFunction):
- while True:
- byte = self.ser.read(1)
- if byte:
- packet = self.input(byte)
- if (packet != None):
- callbackFunction(packet)
- else:
- break
- def print_data_callback(data):
- print(data)
- # end import vedirect
|