--- title: Custom Connector - SerialConnector tags: thingsboard-gateway image: --- # Custom Connector - SerialConnector * status: ==planning > on-going== > **Config** ``` Create configuration file for our serial connector ``` * custom_serial.json ```json= { "name": "Custom serial connector", "devices": [ { "name": "CustomSerialDevice1", "port": "/dev/ttyUSB0", "baudrate": 9600, "converter": "CustomSerialUplinkConverter", "telemetry": [ { "type": "byte", "key": "humidity", "untilDelimiter": "\r" } ], "attributes":[ { "key": "SerialNumber", "type": "string", "fromByte": 4, "toByte": -1 } ], "attributeUpdates": [ { "attributeOnThingsBoard": "attr1", "stringToDevice": "value = ${attr1}\n" } ] } ] } ``` > **Connector** ``` Connectors are Gateway components that connect to external system or directly to devices ``` * custom_serial_connector.py ```python= """Import libraries""" import serial import time from threading import Thread from random import choice from string import ascii_lowercase from thingsboard_gateway.connectors.connector import Connector, log # Import base class for connector and logger from thingsboard_gateway.tb_utility.tb_utility import TBUtility class CustomSerialConnector(Thread, Connector): # Define a connector class, it should inherit from "Connector" class. def __init__(self, gateway, config, connector_type): super().__init__() # Initialize parents classes self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} # Dictionary, will save information about count received and sent messages. self.__config = config # Save configuration from the configuration file. self.__gateway = gateway # Save gateway object, we will use some gateway methods for adding devices and saving data from them. self.__connector_type = connector_type # Saving type for connector, need for loading converter self.setName(self.__config.get("name", "Custom %s connector " % self.get_name() + ''.join(choice(ascii_lowercase) for _ in range(5)))) # get from the configuration or create name for logs. log.info("Starting Custom %s connector", self.get_name()) # Send message to logger self.daemon = True # Set self thread as daemon self.stopped = True # Service variable for check state self.connected = False # Service variable for check connection to device self.devices = {} # Dictionary with devices, will contain devices configurations, converters for devices and serial port objects self.load_converters() # Call function to load converters and save it into devices dictionary self.__connect_to_devices() # Call function for connect to devices log.info('Custom connector %s initialization success.', self.get_name()) # Message to logger log.info("Devices in configuration file found: %s ", '\n'.join(device for device in self.devices)) # Message to logger def __connect_to_devices(self): # Function for opening connection and connecting to devices for device in self.devices: try: # Start error handler connection_start = time.time() if self.devices[device].get("serial") is None \ or self.devices[device]["serial"] is None \ or not self.devices[device]["serial"].isOpen(): # Connect only if serial not available earlier or it is closed. self.devices[device]["serial"] = None while self.devices[device]["serial"] is None or not self.devices[device]["serial"].isOpen(): # Try connect '''connection to serial port with parameters from configuration file or default''' self.devices[device]["serial"] = serial.Serial( port=self.__config.get('port', '/dev/ttyUSB0'), baudrate=self.__config.get('baudrate', 9600), bytesize=self.__config.get('bytesize', serial.EIGHTBITS), parity=self.__config.get('parity', serial.PARITY_NONE), stopbits=self.__config.get('stopbits', serial.STOPBITS_ONE), timeout=self.__config.get('timeout', 1), xonxoff=self.__config.get('xonxoff', False), rtscts=self.__config.get('rtscts', False), write_timeout=self.__config.get('write_timeout', None), dsrdtr=self.__config.get('dsrdtr', False), inter_byte_timeout=self.__config.get('inter_byte_timeout', None), exclusive=self.__config.get('exclusive', None) ) time.sleep(.1) if time.time() - connection_start > 10: # Break connection try if it setting up for 10 seconds log.error("Connection refused per timeout for device %s", self.devices[device]["device_config"].get("name")) break except serial.serialutil.SerialException: log.error("Port %s for device %s - not found", self.__config.get('port', '/dev/ttyUSB0'), device) time.sleep(10) except Exception as e: log.exception(e) time.sleep(10) else: # if no exception handled - add device and change connection state self.__gateway.add_device(self.devices[device]["device_config"]["name"], {"connector": self}) self.connected = True def open(self): # Function called by gateway on start self.stopped = False self.start() def get_name(self): # Function used for logging, sending data and statistic return self.name def is_connected(self): # Function for checking connection state return self.connected def load_converters(self): # Function for search a converter and save it. devices_config = self.__config.get('devices') try: if devices_config is not None: for device_config in devices_config: if device_config.get('converter') is not None: converter = TBUtility.check_and_import(self.__connector_type, device_config['converter']) self.devices[device_config['name']] = {'converter': converter(device_config), 'device_config': device_config} else: log.error('Converter configuration for the custom connector %s -- not found, please check your configuration file.', self.get_name()) else: log.error('Section "devices" in the configuration not found. A custom connector %s has being stopped.', self.get_name()) self.close() except Exception as e: log.exception(e) def run(self): # Main loop of thread try: while True: for device in self.devices: serial = self.devices[device]["serial"] ch = b'' data_from_device = b'' while ch != b'\n': try: try: ch = serial.read(1) # Reading data from serial except AttributeError as e: if serial is None: self.__connect_to_devices() # if port not found - try to connect to it raise e data_from_device = data_from_device + ch except Exception as e: log.exception(e) break try: converted_data = self.devices[device]['converter'].convert(self.devices[device]['device_config'], data_from_device) self.__gateway.send_to_storage(self.get_name(), converted_data) time.sleep(.1) except Exception as e: log.exception(e) self.close() raise e if not self.connected: break except Exception as e: log.exception(e) def close(self): # Close connect function, usually used if exception handled in gateway main loop or in connector main loop self.stopped = True for device in self.devices: self.__gateway.del_device(self.devices[device]) if self.devices[device]['serial'].isOpen(): self.devices[device]['serial'].close() def on_attributes_update(self, content): # Function used for processing attribute update requests from ThingsBoard log.debug(content) if self.devices.get(content["device"]) is not None: # checking - is device in configuration? device_config = self.devices[content["device"]].get("device_config") if device_config is not None: log.debug(device_config) if device_config.get("attributeUpdates") is not None: requests = device_config["attributeUpdates"] # getting configuration for attribute requests for request in requests: attribute = request.get("attributeOnThingsBoard") log.debug(attribute) if attribute is not None and attribute in content["data"]: try: value = content["data"][attribute] # get value from content str_to_send = str(request["stringToDevice"].replace("${" + attribute + "}", str(value))).encode("UTF-8") # form a string to send to device self.devices[content["device"]]["serial"].write(str_to_send) # send string to device log.debug("Attribute update request to device %s : %s", content["device"], str_to_send) time.sleep(.01) except Exception as e: log.exception(e) def server_side_rpc_handler(self, content): pass ``` > **Converter** ``` Convert data from devices to the ThingsBoard format ``` * custom_serial_converter.py ```python= from thingsboard_gateway.connectors.converter import Converter, log # Import base class for the converter and log ("converter.log" in logs directory). class CustomSerialUplinkConverter(Converter): # Definition of class. def __init__(self, config): # Initialization method self.__config = config # Saving configuration to object variable self.result_dict = { 'deviceName': config.get('name', 'CustomSerialDevice'), 'deviceType': config.get('deviceType', 'default'), 'attributes': [], 'telemetry': [] } # template for a result dictionary. def convert(self, config, data: bytes): # Method for conversion data from device format to ThingsBoard format. keys = ['attributes', 'telemetry'] # Array used for looking data for data processing. for key in keys: # Data processing loop for parameters in keys array. self.result_dict[key] = [] # Clean old data. if self.__config.get(key) is not None: # Checking the parameter from the keys in the config. for config_object in self.__config.get(key): # The loop for checking whether there is data that interests us. data_to_convert = data # data for conversion. if config_object.get('untilDelimiter') is not None: # Checking some parameter from configuration file. data_to_convert = data.split(config_object.get('untilDelimiter').encode('UTF-8'))[0] # if "utilDelimiter" parameter in configuration file - get data from incoming data to delimiter position in received string. if config_object.get('fromDelimiter') is not None: # Checking some parameter from configuration file. data_to_convert = data.split(config_object.get('fromDelimiter').encode('UTF-8'))[1] # if "fromDelimiter" parameter in configuration file - get data from incoming data from delimiter position in received string. if config_object.get('toByte') is not None: # Checking some parameter from configuration file. to_byte = config_object.get('toByte') # # if "toByte" parameter in configuration file - get data from incoming data to byte number from a parameter "toByte" in configuration file. if to_byte == -1: # Checking some parameter from configuration file. to_byte = len(data) - 1 # If parameter == -1 - we will take data to the end. data_to_convert = data_to_convert[:to_byte] # saving data to variable for sending if config_object.get('fromByte') is not None: # Checking some parameter from configuration file from_byte = config_object.get('fromByte') # if "fromByte" parameter in configuration file - get data from incoming data from byte number from a parameter "fromByte" in configuration file. data_to_convert = data_to_convert[from_byte:] # saving data to variable for sending. converted_data = {config_object['key']: data_to_convert.decode('UTF-8')} # Adding data from temporary variable to result string. self.result_dict[key].append(converted_data) # Append result string to result dictionary. return self.result_dict # returning result dictionary after all iterations. ``` ## Reference * [Custom IoT Gateway Connector](https://thingsboard.io/docs/iot-gateway/custom/)