---
tags: ROS2note
---
**這邊不再更新,請移動到 [ros2.tw](https://ros2.tw/),也歡迎一起貢獻 ROS 2 相關內容**
[ROS 2 筆記整理](https://hackmd.io/@evshary/ROS2Note/)
# 簡介
ROS 2 通訊層使用 DDS 的一大原因就是 DDS 有支援 QoS (Quality of Service)。QoS 能夠指定通訊相關的設定,像是資料傳輸的可靠性、是否要保留歷史資料等等。然而 DDS 所支援的 QoS 實在太多了,從官方規範就有 20 幾種,因此 ROS 2 只選擇跟機器人比較相關的其中六種出來:
| QoS | 功能 |
| ----------- | ---- |
| Reliability | 用來確認通訊的可靠性,有 best effort 和 reliable 兩種 |
| History | 能夠保留多少通訊的歷史資料,可以選擇 keep_last N 或 keep_all |
| Durability | 是否要將歷史資料提供給 late-joiner,可以選擇 volatile 或 transient_local |
| Liveliness | 主動確認 node 是否存活 |
| Lifespan | 設定資料的存活時間,如果超過時間未被收取將會捨棄,並且跳出警示 |
| Deadline | 在連續的資料間,最多可以有多長的間隔時間 |
一個很重要的點是,各個 QoS 本身上是獨立運行不會受到其他人影響,舉例來說,reliability 的設定不會影響到 durability,甚至其他 QoS 的行為。因此我們在看待這六個 QoS 的時候,可以當成六種可以調整的 config 來看帶。
官方有針對 Liveliness, Lifespan 和 Deadline 提供 [ROS 2 demo](https://github.com/ros2/demos/tree/master/quality_of_service_demo),如果想要實際了解其中內容,可以直接操作看看。
# RxO (Request vs Offered)
Publisher 和 Subscriber 要能夠通訊,兩者的 QoS 必須要先 compatible,也就是符合 RxO 才行。一言以蔽之,RxO 就是 Publisher 的 QoS 設定必須要高於 Subscriber 才行。舉個例子來說,Reliability 的 QoS 要能夠通訊,Publisher 和 Subscriber 必須要符合如下邏輯。
| Publisher | Subscriber | Compatible |
| ----------- | ----------- | ---------- |
| Best Effort | Best Effort | V |
| Best Effort | Reliable | X |
| Reliable | Best Effort | V |
| Reliable | Reliable | V |
由於定義上 Reliable > Best Effort,所以如果 Publisher 是 Best Effort,Subscriber 是 Reliable,兩者就會跳出 incompatible QoS 的警告,並且無法通訊。
看到這邊一定會有個問題:要怎麼知道哪些設定是高,哪些設定是低? 除了靠直覺外 (Reliable 比起 Best Effort 來說要求更高),可以查詢 [ROS 2 的 QoS Design Compatibility](https://index.ros.org/doc/ros2/Concepts/About-Quality-of-Service-Settings/#qos-compatibilities),或是直接去查詢 DDS 的說明,[OpenSplice DDS 的文件](http://download.prismtech.com/docs/Vortex/apis/ospl/isocpp2/html/a02530.html)就有詳細敘述。
# 實際範例
我放了一些範例在[GitHub](https://github.com/evshary/ROS2_cheatsheet/tree/master/10.QoS)上,可以當作參考。
## C++
C++ 的 QoS API 可以參考 [rclcpp::QoS Class Reference](http://docs.ros2.org/latest/api/rclcpp/classrclcpp_1_1QoS.html)
基本上作法是先 init QoS,然後在 create publisher / subscriber 的時候把 QoS 帶入即可。
* history, reliability, durability
這三者的設定還蠻容易的,所以就放一起來表示。
```cpp
// 先 init QoS 的物件
rclcpp::QoS qos_settings(10);
// 如果要設定 keep_last 10, reliable, volatile
qos_settings.keep_last(10).reliable().durability_volatile();
// 如果要設定 keep_all, best_effort, transient local
qos_settings.keep_all().best_effort().transient_local();
// 然後再將 QoS 在 create publisher 時帶入即可
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", qos_settings);
```
* liveliness
可參考 [官方 liveliness demo](https://github.com/ros2/demos/blob/master/quality_of_service_demo/rclcpp/src/liveliness.cpp)
```cpp
qos_settings.liveliness(RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC);
qos_settings.liveliness_lease_duration(std::chrono::milliseconds(2000));
rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.liveliness_callback =
[](rclcpp::QOSLivelinessChangedInfo & event)
{
printf("Liveliness changed event: \n");
printf(" alive_count: %d\n", event.alive_count);
printf(" not_alive_count: %d\n", event.not_alive_count);
printf(" alive_count_change: %d\n", event.alive_count_change);
printf(" not_alive_count_change: %d\n", event.not_alive_count_change);
};
```
* lifespan
可參考 [官方 lifespan demo](https://github.com/ros2/demos/blob/master/quality_of_service_demo/rclcpp/src/lifespan.cpp)
```cpp
qos_settings.lifespan(std::chrono::milliseconds(2000));
```
* deadline
可參考 [官方 deadline demo](https://github.com/ros2/demos/blob/master/quality_of_service_demo/rclcpp/src/deadline.cpp)
```cpp
qos_settings.deadline(std::chrono::milliseconds(2000));
// publisher callback
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.deadline_callback =
[](rclcpp::QOSDeadlineOfferedInfo & event) -> void
{
printf("Offered deadline missed: total %d delta %d\n", event.total_count, event.total_count_change);
};
auto publisher_ = this->create_publisher<std_msgs::msg::String>("topic", qos_settings, pub_options);
// subscriber callback
rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.deadline_callback =
[](rclcpp::QOSDeadlineRequestedInfo & event) -> void
{
printf("Requested deadline missed: total %d delta %d\n", event.total_count, event.total_count_change);
};
auto subscription_ = this->create_subscription<std_msgs::msg::String>("topic", qos_settings, std::bind(&MinimalSubscriber::topic_callback, this, _1), sub_options);
```
## Python
Python 的 QoS API 可以參考 [rclpy Quality of Service](http://docs.ros2.org/latest/api/rclpy/api/qos.html)
# 官方建議設定
官方有些建議的 QoS 設定,可參考 [qos_profiles.h](https://github.com/ros2/rmw/blob/master/rmw/include/rmw/qos_profiles.h)
舉個例子來說,sensor data 並不在乎資料的 reliable,所以會設定為 best effort,盡可能快速傳資料。過去 sensor data 的歷史資料也不重要,所以 durability 的部分用 volatile 即可。
# Reference
* [ROS 2 wiki about QoS](https://index.ros.org/doc/ros2/Concepts/About-Quality-of-Service-Settings/)
* [ROS 2 對 QoS 的設計理念](https://design.ros2.org/articles/qos.html)
* [ROS 2 對 deadline, liveliness, lifespan 的使用](https://design.ros2.org/articles/qos_deadline_liveliness_lifespan.html)