# Máquina de Estados - Docker IC
## :ghost:
OS: Ubuntu 20.04
ROS: Noetic (ROS 1)
Autor: Lucca Gandra (luccagandra@poli.ufrj.br)
## Data_wrapper.py
Recebe os estados e armazena em variáveis da classe DroneData, que será chamada por todos os sub-estados da aplicação.
```python
#!/usr/bin/python3
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import PoseStamped
from geographic_msgs.msg import GeoPoseStamped
from mavros_msgs.msg import StatusText, HomePosition
from sensor_msgs.msg import NavSatFix
from darknet_ros_msgs.msg import BoundingBoxes, BoundingBox
from colours import *
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from tf.transformations import euler_from_quaternion, quaternion_from_euler
import numpy as np
# This wrapper handles all the sensor data used by the State Machines
class DroneData:
_instance = None # Keep instance reference
# Ensure singleton behaviour
def __new__(cls, *args, **kwargs):
if not cls._instance:
rospy.loginfo("DroneData initialized")
cls._instance = object.__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.first_msg_received = False
self.odometry = None
self.bridge = CvBridge()
# Subscribers
rospy.Subscriber("/mavros/global_position/local", Odometry, self.odometryCallback)
rospy.Subscriber("/mavros/global_position/global", NavSatFix, self.globalposCallback)
rospy.Subscriber("/mavros/home_position/home", HomePosition, self.homeCallback)
def odometryCallback(self, data):
self.odometry = data
def homeCallback(self, data):
self.home = data
def globalposCallback(self, data):
self.globalpos = data
def get_current_yaw(self):
return euler_from_quaternion([self.odometry.pose.pose.orientation.x, self.odometry.pose.pose.orientation.y, self.odometry.pose.pose.orientation.z, self.odometry.pose.pose.orientation.w])[2]
if __name__ == "__main__":
Wrapper = DroneData()
rospy.init_node('Data_Wrapper')
```
## Control_wrapper.py
Geralmente utilizado para publicar tópicos e armazenar funções úteis para a alpicação.
```python
#!/usr/bin/python3
import rospy
import math
from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import Twist
from geographic_msgs.msg import GeoPoseStamped
from tf.transformations import quaternion_from_euler
from Data_Wrapper import DroneData
from mavros_msgs.srv import CommandLong, CommandLongRequest
from std_msgs.msg import Bool
from colours import *
from tf.transformations import euler_from_quaternion, quaternion_from_euler
import numpy as np
import cv2
import math
class Control_Wrapper():
_instance = None # Keep instance reference
# Ensure singleton behaviour
def __new__(cls, *args, **kwargs):
if not cls._instance:
rospy.loginfo("DroneData initialized")
cls._instance = object.__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
#DroneData.__init__(self)
# GPS Coordinates
self._GLOBAL_position_stp_msg = GeoPoseStamped()
self._pub_GLOBAL_position_stp = rospy.Publisher("/mavros/setpoint_position/global", GeoPoseStamped, queue_size=1)
# LOCAL_NED
self._LOCAL_position_stp_msg = PoseStamped()
self._pub_LOCAL_position_stp = rospy.Publisher("/mavros/setpoint_position/local", PoseStamped, queue_size=1)
# BODY_NED Possible?
self._MAV_position_stp_msg = PoseStamped()
self._pub_MAV_position_stp = rospy.Publisher("/mavros/setpoint_position/local", PoseStamped, queue_size=1)
# BODY_NED
self._MAV_velocity_stp_msg = Twist()
self._pub_MAV_velocity_stp = rospy.Publisher("/mavros/setpoint_velocity/cmd_vel_unstamped", Twist, queue_size=1)
self.ns = rospy.get_namespace()
self.command_client = rospy.ServiceProxy(
name="{}mavros/cmd/command".format(self.ns), service_class=CommandLong
)
def publish_global_pos(self):
self._pub_GLOBAL_position_stp.publish(self._GLOBAL_position_stp_msg)
def publish_local_pos(self):
self._pub_LOCAL_position_stp.publish(self._LOCAL_position_stp_msg)
def publish_mav_pos(self):
self._pub_MAV_position_stp.publish(self._MAV_position_stp_msg)
def publish_vel(self):
self._pub_MAV_velocity_stp.publish(self._MAV_velocity_stp_msg)
def SetpointGlobalPos(self, x, y, z, yaw):
self._GLOBAL_position_stp_msg.pose.position.latitude = x
self._GLOBAL_position_stp_msg.pose.position.longitude = y
self._GLOBAL_position_stp_msg.pose.position.altitude = z
quaternion = quaternion_from_euler(0,0,float(yaw))
self._GLOBAL_position_stp_msg.pose.orientation.x = quaternion[0]
self._GLOBAL_position_stp_msg.pose.orientation.y = quaternion[1]
self._GLOBAL_position_stp_msg.pose.orientation.z = quaternion[2]
self._GLOBAL_position_stp_msg.pose.orientation.w = quaternion[3]
def SetpointLocalPos(self, x, y, z, yaw):
self._LOCAL_position_stp_msg.pose.position.x = x
self._LOCAL_position_stp_msg.pose.position.y = y
self._LOCAL_position_stp_msg.pose.position.z = z
quaternion = quaternion_from_euler(0,0,float(yaw))
self._LOCAL_position_stp_msg.pose.orientation.x = quaternion[0]
self._LOCAL_position_stp_msg.pose.orientation.y = quaternion[1]
self._LOCAL_position_stp_msg.pose.orientation.z = quaternion[2]
self._LOCAL_position_stp_msg.pose.orientation.w = quaternion[3]
def SetpointMavPos(self, x, y, z, yaw):
self._MAV_position_stp_msg.pose.position.x = x
self._MAV_position_stp_msg.pose.position.y = y
self._MAV_position_stp_msg.pose.position.z = z
quaternion = quaternion_from_euler(0,0,float(yaw))
self._MAV_position_stp_msg.pose.orientation.x = quaternion[0]
self._MAV_position_stp_msg.pose.orientation.y = quaternion[1]
self._MAV_position_stp_msg.pose.orientation.z = quaternion[2]
self._MAV_position_stp_msg.pose.orientation.w = quaternion[3]
def SetpointLinearVel(self, x, y, z):
self._MAV_velocity_stp_msg.linear.x = x
self._MAV_velocity_stp_msg.linear.y = y
self._MAV_velocity_stp_msg.linear.z = z
def SetpointAngularVel(self, roll, pitch, yaw):
self._MAV_velocity_stp_msg.angular.x = roll
self._MAV_velocity_stp_msg.angular.y = pitch
self._MAV_velocity_stp_msg.angular.z = yaw
def CheckPositionReached(self, init_position, final_position):
dx = abs(
init_position.x - final_position.x
)
dy = abs(
init_position.y - final_position.y
)
dz = abs(
init_position.z - final_position.z
)
dMag = math.sqrt(pow(dx, 2) + pow(dy, 2) + pow(dz, 2))
# TO-DO Acrescentar verificação de yaw
if dMag < rospy.get_param("state_machine/P2"):
return True
else:
return False
def CheckGlobalPositionReached(self, init_position, final_position):
if self.distance_btw_coordinates(init_position.latitude, init_position.longitude, final_position.latitude, final_position.longitude) < rospy.get_param("state_machine/P2"):
return True
else:
return False
def send_id_tower(self, state, p1, p2, p3, p4, p5, p6):
send_command_autonomous = CommandLongRequest()
send_command_autonomous.command = 244
send_command_autonomous.param1 = state
send_command_autonomous.param2 = p1
send_command_autonomous.param3 = p2
send_command_autonomous.param4 = p3
send_command_autonomous.param5 = p4
send_command_autonomous.param6 = p5
send_command_autonomous.param7 = p6
self.command_client(send_command_autonomous)
```
## Main_State_Machine.py
A máquina de estados fica mais organizada dessa forma. Imagine que queremos realizar uma tarefa com o drone, mas existem vários caminhos intermediários para realizá-la (muito comum). Então, faz sentido organizar a aplicação em torno de um caminho bem sucedido, e cada problema gera um retorno para um sub-estado anterior.
```python
#!/usr/bin/python3
import rospy
import smach
if __name__ == '__main__':
rospy.init_node('main_statemachine')
# Closed Loop SM
from SSM_AwaitingAutonomous import *
from SSM_Takeoff import *
from SSM_Common import *
# Declaring the main state machine
rospy.Rate(1)
rospy.loginfo("Starting Main State Machine...")
mainStateMachine = smach.StateMachine(outcomes=['End'])
with mainStateMachine:
# Closed Loop (State 1 - AwaitingAutonomous)
smach.StateMachine.add('AwaitingCommand', awaitingCommand,
transitions={'Done': 'Takeoff'})
# Closed Loop (State 2 - Takeoff)
smach.StateMachine.add('Takeoff', takeoff,
transitions={'Done': 'Land'})
# Open Loop (State 3 - Land)
smach.StateMachine.add('Land', Land(),
transitions={'Done': 'End'})
"""
# Executing main state machine
mainStateMachine.execute()
```
## SSM_Takeoff.py
Sub-máquina de estados: Takeoff.
A condição para este estado ser inicializado é o retorno da string 'Done', de 'AwaitingCommand'. Ou seja, quando houver algum comando específico neste sub-estado. (Pode ser um True de algum tópico).
```python
smach.StateMachine.add('AwaitingCommand', awaitingCommand,
transitions={'Done': 'Takeoff'})
```
A máquina de estados principal então realiza o link (Done -> Takeoff) e passa a executar a sub-máquina de estados Takeoff. Que então só será finalizada com um retorno "Done" de dentro da sua própria máquina de estados
O Takeoff funciona da seguinte forma:
O estado "Takeoff" manda um setpoint de posição e imediatamente retorna "Waiting_To_Reach_Waypoint".
"Waiting_To_Reach_Waypoint" então fica em loop, pois se ```self.control.CheckPositionReached``` for False, há a continuidade no mesmo estado.
```python
transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint'}
```
Contudo, com um valor True de ```self.control.CheckPositionReached```, o retorno é "Reached", e a transição realizada é:
```python
transitions={'Reached': 'Done'}
```
"Done" é a condição para finalizar a sub-máquina de estados Takeoff.
```python
#!/usr/bin/python3
import smach
from States_Takeoff import *
takeoff = smach.StateMachine(outcomes=['Done'])
with takeoff:
# Declaring state
smach.StateMachine.add('Takeoff', Takeoff(),
transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint'})
smach.StateMachine.add('Waiting_To_Reach_Waypoint', GoToWaypoint(),
transitions={'Waiting_To_Reach_Waypoint': 'Waiting_To_Reach_Waypoint',
'Reached': 'Done'})
if __name__ == '__main__':
takeoff.execute()
```
## States_Takeoff.py
```python
#!/usr/bin/python3
import rospy
import smach
from geometry_msgs.msg import Point
from Control_Wrapper import Control_Wrapper
from Data_Wrapper import DroneData
class Takeoff(smach.State, DroneData):
def __init__(self):
smach.State.__init__(self, outcomes=["Waiting_To_Reach_Waypoint"])
DroneData.__init__(self)
self.control = Control_Wrapper()
def execute(self, userdata):
tower_altitude = self.qgroundcoordinates.pose.position.altitude
# Setpoint LOCAL pois tower_altitude tem referência local
self.control.SetpointMavPos(self.odometry.pose.pose.position.x, self.odometry.pose.pose.position.y, tower_altitude, self.get_current_yaw())
self.control.publish_mav_pos()
return 'Waiting_To_Reach_Waypoint'
class GoToWaypoint(smach.State, DroneData):
def __init__(self):
smach.State.__init__(self, outcomes=["Waiting_To_Reach_Waypoint", "Reached"])
DroneData.__init__(self)
self.control = Control_Wrapper()
self.first_iteration = True
self.waypoint = Point()
def execute(self, userdata):
curr_pos = self.odometry.pose.pose.position
if self.first_iteration:
self.waypoint.x = self.odometry.pose.pose.position.x
self.waypoint.y = self.odometry.pose.pose.position.y
self.waypoint.z = self.qgroundcoordinates.pose.position.altitude
if self.control.CheckPositionReached(curr_pos, self.waypoint) == False:
return 'Waiting_To_Reach_Waypoint'
else:
return 'Reached'
```