# Máquina de Estados - Docker IC ## :ghost: OS: Ubuntu 20.04 ROS: Noetic (ROS 1) Autor: Lucca Gandra (luccagandra@poli.ufrj.br) ## Data_wrapper.py Recebe os estados e armazena em variáveis da classe DroneData, que será chamada por todos os sub-estados da aplicação. ```python #!/usr/bin/python3 import rospy from nav_msgs.msg import Odometry from geometry_msgs.msg import PoseStamped from geographic_msgs.msg import GeoPoseStamped from mavros_msgs.msg import StatusText, HomePosition from sensor_msgs.msg import NavSatFix from darknet_ros_msgs.msg import BoundingBoxes, BoundingBox from colours import * from sensor_msgs.msg import Image from cv_bridge import CvBridge from tf.transformations import euler_from_quaternion, quaternion_from_euler import numpy as np # This wrapper handles all the sensor data used by the State Machines class DroneData: _instance = None # Keep instance reference # Ensure singleton behaviour def __new__(cls, *args, **kwargs): if not cls._instance: rospy.loginfo("DroneData initialized") cls._instance = object.__new__(cls, *args, **kwargs) return cls._instance def __init__(self): self.first_msg_received = False self.odometry = None self.bridge = CvBridge() # Subscribers rospy.Subscriber("/mavros/global_position/local", Odometry, self.odometryCallback) rospy.Subscriber("/mavros/global_position/global", NavSatFix, self.globalposCallback) rospy.Subscriber("/mavros/home_position/home", HomePosition, self.homeCallback) def odometryCallback(self, data): self.odometry = data def homeCallback(self, data): self.home = data def globalposCallback(self, data): self.globalpos = data def get_current_yaw(self): return euler_from_quaternion([self.odometry.pose.pose.orientation.x, self.odometry.pose.pose.orientation.y, self.odometry.pose.pose.orientation.z, self.odometry.pose.pose.orientation.w])[2] if __name__ == "__main__": Wrapper = DroneData() rospy.init_node('Data_Wrapper') ``` ## Control_wrapper.py Geralmente utilizado para publicar tópicos e armazenar funções úteis para a alpicação. ```python #!/usr/bin/python3 import rospy import math from geometry_msgs.msg import PoseStamped from geometry_msgs.msg import Twist from geographic_msgs.msg import GeoPoseStamped from tf.transformations import quaternion_from_euler from Data_Wrapper import DroneData from mavros_msgs.srv import CommandLong, CommandLongRequest from std_msgs.msg import Bool from colours import * from tf.transformations import euler_from_quaternion, quaternion_from_euler import numpy as np import cv2 import math class Control_Wrapper(): _instance = None # Keep instance reference # Ensure singleton behaviour def __new__(cls, *args, **kwargs): if not cls._instance: rospy.loginfo("DroneData initialized") cls._instance = object.__new__(cls, *args, **kwargs) return cls._instance def __init__(self): #DroneData.__init__(self) # GPS Coordinates self._GLOBAL_position_stp_msg = GeoPoseStamped() self._pub_GLOBAL_position_stp = rospy.Publisher("/mavros/setpoint_position/global", GeoPoseStamped, queue_size=1) # LOCAL_NED self._LOCAL_position_stp_msg = PoseStamped() self._pub_LOCAL_position_stp = rospy.Publisher("/mavros/setpoint_position/local", PoseStamped, queue_size=1) # BODY_NED Possible? self._MAV_position_stp_msg = PoseStamped() self._pub_MAV_position_stp = rospy.Publisher("/mavros/setpoint_position/local", PoseStamped, queue_size=1) # BODY_NED self._MAV_velocity_stp_msg = Twist() self._pub_MAV_velocity_stp = rospy.Publisher("/mavros/setpoint_velocity/cmd_vel_unstamped", Twist, queue_size=1) self.ns = rospy.get_namespace() self.command_client = rospy.ServiceProxy( name="{}mavros/cmd/command".format(self.ns), service_class=CommandLong ) def publish_global_pos(self): self._pub_GLOBAL_position_stp.publish(self._GLOBAL_position_stp_msg) def publish_local_pos(self): self._pub_LOCAL_position_stp.publish(self._LOCAL_position_stp_msg) def publish_mav_pos(self): self._pub_MAV_position_stp.publish(self._MAV_position_stp_msg) def publish_vel(self): self._pub_MAV_velocity_stp.publish(self._MAV_velocity_stp_msg) def SetpointGlobalPos(self, x, y, z, yaw): self._GLOBAL_position_stp_msg.pose.position.latitude = x self._GLOBAL_position_stp_msg.pose.position.longitude = y self._GLOBAL_position_stp_msg.pose.position.altitude = z quaternion = quaternion_from_euler(0,0,float(yaw)) self._GLOBAL_position_stp_msg.pose.orientation.x = quaternion[0] self._GLOBAL_position_stp_msg.pose.orientation.y = quaternion[1] self._GLOBAL_position_stp_msg.pose.orientation.z = quaternion[2] self._GLOBAL_position_stp_msg.pose.orientation.w = quaternion[3] def SetpointLocalPos(self, x, y, z, yaw): self._LOCAL_position_stp_msg.pose.position.x = x self._LOCAL_position_stp_msg.pose.position.y = y self._LOCAL_position_stp_msg.pose.position.z = z quaternion = quaternion_from_euler(0,0,float(yaw)) self._LOCAL_position_stp_msg.pose.orientation.x = quaternion[0] self._LOCAL_position_stp_msg.pose.orientation.y = quaternion[1] self._LOCAL_position_stp_msg.pose.orientation.z = quaternion[2] self._LOCAL_position_stp_msg.pose.orientation.w = quaternion[3] def SetpointMavPos(self, x, y, z, yaw): self._MAV_position_stp_msg.pose.position.x = x self._MAV_position_stp_msg.pose.position.y = y self._MAV_position_stp_msg.pose.position.z = z quaternion = quaternion_from_euler(0,0,float(yaw)) self._MAV_position_stp_msg.pose.orientation.x = quaternion[0] self._MAV_position_stp_msg.pose.orientation.y = quaternion[1] self._MAV_position_stp_msg.pose.orientation.z = quaternion[2] self._MAV_position_stp_msg.pose.orientation.w = quaternion[3] def SetpointLinearVel(self, x, y, z): self._MAV_velocity_stp_msg.linear.x = x self._MAV_velocity_stp_msg.linear.y = y self._MAV_velocity_stp_msg.linear.z = z def SetpointAngularVel(self, roll, pitch, yaw): self._MAV_velocity_stp_msg.angular.x = roll self._MAV_velocity_stp_msg.angular.y = pitch self._MAV_velocity_stp_msg.angular.z = yaw def CheckPositionReached(self, init_position, final_position): dx = abs( init_position.x - final_position.x ) dy = abs( init_position.y - final_position.y ) dz = abs( init_position.z - final_position.z ) dMag = math.sqrt(pow(dx, 2) + pow(dy, 2) + pow(dz, 2)) # TO-DO Acrescentar verificação de yaw if dMag < rospy.get_param("state_machine/P2"): return True else: return False def CheckGlobalPositionReached(self, init_position, final_position): if self.distance_btw_coordinates(init_position.latitude, init_position.longitude, final_position.latitude, final_position.longitude) < rospy.get_param("state_machine/P2"): return True else: return False def send_id_tower(self, state, p1, p2, p3, p4, p5, p6): send_command_autonomous = CommandLongRequest() send_command_autonomous.command = 244 send_command_autonomous.param1 = state send_command_autonomous.param2 = p1 send_command_autonomous.param3 = p2 send_command_autonomous.param4 = p3 send_command_autonomous.param5 = p4 send_command_autonomous.param6 = p5 send_command_autonomous.param7 = p6 self.command_client(send_command_autonomous) ``` ## Main_State_Machine.py A máquina de estados fica mais organizada dessa forma. Imagine que queremos realizar uma tarefa com o drone, mas existem vários caminhos intermediários para realizá-la (muito comum). Então, faz sentido organizar a aplicação em torno de um caminho bem sucedido, e cada problema gera um retorno para um sub-estado anterior. ```python #!/usr/bin/python3 import rospy import smach if __name__ == '__main__': rospy.init_node('main_statemachine') # Closed Loop SM from SSM_AwaitingAutonomous import * from SSM_Takeoff import * from SSM_Common import * # Declaring the main state machine rospy.Rate(1) rospy.loginfo("Starting Main State Machine...") mainStateMachine = smach.StateMachine(outcomes=['End']) with mainStateMachine: # Closed Loop (State 1 - AwaitingAutonomous) smach.StateMachine.add('AwaitingCommand', awaitingCommand, transitions={'Done': 'Takeoff'}) # Closed Loop (State 2 - Takeoff) smach.StateMachine.add('Takeoff', takeoff, transitions={'Done': 'Land'}) # Open Loop (State 3 - Land) smach.StateMachine.add('Land', Land(), transitions={'Done': 'End'}) """ # Executing main state machine mainStateMachine.execute() ``` ## SSM_Takeoff.py Sub-máquina de estados: Takeoff. A condição para este estado ser inicializado é o retorno da string 'Done', de 'AwaitingCommand'. Ou seja, quando houver algum comando específico neste sub-estado. (Pode ser um True de algum tópico). ```python smach.StateMachine.add('AwaitingCommand', awaitingCommand, transitions={'Done': 'Takeoff'}) ``` A máquina de estados principal então realiza o link (Done -> Takeoff) e passa a executar a sub-máquina de estados Takeoff. Que então só será finalizada com um retorno "Done" de dentro da sua própria máquina de estados O Takeoff funciona da seguinte forma: O estado "Takeoff" manda um setpoint de posição e imediatamente retorna "Waiting_To_Reach_Waypoint". "Waiting_To_Reach_Waypoint" então fica em loop, pois se ```self.control.CheckPositionReached``` for False, há a continuidade no mesmo estado. ```python transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint'} ``` Contudo, com um valor True de ```self.control.CheckPositionReached```, o retorno é "Reached", e a transição realizada é: ```python transitions={'Reached': 'Done'} ``` "Done" é a condição para finalizar a sub-máquina de estados Takeoff. ```python #!/usr/bin/python3 import smach from States_Takeoff import * takeoff = smach.StateMachine(outcomes=['Done']) with takeoff: # Declaring state smach.StateMachine.add('Takeoff', Takeoff(), transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint'}) smach.StateMachine.add('Waiting_To_Reach_Waypoint', GoToWaypoint(), transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint', 'Reached': 'Done'}) if __name__ == '__main__': takeoff.execute() ``` ## States_Takeoff.py ```python #!/usr/bin/python3 import rospy import smach from geometry_msgs.msg import Point from Control_Wrapper import Control_Wrapper from Data_Wrapper import DroneData class Takeoff(smach.State, DroneData): def __init__(self): smach.State.__init__(self, outcomes=["Waiting_To_Reach_Waypoint"]) DroneData.__init__(self) self.control = Control_Wrapper() def execute(self, userdata): tower_altitude = self.qgroundcoordinates.pose.position.altitude # Setpoint LOCAL pois tower_altitude tem referência local self.control.SetpointMavPos(self.odometry.pose.pose.position.x, self.odometry.pose.pose.position.y, tower_altitude, self.get_current_yaw()) self.control.publish_mav_pos() return 'Waiting_To_Reach_Waypoint' class GoToWaypoint(smach.State, DroneData): def __init__(self): smach.State.__init__(self, outcomes=["Waiting_To_Reach_Waypoint", "Reached"]) DroneData.__init__(self) self.control = Control_Wrapper() self.first_iteration = True self.waypoint = Point() def execute(self, userdata): curr_pos = self.odometry.pose.pose.position if self.first_iteration: self.waypoint.x = self.odometry.pose.pose.position.x self.waypoint.y = self.odometry.pose.pose.position.y self.waypoint.z = self.qgroundcoordinates.pose.position.altitude if self.control.CheckPositionReached(curr_pos, self.waypoint) == False: return 'Waiting_To_Reach_Waypoint' else: return 'Reached' ```