---
title: Custom Connector - SerialConnector
tags: thingsboard-gateway
image:
---
# Custom Connector - SerialConnector
* status: ==planning > on-going==
> **Config**
```
Create a configuration file for the serial connector.
```
* custom_serial.json
```json=
{
"name": "Custom serial connector",
"devices": [
{
"name": "CustomSerialDevice1",
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"converter": "CustomSerialUplinkConverter",
"telemetry": [
{
"type": "byte",
"key": "humidity",
"untilDelimiter": "\r"
}
],
"attributes":[
{
"key": "SerialNumber",
"type": "string",
"fromByte": 4,
"toByte": -1
}
],
"attributeUpdates": [
{
"attributeOnThingsBoard": "attr1",
"stringToDevice": "value = ${attr1}\n"
}
]
}
]
}
```
> **Connector**
```
Connectors are Gateway components that connect to external systems or directly to devices.
```
* custom_serial_connector.py
```python=
"""Import libraries"""
import serial
import time
from threading import Thread
from random import choice
from string import ascii_lowercase
from thingsboard_gateway.connectors.connector import Connector, log # Import base class for connector and logger
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
class CustomSerialConnector(Thread, Connector):
    """Connector thread that reads LF-terminated messages from serial devices.

    For every entry of the ``devices`` section of the configuration the
    connector loads the configured converter, opens a serial port and, in its
    thread loop, reads bytes until ``b'\\n'``, converts the message and hands
    the result to the gateway storage. Attribute updates coming from
    ThingsBoard are written back to the device as formatted strings.
    """

    def __init__(self, gateway, config, connector_type):
        """Store the configuration, load converters and open the serial ports.

        :param gateway: gateway object; its ``add_device``, ``del_device`` and
            ``send_to_storage`` methods are used by this connector.
        :param config: connector configuration (parsed configuration file).
        :param connector_type: connector type, needed for loading converters.
        """
        super().__init__()  # Initialize parent classes
        self.statistics = {'MessagesReceived': 0,
                           'MessagesSent': 0}  # Counters of received and sent messages.
        self.__config = config
        self.__gateway = gateway
        self.__connector_type = connector_type
        # Name from the configuration, or a generated one with a random suffix (used in logs).
        default_name = "Custom %s connector " % self.get_name() + ''.join(choice(ascii_lowercase) for _ in range(5))
        self.name = self.__config.get("name", default_name)
        log.info("Starting Custom %s connector", self.get_name())
        self.daemon = True  # Run the thread as a daemon
        self.stopped = True  # Becomes False in open(), True again in close()
        self.connected = False  # True once at least one serial port has been opened
        self.devices = {}  # device name -> {'converter', 'device_config', 'serial'}
        self.load_converters()
        self.__connect_to_devices()
        log.info('Custom connector %s initialization success.', self.get_name())
        log.info("Devices in configuration file found: %s ", '\n'.join(self.devices))

    def __serial_settings(self, device_config):
        """Build the keyword arguments for ``serial.Serial`` for one device.

        Each option is looked up in the device section first, then in the
        connector-level configuration, and finally falls back to a default.
        """
        defaults = {
            'port': '/dev/ttyUSB0',
            'baudrate': 9600,
            'bytesize': serial.EIGHTBITS,
            'parity': serial.PARITY_NONE,
            'stopbits': serial.STOPBITS_ONE,
            'timeout': 1,
            'xonxoff': False,
            'rtscts': False,
            'write_timeout': None,
            'dsrdtr': False,
            'inter_byte_timeout': None,
            'exclusive': None,
        }
        return {option: device_config.get(option, self.__config.get(option, default))
                for option, default in defaults.items()}

    def __connect_to_devices(self):
        """Open the serial port of every device that has no open port yet.

        A device is registered in the gateway (and ``connected`` is set) only
        when its port is really open. On errors the problem is logged and the
        method sleeps 10 seconds before continuing with the next device.
        """
        for device_name, device in self.devices.items():
            device_config = device["device_config"]
            settings = self.__serial_settings(device_config)
            try:
                connection_start = time.time()
                current_port = device.get("serial")
                if current_port is None or not current_port.isOpen():  # Connect only if there is no open port yet.
                    device["serial"] = None
                    while device["serial"] is None or not device["serial"].isOpen():
                        # Connection to the serial port with parameters from the configuration file or defaults.
                        device["serial"] = serial.Serial(**settings)
                        time.sleep(.1)
                        if time.time() - connection_start > 10:  # Give up after 10 seconds of trying.
                            log.error("Connection refused per timeout for device %s", device_config.get("name"))
                            break
            except serial.serialutil.SerialException:
                log.error("Port %s for device %s - not found", settings['port'], device_name)
                time.sleep(10)
            except Exception as e:
                log.exception(e)
                time.sleep(10)
            else:
                # Register the device only when the port is actually open
                # (the loop above may have been left because of the timeout).
                if device["serial"] is not None and device["serial"].isOpen():
                    self.__gateway.add_device(device_config["name"], {"connector": self})
                    self.connected = True

    def open(self):
        """Start the connector thread; called by the gateway on start."""
        self.stopped = False
        self.start()

    def get_name(self):
        """Return the connector (thread) name used for logging and sending data."""
        return self.name

    def is_connected(self):
        """Return the connection state flag."""
        return self.connected

    def load_converters(self):
        """Import the converter of every configured device into ``self.devices``.

        Devices without a ``converter`` entry are reported and skipped. When
        the ``devices`` section is missing the connector is closed.
        """
        devices_config = self.__config.get('devices')
        if devices_config is None:
            log.error('Section "devices" in the configuration not found. A custom connector %s has being stopped.', self.get_name())
            self.close()
            return
        for device_config in devices_config:
            # One try per device, so that a broken entry does not prevent loading of the others.
            try:
                if device_config.get('converter') is None:
                    log.error('Converter configuration for the custom connector %s -- not found, please check your configuration file.', self.get_name())
                    continue
                converter = TBUtility.check_and_import(self.__connector_type, device_config['converter'])
                self.devices[device_config['name']] = {'converter': converter(device_config),
                                                       'device_config': device_config}
            except Exception as e:
                log.exception(e)

    def run(self):
        """Main loop of the thread: read, convert and store device messages.

        The loop ends when ``close()`` sets ``stopped``, when no device is
        connected after a pass over all devices, or when conversion/storing
        fails (the connector is closed in that case).
        """
        try:
            while not self.stopped:
                for device_name, device in self.devices.items():
                    port = device.get("serial")
                    if port is None or not port.isOpen():
                        self.__connect_to_devices()  # Port not available - try to connect to it.
                        port = device.get("serial")
                        if port is None or not port.isOpen():
                            continue
                    received = b''
                    data_from_device = b''
                    while not self.stopped and received != b'\n':  # Read until the LF symbol is received.
                        try:
                            received = port.read(1)  # Returns b'' when the read timeout expires.
                        except Exception as e:
                            log.exception(e)
                            break
                        data_from_device += received
                    if data_from_device:  # Nothing to convert after a failed read or a stop request.
                        try:
                            converted_data = device['converter'].convert(device['device_config'], data_from_device)
                            self.__gateway.send_to_storage(self.get_name(), converted_data)
                        except Exception as e:
                            log.exception(e)
                            self.close()
                            raise e
                    time.sleep(.1)
                if not self.connected:
                    break
        except Exception as e:
            log.exception(e)

    def close(self):
        """Stop the connector: unregister the devices and close their ports."""
        self.stopped = True
        self.connected = False
        for device in self.devices.values():
            # del_device gets the same name that was passed to add_device.
            self.__gateway.del_device(device["device_config"]["name"])
            port = device.get("serial")  # May be missing or None if the port was never opened.
            if port is not None and port.isOpen():
                port.close()

    def on_attributes_update(self, content):
        """Write attribute updates received from ThingsBoard to the device.

        :param content: update request; ``content["device"]`` is the device
            name and ``content["data"]`` maps attribute names to new values.
        """
        log.debug(content)
        device = self.devices.get(content["device"])
        if device is None:  # The device is not in the configuration.
            return
        device_config = device.get("device_config")
        if device_config is None:
            return
        log.debug(device_config)
        requests = device_config.get("attributeUpdates")  # Configuration for attribute requests.
        if requests is None:
            return
        for request in requests:
            attribute = request.get("attributeOnThingsBoard")
            log.debug(attribute)
            if attribute is None or attribute not in content["data"]:
                continue
            try:
                value = content["data"][attribute]
                # Put the value instead of the ${attribute} placeholder and encode the string for the port.
                str_to_send = str(request["stringToDevice"].replace("${" + attribute + "}", str(value))).encode("UTF-8")
                device["serial"].write(str_to_send)
                log.debug("Attribute update request to device %s : %s", content["device"], str_to_send)
                time.sleep(.01)
            except Exception as e:
                log.exception(e)

    def server_side_rpc_handler(self, content):
        """RPC requests from the server are not handled by this connector."""
        pass
```
> **Converter**
```
Convert data from devices to the ThingsBoard format
```
* custom_serial_converter.py
```python=
from thingsboard_gateway.connectors.converter import Converter, log # Import base class for the converter and log ("converter.log" in logs directory).
class CustomSerialUplinkConverter(Converter):
    """Convert raw bytes received from a serial device to the ThingsBoard format."""

    def __init__(self, config):
        """Save the device configuration and prepare the result template.

        :param config: device section of the connector configuration; its
            ``attributes`` and ``telemetry`` lists describe what to extract.
        """
        self.__config = config
        self.result_dict = {
            'deviceName': config.get('name', 'CustomSerialDevice'),
            'deviceType': config.get('deviceType', 'default'),
            'attributes': [],
            'telemetry': []
        }  # Template for a result dictionary; holds the last result after convert().

    @staticmethod
    def _extract(config_object, data):
        """Cut the bytes described by one configuration entry out of ``data``.

        Returns ``None`` when ``fromDelimiter`` is configured but is not
        present in the received data.
        """
        data_to_convert = data
        from_delimiter = config_object.get('fromDelimiter')
        if from_delimiter is not None:
            parts = data_to_convert.split(from_delimiter.encode('UTF-8'))
            if len(parts) < 2:
                return None
            data_to_convert = parts[1]  # Data after the first delimiter (up to its next occurrence).
        until_delimiter = config_object.get('untilDelimiter')
        if until_delimiter is not None:
            # Applied after "fromDelimiter", so both parameters can be combined.
            data_to_convert = data_to_convert.split(until_delimiter.encode('UTF-8'))[0]
        to_byte = config_object.get('toByte')
        if to_byte is not None:
            if to_byte == -1:
                # -1 means "to the end": everything except the last byte of the received
                # message (presumably the line terminator - TODO confirm).
                to_byte = len(data) - 1
            data_to_convert = data_to_convert[:to_byte]
        from_byte = config_object.get('fromByte')
        if from_byte is not None:
            data_to_convert = data_to_convert[from_byte:]
        return data_to_convert

    def convert(self, config, data: bytes):
        """Convert data from the device format to the ThingsBoard format.

        :param config: not used; the configuration passed to the constructor
            is applied instead.
        :param data: bytes received from the device.
        :return: dictionary with ``deviceName``, ``deviceType`` and the lists
            ``attributes`` and ``telemetry`` of ``{key: value}`` items. A new
            dictionary is created on every call, so a result returned earlier
            is not changed by the next conversion.
        """
        result = {
            'deviceName': self.result_dict['deviceName'],
            'deviceType': self.result_dict['deviceType'],
            'attributes': [],
            'telemetry': []
        }
        for key in ('attributes', 'telemetry'):
            for config_object in self.__config.get(key) or []:
                data_to_convert = self._extract(config_object, data)
                if data_to_convert is None:
                    # Skip the value instead of failing the whole message.
                    log.error('Delimiter %r for key %s not found in received data: %r',
                              config_object.get('fromDelimiter'), config_object.get('key'), data)
                    continue
                try:
                    value = data_to_convert.decode('UTF-8')
                except UnicodeDecodeError as e:
                    # Corrupted bytes on the line should not stop the connector.
                    log.exception(e)
                    continue
                result[key].append({config_object['key']: value})
        self.result_dict = result
        return result
```
## Reference
* [Custom IoT Gateway Connector](https://thingsboard.io/docs/iot-gateway/custom/)