# main()
:::info
I am aware that this is outside the scope of the tutorial; this is more of a personal exercise in trying to explain my spaghetti code.
:::

The entire code that is used to control the Dyson fan is explained here. *The numbers in brackets () refer to the line number in the code block above that section.*

```python=
def main():
    check_ir_signal_files()
    set_up_mqtt()
    indicator_light()
    global state_changed
    temp = get_temp()
    send_data(ujson.dumps({
        "temp":temp,
        "speed":current_state["speed"],
        "on_off":int(current_state["is_on"]),
        "auto":int(current_state["auto_on"])
        }))
    max_time_intervall = int(config["maximum time interval"])
    last_data_sent = time.ticks_ms()

    while True:
        temp = get_temp()
        mqtt_client.check_msg()
        if current_state["auto_on"]:
            autonomous(temp)
            sleep_time = 60
        else:
            manual()
            sleep_time = 3
        if state_changed:
            send_data(ujson.dumps({
                "temp":temp,
                "speed":current_state["speed"],
                "on_off":int(current_state["is_on"]),
                "auto":int(current_state["auto_on"])
                }))
            last_data_sent = time.ticks_ms()
            state_changed = False
            update_state()
        # checks how long since the last temp has been sent (ticks_diff handles counter wrap-around)
        elif time.ticks_diff(time.ticks_ms(), last_data_sent) > max_time_intervall:
            send_data(ujson.dumps({"temp":temp}))
            last_data_sent = time.ticks_ms()
        time.sleep(sleep_time)
```

At the top of the main function we first call [check_ir_signal_files()](#check_ir_signal_files()) to test for the presence of the files that contain the IR signals (2). Next, the connection to the MQTT broker is established by calling [set_up_mqtt()](#set_up_mqtt()) (3). The indicator light is also started here with [indicator_light()](#indicator_light()) (4). Then a temperature measurement is taken with [get_temp()](#get_temp()) (6), and the current state of the board is sent to the dashboard as a JSON object using [send_data()](#send_data()) (7) so that the two are synchronized. Finally, the time at which data was last sent is recorded (14).
This is the set-up of the program, which runs once when the board is turned on.

Once the setup is done, the while loop starts and runs continuously (16). At the top of the loop, another temperature measurement is taken (17) and [check_msg()](#check_msg()) is called on the MQTT client object to check whether there are any new messages from the MQTT broker (18). Once the messages have been evaluated, the first of two if-else statements is executed (19). This conditional depends on whether autonomous mode is on (`current_state["auto_on"]`): if it is, the [autonomous()](#autonomous(temp)) function is called (20) and the sleep time is set to 60 seconds (21); otherwise [manual()](#manual()) is executed (23) and the sleep time is set to 3 seconds (24). The sleep times differ because the program needs to be more responsive when the physical interface is in use than when the decisions are made entirely on the basis of temperature.

The second if-else statement (25) determines what data is sent and when. It first checks whether a change has occurred during the last cycle. If it has, the entire state of the fan is [sent](#send_data()) (26), `last_data_sent` (32) and `state_changed` (33) are reset, and the updated state is saved to `dyson_state.py` by [update_state()](#update_state()) (34). Otherwise it checks how much time has passed since data was last sent (36). If it has been more than `max_time_intervall`, which is set to 60 seconds by default, only the temperature is [sent](#send_data()) (37) and the time is reset (38).

## check_ir_signal_files()
```python=
# Check if the required IR signal files are present
def check_ir_signal_files():
    file_names = ['on_off.py','speed_down.py','speed_up.py']
    for file in file_names:
        try:
            with open("ir_signals/{}".format(file), 'r'):
                continue
        except OSError:
            print("{} file not found".format(file))
            acquire_ir_signal(file)
            time.sleep(1)
```

The `file_names` variable contains a list of the names of the files (3) that hold the IR signals for turning the fan on/off, speeding down, and speeding up.
The program loops through the list (4) and attempts to open each file (6). If the file opens successfully, it exists and the next file can be tested (7); if it cannot be opened, an error is raised and the exception block is executed (8). In this block the user is informed of the missing file (9) and [acquire_ir_signal(file)](#acquire_ir_signal(file_name)) is called (10) with the missing file name as the argument.

### acquire_ir_signal(file_name)
```python=
# Acquire IR signals from the remote control
def acquire_ir_signal(file_name):
    prompt = {
        'on_off.py': "POWER",
        'speed_down.py': "DOWN",
        'speed_up.py': "UP"
    }
    # prompt the user to press the button corresponding to the file
    print("Please point the remote at the IR receiver and press the {} button".format(prompt[file_name]))
    lst = test(Pin(16, Pin.IN))
    # create the file and save the signal
    with open("ir_signals/{}".format(file_name), 'w') as f:
        ujson.dump(lst, f)
```

`prompt` is a dictionary that associates each file name with a keyword (3). The keyword is inserted into the prompt shown to the user (9) to tell them which button needs to be pressed. The `test()` function from the `ir_rx` library is then called; it records the IR signal and returns it, and the signal is assigned to the `lst` variable (10). The signal is saved in a file with the name that was passed as the argument (12).

When all files have been tested we continue in the main function.
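
The check-then-acquire flow above can be tried out on a desktop Python install. The following is a minimal sketch, not the code running on the board: it uses `json` in place of `ujson`, and `ensure_signal_files` and `fake_recorder` are hypothetical names, with `fake_recorder` standing in for the `test()` call that records a real IR signal.

```python
import json
import os
import tempfile

SIGNAL_FILES = ["on_off.py", "speed_down.py", "speed_up.py"]


def ensure_signal_files(directory, record):
    """Record and save every signal file that is missing from `directory`.

    `record` is a hypothetical callback that takes a file name and
    returns the recorded signal as a list of integers.
    Returns the names of the files that had to be created.
    """
    created = []
    for name in SIGNAL_FILES:
        path = os.path.join(directory, name)
        try:
            with open(path, "r"):
                continue  # file exists, check the next one
        except OSError:
            pass  # file is missing, fall through and record it
        with open(path, "w") as f:
            json.dump(record(name), f)
        created.append(name)
    return created


def fake_recorder(name):
    # Stand-in for the IR receiver: made-up pulse lengths, not real data
    return [9000, 4500, 560, 560]


with tempfile.TemporaryDirectory() as tmp:
    first_run = ensure_signal_files(tmp, fake_recorder)
    second_run = ensure_signal_files(tmp, fake_recorder)
    with open(os.path.join(tmp, "on_off.py")) as f:
        stored = json.load(f)
```

On the second run nothing is recorded because all three files already exist, which mirrors how the check passes silently once the signals have been saved.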
## set_up_mqtt()
```python=
# Set up MQTT client
PORT = 1883
client_ID = ubinascii.hexlify(machine.unique_id())
mosquitto_server = config["MQTT broker IP"]
mosquitto_user = config["MQTT broker user"]
mosquitto_key = config["MQTT broker key"]
data_topic = "devices/data"
commands_topic = "devices/command"
mqtt_client = MQTTClient(client_ID, mosquitto_server, PORT, mosquitto_user, mosquitto_key, keepalive=120)


def set_up_mqtt():
    print(f"Begin connection with MQTT Broker :: {mosquitto_server}")

    try:
        mqtt_client.set_callback(sub_cb)
        mqtt_client.connect()
        mqtt_client.subscribe(commands_topic)
        print(f"connected to MQTT Broker :: {mosquitto_server}")
    except Exception as e:
        print(f"Failed to connect to MQTT Broker :: {mosquitto_server}")
        print(f"Error: {e}")
```

First, several variables are initialized with values from the `config.py` file, the topic variables are defined, and the MQTT client object is created. In the function we then set the callback function (16) that will evaluate any messages received from the topics the client is subscribed to, establish the connection to the broker (17), and subscribe to the command topic (18). If an error occurs during these steps, for example because the MQTT broker is offline, the exception is caught and the user is informed (20). Note that the `except` block only prints the error, so `set_up_mqtt()` itself returns normally; any later MQTT call is then likely to fail and end the program.

## indicator_light()
```python=
# On-Off indication light
def indicator_light():
    LED_Green.value(int(current_state["is_on"]))
    LED_Red.value(int(not current_state["is_on"]))
```

Changes the color of the red/green LED depending on the on/off state in the `current_state` dictionary: green when the fan is on (3), red when it is off (4).

## get_temp()
```python=
# Get temperature from the sensor
def get_temp():
    temperature_sensor.measure()
    return temperature_sensor.temperature()
```

Takes a temperature measurement with the DHT11 (3) and returns it (4).
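
The function above does not handle a failed sensor read. As a minimal sketch, assuming that `measure()` raises `OSError` when the sensor does not respond, the measurement could be wrapped so that the last good value is reused. `FakeSensor` and `get_temp_or_last` are hypothetical names used only for illustration; they are not part of the fan's code.

```python
class FakeSensor:
    """Stand-in for the DHT11 driver object: same two method names."""

    def __init__(self, readings):
        self._readings = list(readings)
        self._current = None

    def measure(self):
        value = self._readings.pop(0)
        if value is None:
            # Simulated failed read (assumed failure mode, see lead-in)
            raise OSError("sensor did not respond")
        self._current = value

    def temperature(self):
        return self._current


def get_temp_or_last(sensor, last_temp):
    """Return a fresh temperature, or `last_temp` if the read fails."""
    try:
        sensor.measure()
    except OSError:
        return last_temp
    return sensor.temperature()


sensor = FakeSensor([24, None, 26])
first = get_temp_or_last(sensor, None)
second = get_temp_or_last(sensor, first)   # read fails, previous value is reused
third = get_temp_or_last(sensor, second)
```

Whether reusing an old reading is acceptable depends on how long the sensor stays silent; for a fan that only reacts once a minute in autonomous mode, a single skipped reading is unlikely to matter.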
## send_data()
```python=
# Send data to the MQTT broker
def send_data(data_msg):
    print("data sent")
    mqtt_client.publish(topic=data_topic, msg=data_msg)
```

Takes the data as an argument and publishes it to the MQTT broker on the `data_topic` (4).

## check_msg()
This is a function from the MQTT library that checks whether any new messages have been posted to the subscribed topics. If there is a new message, the callback function that was registered in `set_up_mqtt()` is called.

### sub_cb(topic, msg)
```python=
# Callback function for MQTT messages
def sub_cb(topic, msg):
    print("Received message:", msg)
    global state_changed
    # While autonomous mode is on, only the 'auto_power' toggle is acted upon
    if msg == b'auto_power':
        print("autonomous mode toggled")
        current_state["auto_on"] = not current_state["auto_on"]
        state_changed = True
        # Set the rotary sensor to the current speed
        rotation.set(value = current_state["speed"])
    # elif, not if: an 'auto_power' message must never reach int(msg) below
    elif not current_state["auto_on"]:
        if msg == b"power":
            ON_OFF()
        elif current_state["is_on"]:
            rotation.set(value=int(msg))
```

- The received message is printed for debugging (3).
- The global `state_changed` variable is brought into the scope of the function so it can be altered (4).
- The program checks whether the message is `auto_power` (6).
    - If it is, the value of `current_state["auto_on"]` is inverted (8), the `state_changed` flag is set to `True` (9), and the value of the rotary encoder is set to the current fan speed (11).
    - This is important because the rotary encoder is not checked while autonomous mode is on, so it will still be in the position it was in before autonomous mode was turned on. For example, suppose the fan was set to speed 4 with the rotary encoder, and autonomous mode was then turned on and switched it to 7. When autonomous mode is turned off, the program starts checking the rotary encoder again and finds it set to 4. Without this step the program would then set the fan speed back to 4.
So by setting the value of the rotary encoder when autonomous mode is turned off, the fan remains at the same speed without unintentional changes.
- It then checks whether autonomous mode is off (13); if it is, the remaining possible messages are handled.
    - `power` calls the [ON_OFF()](#ON_OFF()) function (15).
    - If the program has gotten this far, the only remaining possible message is a speed change. If the fan is currently on, the position of the rotary encoder is set to the value of the message (17); otherwise no action is taken.

## ON_OFF()
```python=
# Send the ON/OFF IR signal to control the Dyson
def ON_OFF():
    global state_changed
    state_changed = True
    try:
        with open('ir_signals/on_off.py', 'r') as f:
            lst = ujson.load(f)

        ir.play(lst)
        # Wait for the transmitter to finish sending the signal
        time.sleep_ms(50)
        current_state['is_on'] = not current_state["is_on"]
        rotation.set(value=current_state["speed"])
        indicator_light()
    except OSError:
        print("Error: Failed to open IR signal file.")
```

The `state_changed` flag is set (4). The file containing the on/off signal is opened (6), its content is assigned to `lst` (7), and the signal is played with the IR transmitter (9). The program then sleeps for 50 milliseconds (11). This is important: if the program does not sleep, the signal is not sent properly. I suspect this is because, without the sleep, the transmitter is still sending the signal when the function finishes and the value of `lst` is discarded. The value of `current_state["is_on"]` is inverted (12) and the value of the rotary encoder is set to the current speed (13), in case the position of the encoder was changed while the fan was off. Finally, [indicator_light()](#indicator_light()) is called (14).
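
The message handling in `sub_cb()`, including the toggle that `ON_OFF()` performs, can be exercised without a broker by writing the decision as a pure function. This is a sketch under assumptions, not the code on the board: `handle_message` is a hypothetical name, it returns the new state together with the value the rotary encoder should be set to (or `None`), and it ignores payloads that are not a number instead of passing them to `int()` unchecked.

```python
def handle_message(state, msg):
    """Return (new_state, encoder_value) for one MQTT payload.

    `state` is a dict with the keys "is_on", "auto_on" and "speed".
    `encoder_value` is None when the rotary encoder should be left alone.
    """
    new_state = dict(state)
    encoder_value = None

    if msg == b"auto_power":
        # Toggle autonomous mode and re-sync the encoder to the fan speed
        new_state["auto_on"] = not state["auto_on"]
        encoder_value = state["speed"]
    elif not state["auto_on"]:
        if msg == b"power":
            # Same effect on the state as ON_OFF(): flip power, re-sync encoder
            new_state["is_on"] = not state["is_on"]
            encoder_value = state["speed"]
        elif state["is_on"]:
            try:
                encoder_value = int(msg)  # speed change request
            except ValueError:
                pass  # not a number: ignore the message
    return new_state, encoder_value


auto = {"is_on": True, "auto_on": True, "speed": 7}
manual = {"is_on": True, "auto_on": False, "speed": 4}

leave_auto = handle_message(auto, b"auto_power")
ignored_in_auto = handle_message(auto, b"4")
power_off = handle_message(manual, b"power")
new_speed = handle_message(manual, b"9")
garbage = handle_message(manual, b"junk")
```

`leave_auto` shows the speed-4-versus-7 example from the list above: the encoder is set to 7 at the moment autonomous mode is switched off, so the next manual cycle finds nothing to change.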
## autonomous(temp)
```python=
# Autonomous mode
dyson_on_temp = int(config["dyson on temp"])
dyson_off_temp = int(config["dyson off temp"])
medium_break_point = int(config["medium break point"])
fast_break_point = int(config["fast break point"])

def autonomous(temp):
    # checks breakpoint and takes action accordingly
    if not current_state["is_on"] and temp > dyson_on_temp:
        ON_OFF()
    if current_state["is_on"]:
        if temp < dyson_off_temp:
            ON_OFF()
        elif temp < medium_break_point:
            if current_state["speed"] != 3:
                change_speed_to(3)
        elif temp < fast_break_point:
            if current_state["speed"] != 6:
                change_speed_to(6)
        elif current_state["speed"] != 9:
            change_speed_to(9)
```

First the breakpoints are loaded from the config file (1-5). The program then checks the fan state and the temperature against the breakpoints to determine which actions are necessary: if the fan is off and the temperature is above `dyson_on_temp`, it is turned on; if the fan is on and the temperature is below `dyson_off_temp`, it is turned off; otherwise the speed is set to 3, 6, or 9, depending on whether the temperature is below `medium_break_point`, below `fast_break_point`, or above both.

## manual()
```python=
# Manual mode
def manual():
    if not push_button.value():  # reads 0 while the button is pressed
        ON_OFF()

    if current_state["is_on"]:
        rotary()
```

Checks the value of the push button (3); if it is false (the value while the button is pressed), [ON_OFF()](#ON_OFF()) is called (4). If the fan is on, [rotary()](#rotary()) is checked (6).

## rotary()
```python=
def rotary():
    speed_new = rotation.value()
    if current_state["speed"] != speed_new and 1 <= speed_new <= 10:
        change_speed_to(speed_new)
```

Gets the value of the rotary encoder (2) and checks whether it differs from the current speed of the fan and whether it is within the possible range (3). While testing I once got a value outside of the range; I don't know why, and it hasn't happened since, but I still added the check to be safe. If the value is both different and acceptable, [change_speed_to(speed_new)](#change_speed_to(speed_new)) is called with the new value as the argument (4).
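
To make the breakpoint logic of `autonomous()` from earlier in this section concrete, here is a sketch that returns the state the fan should end up in instead of sending IR signals. The threshold values are made up for the example (the real ones come from the config file), and `desired_state` is a hypothetical helper.

```python
# Assumed example thresholds in degrees Celsius, not the author's config
THRESHOLDS = {"on": 26, "off": 24, "medium": 28, "fast": 30}


def desired_state(temp, is_on, speed, cfg=THRESHOLDS):
    """Return (is_on, speed) after one pass of the autonomous rules."""
    if not is_on and temp > cfg["on"]:
        is_on = True
    if is_on:
        if temp < cfg["off"]:
            is_on = False
        elif temp < cfg["medium"]:
            speed = 3
        elif temp < cfg["fast"]:
            speed = 6
        else:
            speed = 9
    return is_on, speed


stays_off = desired_state(25, False, 3)   # between "off" and "on": no change
turns_on = desired_state(27, False, 3)
medium = desired_state(29, True, 3)
fast = desired_state(31, True, 6)
turns_off = desired_state(23, True, 9)
stays_on = desired_state(25, True, 6)     # same 25 degrees, but fan was already on
```

With these example values a reading of 25 leaves the fan in whatever on/off state it was already in. Keeping a gap between the on and off temperatures stops the fan from toggling every cycle when the room hovers around a single threshold.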
### change_speed_to(speed_new)
```python=
def change_speed_to(speed_new):
    # Calculate the direction and magnitude of speed change
    speed_difference = speed_new - current_state["speed"]

    # Determine the direction based on the speed difference
    if speed_difference > 0:
        direction = "speed_up"  # Increase speed
    else:
        direction = "speed_down"  # Decrease speed

    current_state["speed"] = speed_new
    speed_change(direction, abs(speed_difference))
```

Calculates the difference between the new speed and the old one (3). If the difference is positive, the speed needs to be increased (7); if it is negative, it needs to be decreased (9). The current speed is updated (11), and [speed_change(direction, abs(speed_difference))](#speed_change(direction,speed_difference)) is called with the direction of the change and the absolute value of the difference as arguments (12).

### speed_change(direction,speed_difference)
```python=
# Send the corresponding IR signals for the given speed difference
def speed_change(direction, speed_difference):
    global state_changed
    state_changed = True
    file_path ='ir_signals/{}.py'.format(direction)
    try:
        with open(file_path, 'r') as f:
            lst = ujson.load(f)

        # Send the signal for each unit of speed change
        for x in range(speed_difference):
            ir.play(lst)
            time.sleep_ms(500)
    except OSError:
        print("Error: Failed to open IR signal file.")
```

The `state_changed` flag is set (4). The file that corresponds to the direction of change is opened (7). The signal is sent once for each step in the speed change (12). After each signal the program sleeps for 500 milliseconds to give the fan time to process the change (13).

## update_state()
```python=
# Update the state file with the current Dyson state
def update_state():
    try:
        with open('dyson_state.py', 'w') as f:
            ujson.dump(current_state, f)
        # prints the current state for debugging
        print("Dyson state is: ", current_state)
    except Exception as e:
        print("Error updating state:", e)
```

The state in the `dyson_state.py` file is updated with the changed state (5).
The current state of the fan is printed for debugging (7).
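
The state file written by `update_state()` is plain JSON, so it can be read back with `load`. How the board restores `current_state` at start-up is not shown in this section, so the following is only a sketch of one way to do it, with `json` standing in for `ujson`; `save_state`, `load_state`, and the default values are assumptions made for the example.

```python
import json
import os
import tempfile

# Assumed defaults for a first boot, not taken from the author's code
DEFAULT_STATE = {"is_on": False, "auto_on": False, "speed": 1}


def save_state(path, state):
    """Write the state dictionary to `path` as JSON."""
    with open(path, "w") as f:
        json.dump(state, f)


def load_state(path):
    """Return the saved state, or a copy of the defaults if none is readable."""
    try:
        with open(path, "r") as f:
            return json.load(f)
    except (OSError, ValueError):
        # Missing file or invalid JSON: fall back to the defaults
        return dict(DEFAULT_STATE)


with tempfile.TemporaryDirectory() as tmp:
    state_file = os.path.join(tmp, "dyson_state.py")
    before_save = load_state(state_file)
    save_state(state_file, {"is_on": True, "auto_on": False, "speed": 4})
    after_save = load_state(state_file)
```

Falling back to defaults keeps a missing or corrupted state file from stopping the program, at the cost of the board possibly disagreeing with the real fan until the next manual correction.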