# Full Stack PBL #1
This page: https://hackmd.io/@yukimat/fullstack-2024-1
## 授業メモ(2025.01.21)
- ピンセットがあるとデバイスのセットアップがしやすい
- HackMDは岡大ネットワークからだとアクセスしづらい
## この授業のゴール
IoTセンサ(M5Stackなど)を用いてセンシングした時系列データをサーバ(Raspberry Piなど)にストリーミングしデータを蓄積していくシステムを構築する。しばしばこのようなシステムでは、データ取得間隔の異常(データの欠損)や計測値の異常が生じることがあるため、構築したシステムで異常検知を行う仕組みの開発を経験することをこの授業のゴールとする。

## Raspberry Pi の OS インストール
<!--
> _ToDo List for Editors_
> - [ ] Raspberry Pi の接続(電源,ディスプレイ,ケース(?),etc.)
> - 2024年度は少人数ですし,その場で説明していけば十分?(はら)
> - [x] (micro) SD カードへのOS書き込み
> - 2024年度は書き込み済みのSDカードを配布する
> - [ ] Raspberry Pi OSの初期設定
> - [ ] 日本語対応
> - English-onlyな講義を指向して,英語ONLYを貫いてもいいかも
> - 英語オンリーだとリカレント教育には辛そうかも?
> - [ ] `export LANG=C` ぐらいは教えておくとよさそう
> - [x] GPIOからのUSB接続(sshではなく,usb経由のtelnetもアリ?他のIoTデバイスと干渉しないか?)
> - ディスプレイ/キーボードを直接つなぐなら不要っぽい?
> - Raspberry Pi の GPIO は今回は使う案件はないかもですね(まつだ)
> - [ ] `apt` の利用に向けて `root` 権限か `sudo` の話が必要になる?
> - 直近の講義としては,ROOT ONLY の環境で突き進むのが楽かもしれない
> - ssh/xrdp等のリモート接続は無しで,tty 直接で管理するという建前であれば許容範囲か
> - [ ] プログラムはJupyter環境で書いてみる?それとも,何らかのEditor + python CLI で行くか?
> - なんとなく,python CLI のほうが直接いじってる感じはありそう.
> - jupyter だと,あとで streamlit 使うメリットが薄れそう.
> - Editor どうする問題.デフォルトで gnome editor ぐらいはありそう.あるいは,VS Codeでも入れておく?
- RPiOSはThonny入ってるイメージですが、VS Codeが安パイそうですね(まつだ)
- なるほど,Thonny便利そうですね.ただし venv 対応してるかどうか,ちょっと心配と言えば心配(はら)
- 発見 https://core-electronics.com.au/guides/using-virtual-environments-in-thonny-on-a-raspberry-pi/
//-->
今回はRaspberry Pi OSがインストール済みのSDカードを使いますが、自前で準備することも可能である。
### Important URLs
- Raspberry Pi OS: https://www.raspberrypi.com/software/
### Raspberry Pi OS インストール手順
1. Raspberry Pi Imager (https://www.raspberrypi.com/software/) をインストールする
2. SD Card を接続する
3. Raspberry Pi Imager を起動する
4. 「Raspberry Pi デバイス」「インストールする Raspberry Pi OS」および書き込み先の「SD Card」を選択する
5. 「NEXT」をクリック
6. 初期ユーザ・パスワードなどを設定したい場合は、設定を編集する
7. ウィザードに従っていけばOSの書き込みが完了する
### Raspberry Pi OS の種類
- Raspberry Pi OS: 通常はこれでOK
- Raspberry Pi OS Full: 通常版より多くのアプリケーションがプリインストールされるバージョン
- Raspberry Pi OS Lite: デスクトップ画面が無い、コマンドラインのみのOS(Raspberry Pi Zero などの低スペックデバイスでは有用)
---
## Raspberry Pi OS の初期設定
### 設定のサマリ
およそ,以下の項目についての設定を行っていきます.
- Username: `aiot24`
- Password:(詳細は以降の内容を参照)
- Netowork:
- WiFi ESSID:`GL-MT3000-320-5G`
- WiFi password:(詳細は以降の内容を参照)
- IP address: `192.168.8.xxx` ... 詳細は以降の内容を参照
### Raspberry Pi 本体を専用ケースに組み込もう!
完成するとこんな感じになります.

1. ケース付属品のヒートシンクを基盤中央の銀色の素子に貼り付ける
2. ケース取付済みのファンの電源ケーブルをPi本体に刺す **(基盤の左上あたり)**
- ヒント: 新品の場合,端子部のカバーを外す必要があるかもしれません
4. RTCクロック保持用のバッテリを刺す **(基盤の右下あたりの"BAT"端子)**
5. バッテリを両面テープでケースに貼り付ける(任意)
6. 最後にケースを閉じる
- ヒント: USB端子側から見たときに,基盤が見えなくなるまで奥に入れておかないと,ケースが閉まりません.
### 周辺機器を接続しよう!
周辺機器は,こんな感じでつないでいきます.
**重要**
ACアダプタの接続は最後!接続すると同時に電源ONになります.

1. OSをインストールしたmicroSDカードを,**裏表を間違えないように** 刺す
- ヒント: ケースを裏返して置いた時,SDカードの表面が見える向きに刺します.SDカードの端子が基盤に近い側になることをイメージしましょう.
3. Raspberry Pi (USB A) <--> (micro USB A) キーボード
- これは簡単ですね.
5. キーボード (USB A) <-- マウス
- 本キーボードはUSBハブとしても動作している.
6. Raspberry Pi (mini HDMI) <--> (micro HDMI) ディスプレイ
- Raspberry Pi キットの付属品ではなく,別袋のケーブルを利用する.
7. Raspberry Pi (USB A) <--> (USB C) ディスプレイ
- ディスプレイはUSB経由で電源供給される
8. Raspberry Pi (USB C) <--> ACアダプタ
- 念のため繰り返す.ACアダプタで通電させると,電源ONになります.
**確認**
- ディスプレイには2本のケーブル
- Raspberry Pi本体には4本のケーブル
という状態になるはずです.
### 起動直後の初期設定(1)
OSインストール直後(開封直後)の初期設定です.
#### 画面1
> Welcome to the Raspberry Pi ...
- Next
#### 画面2: Set Country
> Enter the details of your location...
基本,初期設定のまま
- Country: Japan
- Language: Japanese
- Timezone: Tokyo (Asia/Tokyo)
- 2つのチェックボックスは off
- User English language / Use US keyboard
#### 画面3: Create User
> You need to create ...
以下の設定で作成
- Enter username: `aiot24`
- Enter password: `pbl24aiot@GZ1` ... 最後の `GZ1` の部分は各グループ名に変更のこと
- Confirm password: (上記と同じ)
**※パスワードは,講義の都合上,指定パスワードとしてください.**
#### 画面4: Select Wifi Network
> Select your Wifi network from the list.
- `GL-MT3000-320-5G`
#### 画面5: Enter WiFi Password
> Enter the password for the WiFi ...
- `WABXY7YCF7`
#### 画面6: Choose Browser
> Both the Chromium ...
- Chromium
- チェックボックスは off
- Tick here to ...
#### 画面7: Enable Raspberry Pi Connect
> Raspberry Pi Connect is a service ...
- トグルボタンはオフ (左側に丸でグレーの状態)
- Enable Raspberry Pi ...
#### 画面8: Update Software
> The operating system and ...
- Next
※環境次第だが10分ぐらい見込むこと.
#### 画面9: Setup complete
> Your Raspberry Pi is now...
- Restart
### 起動直後の初期設定(2)
初期設定を終えた後の初回起動時についての設定です.
#### ダイアログ: Update Notification
> ... can now run labwc.
- Keep Wayfire
<!-- ToDo: 将来的には labwc に乗り換えたほうがよさそう? -->
<!-- メモ: 最近のラズパイでは日本語対応のための特別措置は不要と思われる -->
### ネットワークの設定
#### 設定のサマリ
|グループ名 |R-Pi IP addr |Netmask |(参考:利用想定範囲) |
|-----------|--------------|--------------|--------------------|
|(Faculties)|192.168.8.17 |255.255.255.0 |(192.168.8.16-31) |
|`GZ1` |192.168.8.33 |255.255.255.0 |(192.168.8.32-47) |
|`GZ2` |192.168.8.49 |255.255.255.0 |(192.168.8.48-63) |
|`GZ3` |192.168.8.65 |255.255.255.0 |(192.168.8.64-79) |
|`GZ4` |192.168.8.81 |255.255.255.0 |(192.168.8.80-95) |
|`GZ5` |192.168.8.97 |255.255.255.0 |(192.168.8.96-111) |
|`GZ6` |192.168.8.113 |255.255.255.0 |(192.168.8.112-128) |
- ネットマスク: `24` or `255.255.255.0`
- デフォルトゲートウェイ: `192.168.8.1`
#### 説明
設定の前に,現在の状態を確認しよう.
##### IPアドレス
ターミナルを開いて:
```bash=
ip addr
```
の実行結果を確認しよう.
1. `lo` というアダプタが存在する.LOOPBACKアダプタであり,常に自分自身のことを指す.
通常,IPアドレスは `127.0.0.1` である.
> コラム:
> OSによっては,`127.0.1.1` などが使われることもある.
> 後述のネットマスクを理解すれば,`/8`になっていることに気づく.
> すなわち,`127.x.x.x` の`x`がどんな数字でも,自分自身に通信しようと試みるということになる.
> (ただし,127.0.0.0 と 127.255.255.255 は特別な意味を持つので除く)
2. 今回は `wlan0` というネットワークアダプタが無線LANを指している.
3. state の後は,通常 `UP` か `DOWN` である.接続が確立しているか,否か,である.
4. `inet` と書かれた行に注目すると,現在のIPアドレスがわかる.初期起動時は DHCP で配布されたアドレスである.
例えば,`192.168.8.210/24` と書かれているならば,このホストのIPアドレスは`192.168.8.210`であり,
ネットマスク(後述)が`24 (255.255.255.0)` ということがわかる.
##### デフォルトゲートウェイ
```bash=
ip route
```
の出力結果を確認しよう.
1. **ルーティングテーブル**を表示するコマンドである.自ネットワーク外の各ネットワークと通信する際,どのホストとやり取りをするか,の対応表である.
2. `default ...` から始まる行がデフォルトゲートウェイである.機器によっては `0.0.0.0/0` に対するルートとして設定・表記する場合もある.
4. `via 192.168.8.1` は通信先のホストのIPアドレス
5. `dev wlan0` は通信に利用するデバイス
#### Steps
1. デスクトップ右上の WiFi アイコンをクリック
→ 高度なオプション...
→ 接続を編集する
2. WiFi カテゴリの中の `GL-MT3000-320-5G` をダブルクリック
- 上部タブから IPv4設定 を選ぶ
- Method: `自動(DHCP)` → `手動`
- アドレス欄の Add ボタンを押す
- アドレス: `192.168.8.$X`
- ホスト(今回の設定ならRaspberry Pi本体)のIPアドレス.**IPではなくIPアドレス.**
- `$X`は上記表を参照
- ネットマスク: `24` or `255.255.255.0`
- マスクされたアドレスが同じなら同じネットワーク内に居ると言う扱いである
- `24` とは「上位 `24` bitをマスクする」の意味.
- IPv4では,`32` bit のアドレス空間があり,通常 `8` bit (`0--255`) で区切った,
`255.255.255.255` のような表記をする.
- `24` ではなく,`255.255.255.0` という表記もよく用いられる.
- ゲートウェイ: `192.168.8.1`
- ネットワーク外に通信する際,問い合わせる先.通常,上位のネットワーク管理者により提供される.
- DNSサーバ:
- `192.168.8.1`
3. `wlan0` をいったん無効化して,また有効化する(再起動的なことをしたい)
- ターミナルを開く.
- 管理者権限(ルート権限)が必要なので,`sudo`を介して`ip`コマンドを実行する.
```bash
sudo ip link set wlan0 down # OSによっては `ipdown wlan0`
sudo ip link set wlan0 up # OSによっては `ipup wlan0`
- 参考:代替案として,GUIで「無線LANをオフにする」をしてから「無線LANをオンにする」でもよい
#### ネットワーク疎通確認 — ICMP echo, etc.
(1)
IPアドレスが変更されていて,かつ, `state UP` になっているか確認しよう
```bash
ip addr
```
(2)
デフォルトゲートウェイとネットワーク接続されているかどうか確認してみよう.
```bash=
ping 192.168.8.1
```
(3)
岡山大学のホームページサーバ (`www.okayama-u.ac.jp`) までたどり着けるか,途中経路も含めて確認してみよう.
```bash=
sudo apt install iputils-tracepath
tracepath www.okayama-u.ac.jp
tracepath www.okayama-u.ac.jp -n
```
### 時刻同期サービス (NTP) の設定
複数デバイスでのロギングを考える場合(特に,物理的な距離が離れているデバイス間),各デバイスが持つ時刻情報が「正確」であることが,しばしば重要になります.
> 想像してみよう.
> - デバイスAの内部時刻 `10:00:00` に収録したデータと,デバイスBの内部時刻 `10:00:00` に収録したデータは,実世界において全く同じ時刻の事象を標本化したデータであろうか?
> - 2つのデバイスのデータを集約するサーバは,そのサーバの内部時刻 `10:00:00` ちょうどにデバイスからデータを受け取った.そのデータは,いずれも実世界の `10:00:00` の事象を標本化したデータといえるだろうか?
>
> なお,本文で言及した「正確」とは,「真の時刻と一致する」とか「十分に近い時刻であること」ということではありません.
> - サーバ,デバイスA,デバイスBがいずれも実世界の真の時刻から **厳密に5秒** 遅れているとする.デバイスAの時刻 `10:00:00` で収録されたデータとデバイスBの時刻 `10:00:02` に収録されたデータを比較したとき,後者のデータのほうが後の時刻に録られたデータだと保証できるか?
> - サーバ,デバイスA,デバイスBがいずれも実世界の真の時刻から **$\pm 1.1$秒の誤差**で設定されているとする.デバイスAの時刻 `10:00:00` で収録されたデータとデバイスBの時刻 `10:00:02` に収録されたデータを比較したとき,後者のデータのほうが後の時刻に録られたデータだと保証できるか?
Raspberry Pi OS (Ubuntu OS の亜種) では,標準で `systemd-timesyncd` による時刻同期がなされています.このデーモンは *NTP (Network Time Protocol)* を用いて,インターネット上の時刻同期サーバ (NTPD) と通信し,時刻同期を実現しています.ただし,環境によってはNTPによる通信が上流で認められていないこともあります.例えば,岡山大学,です...
ということで,岡山大学内でNTPを利用する場合は,以下の設定を追加することで,岡山大学のNTPサーバと同期するようになります.
`nano` などを使って:
```bash=
# nano の起動 (root権限)
sudo nano /etc/systemd/timesyncd.conf
```
`/etc/systemd/timesyncd.conf` の以下の箇所を編集してください.
```
[Time]
NTP=ntpsrv.cc.okayama-u.ac.jp # <-- 先頭の#を外して,追記
#FallbackNTP=0.debian.pool (略)
```
`systemd-timesyncd` を再起動しましょう.(あるいは,OSの再起動でも可)
```bash=
# 再起動
sudo systemctl restart systemd-timesyncd
# 動作確認
sudo systemctl status systemd-timesyncd
```
> コラム
> 本講義の Raspberry Pi 5 では RTC (Real-Time Clock) モジュールを取り付けていますので,電源をOFFにした後でも時刻がずれにくくなっています.
### 諸々のアップデート
本講義で利用するソフトウェアがいくつかあるため,この段階でインストールしておこう.
```bash=
sudo apt update
sudo apt install openjdk-17-jre
sudo apt install python-is-python3 python3-venv
```
#### パッケージの確認
Debian/Ubuntu 系のディストリビューションでは `dpkg` コマンドでインストールされているパッケージを知ることができる.
`grep` コマンドも組み合わせれば,絞り込みも容易である.例えば:
```bash=
dpkg --list | grep jdk
```
※注:`grep` と `--list` の間の記号は「縦棒|」である.
---
## Python/AI開発基盤のインストール(Scikit-learn を想定)
<!--
> _ToDo List for Editors_
> - [x] ~~`uv` and pip で書きたい(はら)~~
> - 参考:https://www.jikken-b.cs.okayama-u.ac.jp/aisp/ready-winenv.html
> - 簡単のため,pyenv を利用する説明にします.'25.1.14 hara
> - [ ] Jupyter(Lab) は使う?
> - [ ] 動作確認のための簡単なプログラムの掲載
> - [x] いったん,pytorch は使わない方向で資料作成を進める
-->
サーバ側では主にPython言語を利用した開発を行います.
本講義では,Pythonネイティブで利用可能な `python-venv` を使って,**pythonの仮想環境** を構築することにします.
### 解説
Pythonのライブラリのいくつかは,バージョンによって使えるAPIと使えないAPIがあります.
そして,様々なライブラリが相互に利用しあう関係があります.
結果として,ライブラリ間の依存関係が保てないことがしばしば発生します.
(例:ライブラリAは`numpy>=2`を必要とするのに,ライブラリBは`numpy<2` を必要とする,など)
> コラム:
> メンテナンスを請け負うような業種のエンジニアになるのであれば,APIが `deprecated (非推奨)` かどうか,を常に公式ドキュメントで確認する癖をつけておくと,生産性の高いエンジニアと認識されるようになることでしょう...
この特性は機械学習ライブラリの多くで顕著に発生します.特に,最新の論文で報告される方式を利用する場合には,かなり厳しいバージョン制約がかかることが多いです.そのため「最先端の方式Aと方式Bを組み合わせる」という,一見簡単に聞こえる実装をするだけでも,様々に障害が起こりがちです.
そのため,Pythonを利用した機械学習プロジェクトを作る場合は,何らかの方法で**仮想環境**を構築することが一般的です.
プロダクトに近い段階,あるいは,何らかのツールを単独で使ってみたいだけ,であるならば `docker` や `lxc` などのコンテナを利用するのも手です.これらは,**コンテナ**としてホストの実OSとは隔離された空間で別のOSと所望のアプリケーションを実行します.ただし,開発段階でいろいろとライブラリを組み合わせたり,ライブラリそのものに手を入れる必要があるなど,研究的に試行錯誤する段階では,やや煩雑な手順が必要になることもしばしばです.
なお,本資料で紹介する`python-venv`では,pythonそのものはOSインストールされたバージョンを使います.
もしも,python自身のバージョンも管理する必要があるのであれば,`miniconda3 (conda-forge)`,`poetry`, `uv` などの利用を検討してください.
### Steps
#### 1. 作業ディレクトリの準備
ホームディレクトリの直下に,`pbl24`というディレクトリを作り,その中で作業することにしましょう.以降,このディレクトリを**作業ディレクトリ**と呼びます.
```bash
mkdir ~/pbl24
cd ~/pbl24 # 最初の3文字は PBL の小文字です
pwd # 出力例: /home/aiot24/pbl24
```
#### 2. pyenv の利用準備
作業ディレクトリで使うpythonの実行環境(python仮想環境)を準備します.
```bash
python3 -mvenv .venv --prompt pbl24
```
この環境のpython/pipを使う場合は,次のコマンドを最初に入力します.
```bash
source .venv/bin/activate
# または
source ~/pbl24/.venv/bin/activate
```
本講義においては,`~/.profile` にて `source` コマンドを記載しておいても良いでしょう.
**★Exercise★**
利用している python の実行ファイルの場所を確認しましょう.
```bash=
which python
```
新しいターミナルを開き,前述の `source` コマンド実行前の状態で,同じく `which` をおこなった場合の出力結果と比較してみるとよいでしょう.
#### 3. python仮想環境へのライブラリインストール
本講義で利用するライブラリを `pip` コマンドでインストールします.
事前に,上記の `source` コマンドで仮想環境を利用する状態にしてから実行してください.
```bash=
pip install 'numpy<2' pandas # Python & MLではしばしば使う
pip install kafka-python # 特に本講義で使う
```
---
## 大規模ログデータ集約システムの初期設定(Apache Kafkaを想定)
<!--
> _ToDo List for Editors_
> - [x] 先に `apt` の話が必要?
> - とりあえずは apt なしか.
> - [x] Kafka のインストール
> - https://kafka.apache.org/quickstart
> - KRaft ベースの話がいちばん簡単そう
> - 後のIoT関係の開発で docker/k8s などが必要になるなら docker ベースにしてもいいかも
-->
**重要:**
**本節ではターミナルへの文字入力が多い.ただし,その多くはコマンド名やファイル名である.**
**TABキーによる補完を活用して,打ち間違いや入力数を減らす工夫をしよう!!**
### Steps
#### 0. 確認
Java の実行環境 (JRE; Java Runtime Environment) が必要です.
`dpkg` コマンドや `which` コマンドで,`java` コマンドの存在を確認しましょう.
```bash=
dpkg --list | grep jre # openjdk-??-jre があるか確認
which java # --> /usr/bin/java
```
#### 1. 初期設定
- see also: https://kafka.apache.org/quickstart
最新版のURLは https://kafka.apache.org/downloads から確認してください.
2025年1月20日現在の最新版は,Kafka 3.9.0 (Prebuilt Binary for Scala 2.13) です.
```bash=
cd ~/pbl24
# ダウンロードとアーカイブ展開
curl -LO https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar xf kafka_2.13-3.9.0.tgz
```
```bash=
# 初期設定(ログストレージの準備)
cd kafka_2.13-3.9.0
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
```
#### 2. 動作確認
中継サーバ(Broker)を起動し,データの送信者(Producer)や受信者(Consumer)の動作をテストしておきます.
##### Broker
**ターミナル1にて**
```bash=
cd ~/pbl24/kafka_2.13-3.9.0
bin/kafka-server-start.sh config/kraft/reconfig-server.properties
```
以下,**ターミナル2にて**.
送るメッセージの topic(s) を決めておく.ここでは quickstart-events.`localhost:9092`は利用するサーバを指定.
```bash=
cd ~/pbl24/kafka_2.13-3.9.0
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
```
出力例:
```
Created topic quickstart-events.
```
##### Producer
実際にメッセージを送ってみよう.
```bash=
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
```
プロンプトが出てくるので,例えば以下を入れてみよう.
```
This is my first event
This is my second event
```
終了は `Ctrl+C`
##### Consumer
```bash=
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```
終了は `Ctrl+C`
**★Exercise★**
`kafka-console-consumer` は,Consumer として受信し続ける動作になっている.
もう一つターミナルを立ち上げて,Consumer 実行中に Producer からデータを送り込んでみよう.
> コラム:
> LinuxではOS実行中に起動し続けるアプリケーションのことを daemon (デーモン)と呼ぶ.
> 他のシステム(例えば Windows OS)では,サービスと呼ぶこともある.
> 慣習上,daemon として動くことを意図したアプリケーションは,末尾に d を付けた実行ファイル名にすることが多い.
> 例:HTTPサーバ (Apache HTTPD) の実行ファイル名は `httpd` である.
ヒント: 2025.1.31追記
以下のコマンド群で,別ウィンドウでサーバ起動,別ウィンドウでconsumer起動ができる.
```bash=
lxterminal -e bin/kafka-server-start.sh config/kraft/reconfig-server.properties
lxterminal -e bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```
<!-- **★Exercise★**
別端末の `consumer` に対して,データを送り付けてみよう.例えば,教員用のconsumer は `192.168.8.17:9092` で動いている(はず).
2025.1.20 13:00 このExerciseはパス.現状では,他ノードのデータを受け付ける設定ファイルになっていない.
-->
### PythonからのAPI利用
`python` のインタプリタでも動作確認してみよう.
```python3=
from kafka import KafkaConsumer
c = KafkaConsumer("quickstart-events", bootstrap_servers="localhost:9092", auto_offset_reset='earliest')
for m in c:
print(m, m.value)
```
このプログラムも同じく無限ループで受信し続ける.外部のproducerからデータを送り込んで挙動を確認してみよう.
> `auto_offset_reset='earliest'` を付けることで from-beginning と同じ動きになる.
> デフォルトは最新のみを意味する `latest` 指定.
### Kafkaのまとめ
Kafka のインストールと動作確認は以上です.
> コラム:
> Kafkaのログデータ保存場所は config ファイルの `log.dirs` に書いてある.
> 今回手順であれば,`config/kraft/reconfig-server.properties` に基づいているため,そのファイルから `/tmp/kraft-combined-logs` に存在するということがわかる.
> 実際に:
> - `less` コマンドで設定ファイルを確認
> - `/` で `dirs` などの文字を検索
> - `q` で終了
> - `ls` コマンドでログディレクトリの中身を確認
>
> をしてみるとよい.
---
## IoTセンサで Pub/Sub
ArduinoシリーズやM5Stackシリーズは、その多くがApache Kafkaのプロトコルに対応していないため、MQTTを使う必要があります。そこで、MQTTブローカーと、MQTTとApache Kafkaを繋ぐブリッジ・プロキシを用意してみましょう。
> ### MQTTブローカー
> - **Eclipse Mosquitto**
> - https://mosquitto.org/
> - シングルスレッドアーキテクチャ
> - 軽量・シンプルな構成
> - **EMQX**
> - https://www.emqx.com/ja/blog/mqtt-and-kafka
> - マスターレス分散アーキテクチャを採用
> - 大規模・高可用性・スケーラブルなシステムに有効
>
> ### MQTT-Kafkaブリッジ・プロキシ
> - **Confluent MQTT Proxy**
> - https://docs.confluent.io/platform/current/kafka-mqtt/intro.html
> - Confluent社が開発するProxy
> - **strimzi-mqtt-bridge**
> - https://github.com/strimzi/strimzi-mqtt-bridge
> - Cloud Native Computing Foundation のプロジェクト
> - Javaベース
> - **mqtt-to-kafka-bridge**
> - https://github.com/nodefluent/mqtt-to-kafka-bridge
> - Node.jsベース
### MQTTブローカーを導入する
今回はシンプルに使える Eclipse Mosquitto でセットアップしてみましょう。
1. インストール
```bash=
sudo apt install -y mosquitto mosquitto-clients
```
2. 有効化
```bash=
sudo systemctl enable mosquitto.service
# # または
# mosquitto -d #デーモンとして起動
```
3. 起動状態を確認
```bash=
sudo systemctl status mosquitto.service
# # または
# mosquitto -v
```
4. リモートからのアクセスを許可する
ここでは匿名での通信を許可する方法でセットアップしますが、ユーザ・パスワード認証を要求する設定も可能です。
```bash=
sudo nano /etc/mosquitto/mosquitto.conf
```
下記の箇所を修正する(ポートの設定と匿名でのアクセスの許可)
```
listener 1883
allow_anonymous true
```
5. Mosquittoを再起動する
```bash=
sudo systemctl restart mosquitto.service
```
6. Raspberry Pi のIPアドレスを確認する
```bash=
hostname -I
```
7. 動作テスト
- Subscriberを起動する(`ip_addr`はRaspberry Pi のIPアドレス)
```bash=
mosquitto_sub -h {ip_addr} -p 1883 -t "test/hello"
```
- Publisherでメッセージを送信する
```bash=
mosquitto_pub -h {ip_addr} -p 1883 -t "test/hello" -m "Hello, world!"
```
### MQTT-Kafkaブリッジ・プロキシを導入する
#### `mqtt-to-kafka-bridge` の場合
センサ → MQTT → Kafka という一方向のみのストリームの場合、こちらは簡単に使える。(逆方向はサポートされてないっぽい?)
1. Node.js
- Node Version Manager (NVM)
```bash=
wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash
```
- insall Node.js
```bash=
nvm install --lts
```
- check Node.js version
```bash=
node -v
# v22.13.0
```
- update npm (Node Package Manager)
```bash=
npm -v # --> 10.9.2
npm update -g npm
npm -v # --> 11.0.0 (updated)
```
2. Githubのリポジトリをcloneする(※zipダウンロードでもOK)
```bash=
git clone git@github.com:nodefluent/mqtt-to-kafka-bridge.git
# または
git clone https://github.com/nodefluent/mqtt-to-kafka-bridge.git
```
3. ライブラリをインストール
```bash=
cd mqtt-to-kafka-bridge/
npm install
```
たぶん、色々とエラーが出ると思うので、指示に従って依存関係を解決するためのコマンドを実行する
```bash=
npm audit fix
```
Node.js の Apache Kafka 関係のライブラリ(`node-rdkafka`)を追加でインストールする
```bash=
npm install --save node-rdkafka
```
4. 実行ファイルを準備する
サンプルをコピーしてちょっと変更する。
```bash=
mv example/sample.js ./mqtt-kafka-bridge.js
mv example/config.js ./config.js
rm -r example
```
`mqtt-kafka-bridge.js` を修正
```javascript=
"use strict";
const Bridge = require("./index.js");
let config = require("./config.js");
const bridge = new Bridge(config);
bridge.on("error", console.error);
bridge
.run()
.catch(console.error);
```
5. 設定ファイルを準備する
`config.js`
```javascript=
// --- 前略 ---
mqtt: { // # see https://github.com/mqttjs/MQTT.js#mqttclientstreambuilder-options
url: null,
options: {
clientId: "mqtt-kafka-bridge",
host: "localhost",
port: 1883,
protocolId: "MQTT",
protocolVersion: 4,
}
},
// --- 中略 ---
routing: {
//"*": "*", // from all to all (indiviudally 1:1)
//"*": "kafka-test", // from all to single kafka-test topic
//"mqtt-topic": "kafka-topic", // from mqtt-topic to kafka-topic only
"*": "quickstart-events"
},
// --- 後略 ---
```
6. いざ実行
```bash=
node mqtt-kafka-bridge.js
```
何も表示されないけどそれでOK。
<!--
#### `strimzi-mqtt-bridge` の場合(未確認)
1. Githubのリポジトリをcloneする
```bash=
git clone git@github.com:strimzi/strimzi-mqtt-bridge.git
# または
git clone https://github.com/strimzi/strimzi-mqtt-bridge.git
```
2. 設定ファイルを準備する
- Topic Mapping Rules (ToMaR)
- `config/topic-mapping-rules.json`
- MQTTとKafkaのトピックのマッピングを定義するファイル
- Bridge Configuration
- `config/application.properties`
- MQTTとKafkaそれぞれの接続先などの基本情報を定義するファイル
3. 以降は未確認
-->
---
## まとめ
以下のコマンドで Raspberry Pi をシャットダウンしてください.
```bash=
sudo shutdown -h now
```
<br>
---
下記、解説資料です。
---
## Pub/Subモデル
Apache Kafka や MQTT はPub/Subモデル(出版−購読型モデル)を採用している。
### Pub/Subモデルの特徴および利点・欠点
**Pub/Subモデル (Publish/Subscribeモデル)** は、情報発信者(Publisher)と情報受信者(Subscriber)が直接やり取りを行わず、メッセージブローカー(Broker)を介して通信する方式である。
#### 特徴
Pub/Subモデルは、その名にもあるとおり「出版社」の例で考えるとわかりやすい。世の中に出回っている本は「出版社」が直接「購読者」に売られることはなく、その多くは「本屋」を仲介することが多いだろう。出版社は「誰が買うか?」を認識することなく本を売ることができるし、購読者は「どの出版社の本か?」を意識しなくても本を買うことができる。また、それぞれの行動は同期する必要がない。この形態を通信方式に応用したのが Pub/Subモデルである。特徴は次のとおりである。

1. **疎結合 (Loosely Coupled)**
HTTPはクライアントとサーバーが直接通信するため、送信元・送信先が必ず「対」になっている必要があるため、1対1通信が基本となる。一方で、Pub/Subモデルでは、Broker(仲介者)がメッセージを中継するため、Publisher(送信者)とSubscriber(受信者)が互いを意識する必要がないことが大きな特徴である。つまり、Publisherは受信者の数や存在を気にせずメッセージを送信できる。
2. **非同期通信**
Pub/SubではSubscriberがメッセージを非同期に受け取ることができるため、リアルタイム性が求められるシステムや高可用性システムに適している。HTTPは同期通信が基本であり、リクエストに対してレスポンスを待つ必要がある。
---
#### 利点
上記の特徴から、HTTPなどの通信方式と比較して、次のような利点がある。
1. **スケーラビリティ**
Brokerが処理を管理するため、Pub/Subは多くのPublisherやSubscriberがいても性能を維持しやすい。また、1:N通信といった複数のSubscriberにデータを送りたい場合でも送信側は一度データをBrokerに送信するだけでよく、多対多の通信に適している。これに対し、HTTPはリクエスト数の増加に伴いサーバーの負荷が高まる。
2. **可用性と信頼性**
Brokerがメッセージの再送や永続化をサポートすることで、信頼性の高い通信を実現できる。これに対し、HTTPでは失敗したリクエストを再送する機能をアプリケーション側で実装する必要がある。
3. **リアルタイム通知**
Pub/Subはリアルタイム通知システム(例:チャットアプリ、株価更新)に最適である。これに対し、HTTPはポーリングや長時間接続を必要とするため、効率が悪い場合がある。
---
#### 欠点
1. **構成の複雑さ**
Brokerを導入する必要があるため、システム構成が複雑になる。一方で、HTTPはシンプルなクライアント-サーバーモデルで実装が容易である。
2. **デバッグとモニタリングの難しさ**
メッセージがBrokerを介して流れるため、問題発生時に原因を特定するのが困難な場合がある。
---
### MQTTとKafkaなど異なるPub/Sub方式の比較
Pub/Subモデルにはさまざまな実装方式があり、MQTTやKafkaはそれぞれ異なる特性を持つ。
#### 1. MQTT (Message Queuing Telemetry Transport)
- **特徴**
軽量でシンプルなプロトコルであり、主に低帯域幅や不安定なネットワーク環境で動作するIoTデバイス向けに設計されている。トピックベースでメッセージを配信する。
- **利点**
- 低消費電力で通信できるため、バッテリー駆動のデバイスに適している。
- QoS(Quality of Service)レベルを設定し、メッセージ配信の信頼性を制御可能。
- **欠点**
- 高スループットを求めるシステムには適さない。
- メッセージの永続化や高度な分析には不向きである。
#### 2. Kafka
- **特徴**
分散型メッセージングシステムであり、ログデータの収集やリアルタイム分析に最適である。トピックごとにメッセージを永続化し、高速なデータ処理が可能である。
- **利点**
- 高スループットと耐障害性を持つ。
- メッセージが永続化されるため、後で再処理が可能である。
- ビッグデータ処理やストリーム処理システム(例:Apache Spark、Flink)と統合しやすい。
- **欠点**
- セットアップが複雑であり、運用管理にコストがかかる。
- レイテンシが重要なシステムではオーバーヘッドが発生する場合がある。
---
## 比較表
| 特徴 | MQTT | Kafka |
|---------------------|-------------------------------------|----------------------------|
| 用途 | IoT、低帯域幅の環境 | ビッグデータ、ログ解析 |
| 軽量性 | 高い | 低い |
| メッセージ永続化 | オプション | デフォルトでサポート |
| スループット | 低~中 | 高 |
| リアルタイム性 | 高い | 高い |
| 管理の複雑さ | 低い | 高い |
- **MQTT**はセンサーやIoTデバイスなどリソースが限られた環境に適している。
- **Kafka**は大量のデータ処理や分析が必要な企業システムに適している。
それぞれの特性を理解し、用途に応じて適切な方式を選択することが重要である。
------
------
## 次回講義資料
https://hackmd.io/@yukimat/fullstack-2024-2
------
------
config.js
```
"use strict";
module.exports = {
// mqtt connection options
mqtt: { // # see https://github.com/mqttjs/MQTT.js#mqttclientstreambuilder-options
url: null,
options: {
clientId: "mqtt-kafka-bridge",
host: "localhost",
port: 1883,
protocolId: "MQTT",
protocolVersion: 4,
}
},
// kafka connection options
kafka: { // # see https://github.com/nodefluent/node-sinek/blob/master/lib/librdkafka/README.md
logger: undefined,
noptions: {
//"debug": "all",
"metadata.broker.list": "localhost:9092",
"client.id": "mqtt-bridge-example-client",
"event_cb": true,
"compression.codec": "none",
"retry.backoff.ms": 200,
"message.send.max.retries": 10,
"socket.keepalive.enable": true,
"queue.buffering.max.messages": 100000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 1000000,
//"security.protocol": "sasl_ssl",
//"ssl.key.location": path.join(__dirname, "../certs/ca-key"),
//"ssl.key.password": "nodesinek",
//"ssl.certificate.location": path.join(__dirname,"../certs/ca-cert"),
//"ssl.ca.location": path.join(__dirname,"../certs/ca-cert"),
//"sasl.mechanisms": "PLAIN",
//"sasl.username": "admin",
//"sasl.password": "nodesinek",
"api.version.request": true,
},
tconf: {
"request.required.acks": 1,
}
},
// declares on which target kafka topic a mqtt message should be routed to (based on the mqtt topic)
routing: {
//"*": "*", // from all to all (indiviudally 1:1)
//"*": "kafka-test", // from all to single kafka-test topic
//"mqtt-topic": "kafka-topic", // from mqtt-topic to kafka-topic only
"*": "quickstart-events"
},
// if routed messages should be logged to debug
logMessages: true,
// declares how an mqtt topic name should be split (/) to fit to the kafka topic naming conventions
kafkaTopicDelimiter: "-",
// gives you the option to alter mqtt messages before they are consumed (routed)
subscribeEtl: (topic, message, packet, callback) => {
// first param is an error, if you pass one, we will omit the message
callback(null, {
topic,
message,
});
},
// gives you the option to alter kafka messages before they are produced
produceEtl: (topic, message, key, callback) => {
// first param is an error, if you pass one, we will omit the message
callback(null, {
topic,
message, // you can pass an object, will be turned into a string
key, // default uuid.v4
partition: null, // default null
});
},
// the bridge starts an http server
http: {
port: 3967,
},
};
```
mqtt-kafka-bridge.js
```
"use strict";
const Bridge = require("./index.js");
let config = require("./config.js");
const bridge = new Bridge(config);
bridge.on("error", console.error);
bridge
.run()
.catch(console.error);
```