# Project
# CI
- Install CodeIgniter
- Tutorial : https://www.tecmint.com/install-codeigniter-in-centos-7/
- Actual steps :
1. `curl -sS https://getcomposer.org/installer | php`
2. `sudo mkdir /usr/local/bin/composer`
3. `mv composer.phar /usr/local/bin/composer/`
4. `chmod +x /usr/local/bin/composer/composer.phar`
5. `sudo mkdir /var/www/ci`
6. `cd /var/www/ci`
7. `git clone https://github.com/bcit-ci/CodeIgniter.git .`
8. `/usr/local/bin/composer/composer.phar install`
- If this errors, you may need to add the flag : `--ignore-platform-reqs`
9. `chown -R apache:apache /var/www/ci/`
10. Set the URL to localhost for now; swap in the real IP later :
- `vi /var/www/ci/application/config/config.php` : `$config['base_url'] = 'http://127.0.0.1';`
11. Configure the DB (create a database and user first) : `vi /var/www/ci/application/config/database.php`
```config=
$db['default'] = array(
'dsn' => '',
'hostname' => 'localhost',
'username' => 'code_db',
'password' => 'password',
'database' => 'code_db',
'dbdriver' => 'mysqli',
);
```
12. Open localhost/index.php
- install php on apache
- 教學 : https://computingforgeeks.com/install-apache-mariadb-php-on-rocky-linux/
- 實際步驟 :
1. setenforce 0 (暫時可以,每次重開機都要再下一次)
- `vi /etc/sysconfig/selinux`:SELINUX=disabled。(重開機生效)
2. `sudo yum install httpd`
3. `sudo service httpd start`
4. `sudo yum install mariadb-server mariadb`
5. `sudo service mariadb start`
6. `sudo yum install php-{common,gmp,fpm,curl,intl,pdo,mbstring,gd,xml,cli,zip,mysqli}`
7. `sudo yum module list php` (first check which PHP module streams are available)
8. `sudo yum module enable php:8.1 -y`
- Here `php` is the Name and `8.1` the Stream shown in step 7's output. They may differ depending on version, so check for yourself.
9. `sudo yum install php-{common,gmp,fpm,curl,intl,pdo,mbstring,gd,xml,cli,zip,mysqli}`
10. `php --version` : check that it succeeded
11. `sudo vi /etc/php.ini` :
```config=
max_execution_time = 300
upload_max_filesize = 100M
post_max_size = 128M
date.timezone = "Asia/Taipei"
```
12. `sudo vi /etc/httpd/conf/httpd.conf` :
- Below the line `# LoadModule foo_module modules/mod_foo.so`, add :
- `AddHandler php-script .php`
- Add
```config=
<Directory "/var/www/ci">
Require all granted
</Directory>
```
- Modify
```config=
<IfModule dir_module>
DirectoryIndex index.php
</IfModule>
```
13. `sudo systemctl restart httpd`
14. `sudo systemctl restart php-fpm`
- Note : if something goes wrong, look in `/var/log/httpd/error_log`.
- connect to database
1. First create the database (here : ci_test) and a user
- `sudo mariadb`
- `create database ci_test;`
- `use ci_test;`
- `create table member(id INT PRIMARY KEY AUTO_INCREMENT, name varchar(10));`
- `insert into member(name) values('tommy');`
- The user-creation commands are omitted here
2. `sudo vi application/config/database.php`
```php=
$db['default']['hostname'] = 'localhost';
$db['default']['username'] = 'myusername';
$db['default']['password'] = 'mypassword';
$db['default']['database'] = 'mydatabase';
$db['default']['dbdriver'] = 'mysqli';
$db['default']['dbprefix'] = '';
$db['default']['pconnect'] = TRUE;
$db['default']['db_debug'] = TRUE;
$db['default']['cache_on'] = FALSE;
$db['default']['cachedir'] = '';
$db['default']['char_set'] = 'utf8';
$db['default']['dbcollat'] = 'utf8_general_ci';
$db['default']['swap_pre'] = '';
$db['default']['autoinit'] = TRUE;
$db['default']['stricton'] = FALSE;
```
3. `sudo vi models/Member.php` : the first letter of the filename must be uppercase
```php=
<?php
class Member extends CI_Model {
public function __construct() {
$this->load->database();
}
public function get_data($slug = FALSE) {
if ($slug == FALSE) {
// member table
$query = $this->db->get('member');
return $query->result_array();
}
// member table
$query = $this->db->get_where('member', array('slug' => $slug));
return $query->row_array();
}
}
?>
```
4. `sudo vi controllers/Data.php`
```php=
<?php
class Data extends CI_Controller {
public function index() {
$this->load->model('member');
$data['member'] = $this->member->get_data(); // the value returned by get_data() in Member.php
$this->load->view('header', $data); // views/ contains header.php
}
}
?>
```
5. `sudo vi views/header.php`
```php=
<html>
<h1>
test
</h1>
<?php
foreach ($member as $item) {
echo $item['id'];
echo $item['name'];
}
?>
</html>
```
## note
- Quickly create a controller
- `sudo vi /var/www/ci/controllers/Test.php` : the first letter of the filename must be uppercase
```php=
<?php
// the first letter of the class name must be uppercase
class Test extends CI_Controller {
public function index() {
echo "test";
}
public function hello() {
echo "test1";
}
}
?>
```
- Open http://localhost/index.php/test or http://localhost/index.php/test/hello
- Removing index.php from the URL :
- First create a file in the same directory as index.php : `vi .htaccess` (the last line should point at your own index.php path)
```config=
RewriteEngine on
RewriteCond $1 !^(index\.php|images|robots\.txt)
RewriteRule ^(.*)$ /var/www/ci/index.php/$1 [L]
```
- `sudo vi /var/www/ci/application/config/config.php`
- Modify :
```=
$config['index_page'] = '';
```
- `sudo vi /etc/httpd/conf/httpd.conf`
- Modify
```config=
<Directory "/var/www/ci">
AllowOverride All
Require all granted
</Directory>
```
- Modify (there is also a global one)
```config=
AllowOverride All
```
- `sudo systemctl restart httpd`
- Open http://localhost/test
- The first letter of a controller's class name must be uppercase
- Frontend JavaScript receiving backend data
```javascript=
<script>
var data = `
<?php
foreach ($member as $item) {
echo $item['id'];
echo $item['name'];
}
?>
`
console.log(data);
</script>
```
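A more robust variant of the pattern above is to have PHP emit JSON (`echo json_encode($member);`) inside the template literal and parse it in JavaScript. A sketch, with a hypothetical payload standing in for the PHP output:

```javascript
// Hypothetical output of `echo json_encode($member);` on the PHP side.
const data = `[{"id":"1","name":"tommy"},{"id":"2","name":"amy"}]`;

// Parsing gives back structured rows instead of concatenated id/name strings.
const member = JSON.parse(data);
for (const item of member) {
  console.log(item.id, item.name); // each row's fields, cleanly separated
}
```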
- Useful tutorial :
- https://ithelp.ithome.com.tw/articles/10186538
- controller example, Data.php
```php=
public function __construct() {
parent::__construct();
$this->load->model('member'); // if the model is loaded here, the methods below shouldn't need to load it again
}
public function index() { // read
$this->load->model('member'); // load models/Member.php
$data['member'] = $this->member->get_data(); // call get_data() in models/Member.php
$this->load->view('header', $data); // pass $data into views/header.php and render it
}
public function create() { // the main form handler
$this->load->model('member'); // load the model
$data['member'] = $this->member->get_data(); // call get_data() in models/Member.php
$this->load->view('header', $data); // render views/header.php with $data passed in
// form
$this->load->helper('form');
$this->load->library('form_validation');
$this->form_validation->set_rules('name', 'content', 'required');
if ($this->form_validation->run() === FALSE) { // validation failed
if ($this->input->post('submit') == 'delete' && $this->input->post('del_id')) { // delete
$this->delete();
}
if ($this->input->post('submit') == "update" && $this->input->post('update_id')) { // update
$this->update();
}
}
else {
$this->member->set_data();
redirect($_SERVER['HTTP_REFERER']); // reload the page
// $this->load->view('success'); // load a success page
}
}
public function update() { // update
$this->load->model('member');
$this->member->update_data();
redirect($_SERVER['HTTP_REFERER']); // reload the page
}
public function delete() { // delete
$this->load->model('member');
$this->member->del_data();
redirect($_SERVER['HTTP_REFERER']); // reload the page
}
```
- views example, header.php
```php=
<html>
<h1>
todolist
</h1>
<div id = 'data'>
<div>
Todo list
</div>
<div>
id content
</div>
<?php
foreach ($member as $item) {
echo "<div>";
echo "{$item['id']} {$item['name']}"; // use braces when putting array variables inside strings
echo "</div>";
// echo implode(" ", $item); prints the array directly
// print_r(array_keys($item)); prints the array's keys
}
?>
<?php
$this->load->library('form_validation');
echo validation_errors();
?>
<?php echo form_open('http://127.0.0.1/data/create');
?>
<div>
Add an item
</div>
<input type = "input" name = "name"></input>
<input type = "submit" name = "submit"> </input>
<div>
Delete an item
</div>
<input type = "input" name = "del_id"> </input>
<input type = "submit" name = "submit" value = "delete"></input>
<div>
Update an item
</div>
<input type = "input" name = "update_id"> </input>
<input type = "input" name = "update_content"></input>
<input type = "submit" name = "submit" value = "update"></input>
</div>
<script>
var get_php_value = `
<?php
foreach ($member as $item) {
echo $item['id'];
echo $item['name'];
}
?>
`
console.log(get_php_value);
</script>
</html>
```
- model example, Member.php
```php=
<?php
class Member extends CI_Model {
public function __construct() { // load the database module first
$this->load->database();
}
public function get_data($slug = FALSE) { // fetch data
if ($slug == FALSE) {
$query = $this->db->get('member'); // select * from member;
return $query->result_array();
}
$query = $this->db->get_where('member', array('slug' => $slug));
return $query->row_array();
}
public function set_data() { // insert data
$this->load->helper('url');
$data = array(
'name' => $this->input->post('name')
);
return $this->db->insert('member', $data); // insert into member
}
public function del_data() { // delete data
$this->load->helper('url');
$data = array(
'id' => $this->input->post('del_id')
);
$this->db->where('id', $data['id']); // which deleted id
return $this->db->delete('member'); // delete from member
}
public function update_data() {
$this->load->helper('url');
$data = array(
'id' => $this->input->post('update_id'),
'name' => $this->input->post('update_content')
);
$this->db->where('id', $data['id']); // set condition
$this->db->update('member', $data); // update content
}
}
?>
```
## smarty
- Installation : https://n.sfs.tw/content/index/10508
- `cd application/libraries`
- `wget https://github.com/smarty-php/smarty/archive/v3.1.27.zip`
- `unzip v3.1.27.zip`
- `rm v3.1.27.zip`
- `cd application/cache`
- `mkdir scache`
- `mkdir templates_c`
- `chmod 777 scache`
- `chmod 777 templates_c`
- `vi application/libraries/Base.php`
```php=
<?php
defined('BASEPATH') OR exit('No direct script access allowed');
class Base {
var $v;
public function __construct()
{
include APPPATH. "libraries/smarty-3.1.27/libs/Smarty.class.php";
$view = new Smarty();
// configure Smarty parameters
$view->left_delimiter = '{{';
$view->right_delimiter = '}}';
$view->setTemplateDir(APPPATH . 'views');
$view->setCompileDir(APPPATH . 'cache/templates_c');
$view->setCacheDir(APPPATH . 'cache/scache');
// $view->setConfigDir(APPPATH . 'views/config');
$view->compile_check = true;
$view->force_compile = true;
$view->caching = false;
$view->cache_lifetime = 86400;
$this->v= $view;
}
}
```
- `vi application/controllers/Index.php`
```php=
<?php
defined('BASEPATH') OR exit('No direct script access allowed');
class Index extends CI_Controller {
public function __construct() {
parent::__construct();
$this->load->library('base');
}
public function index()
{
$this->base->v->assign('test','TEST');
$this->base->v->display("test.html"); # picks the template up from views/
}
}
```
- `vi views/test.html`
```html=
<html>
{{$test}}
</html>
```
- Syntax :
- Using variables :
- controller :
```php=
$this->base->v->assign('test','TEST');
$this->base->v->display("test.html"); # picks the template up from views/
```
- html : `{{$test}}`
- json_encode :
- `{{$test|json_encode}}`
- foreach :
- controller :
```php=
$data['member'] = $this->member->search_data();
$this->base->v->assign('member',$data);
$this->base->v->display('search.html');
```
- html :
```php=
{{foreach $member as $item}}
<tr>
<td>{{$item.0.id}}</td>
</tr>
{{/foreach}}
```
- Check the version : `{{$smarty.version}}`
# PHP
- When echoing a variable inside a string, wrap it in {}
- ex. `echo "<div>{$item['name']}</div>";`
- Array to string conversion : an array was used as a string
- solution :
```php=
<?php
echo json_encode($member);
?>
```
- foreach with index
```php=
<?php
foreach($member as $key=>$value) {
echo $key; # index
echo $value; # value
}
?>
```
# Redis Server
- Concepts
- A key-value, non-relational database stored in memory, so reads and writes are very fast.
- The downside of living in memory is that data disappears on shutdown, so it is best to also persist it to another relational database.
- Install and start
- `sudo yum install redis`
- `sudo systemctl start redis`
- `redis-cli`
- Tutorial
- https://ithelp.ithome.com.tw/articles/10249583
- Set a value
- `set variable_name value`
- ex. `set pig fat`
- Set a value with an expiry
- ex. delete after ten seconds : `SET test1 "123" EX 10`
- Get a value
- `get variable_name`
- Increment / decrement a value
- add 50 : ex. `INCRBY test1 50`
- subtract 50 : ex. `DECRBY test1 50`
- Check a value's length
- `STRLEN variable_name`
- Append a string directly onto a value
- `append variable_name value`
- list
- Push onto a list
- ex. push values onto list_name : `LPUSH list_name value value1`
- Remove
- from the right : `rpop list_name`
- from the left : `lpop list_name`
- List the values
- from 0 to -1 (all values) : `lrange list_name 0 -1`
- sorted set : members stored with scores
- Create a sorted set
- `zadd set_name score member`
- ex. `zadd score 99 stu1`
- List members
- from 0 to -1 (all) : `zrange set_name 0 -1`
- Look up a single member
- `zscore set_name member`
- ex. `zscore score stu1`
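To make the string-command semantics above concrete, here is a tiny in-memory stand-in in JavaScript. It is purely illustrative and does not talk to a real Redis:

```javascript
// Minimal in-memory stand-in for a few Redis string commands (illustration only).
class MiniRedis {
  constructor() { this.store = new Map(); }
  set(key, value) { this.store.set(key, String(value)); return 'OK'; }
  get(key) { return this.store.has(key) ? this.store.get(key) : null; }
  incrby(key, n) { // like INCRBY: a missing key counts as 0
    const next = Number(this.get(key) ?? 0) + n;
    this.set(key, next);
    return next;
  }
  append(key, value) { // like APPEND: returns the new length
    this.set(key, (this.get(key) ?? '') + value);
    return this.strlen(key);
  }
  strlen(key) { return (this.get(key) ?? '').length; }
}

const r = new MiniRedis();
r.set('pig', 'fat');
console.log(r.get('pig'));          // → fat
console.log(r.incrby('test1', 50)); // → 50
console.log(r.append('pig', '!'));  // → 4
```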
# Mosquitto
- Installation : https://weirenxue.github.io/2021/06/19/mqtt_centos_7/
- `sudo yum install mosquitto -y`
- If the package is missing : `sudo yum -y install epel-release`
- `sudo systemctl start mosquitto`
- Subscribe : `mosquitto_sub -h ip_address -t channel_name`
- ex. `mosquitto_sub -h 172.20.10.9 -t Rocky`
- Publish : `mosquitto_pub -h ip_address -t channel_name -m "message content"`
- ex. `mosquitto_pub -h 172.20.10.9 -t Rocky -m "test"`
- (Rocky Linux) allow connections via the IP address, not just localhost
- `sudo vi /etc/mosquitto/mosquitto.conf`
- Find the line containing only `listener` and change it to
- `listener 1883 ip_address`
- `allow_anonymous true`
- `sudo systemctl restart mosquitto`
- A restart is required; a reload alone is not enough
- `sudo firewall-cmd --permanent --add-port=1883/tcp`
- To remove the rule : `sudo firewall-cmd --permanent --remove-port=1883/tcp`, likewise followed by a reload
- `sudo firewall-cmd --reload`
- demo

## connect to php
- Installation
- `composer require php-mqtt/client`
- If you lack permissions and composer has never been installed, you can `su -` into root to install composer
- `sudo vi test.php`
```php=
<?php
require('vendor/autoload.php');
use \PhpMqtt\Client\MqttClient;
use \PhpMqtt\Client\ConnectionSettings;
$server = '172.20.10.8'; // ip
$port = 1883;
$clientId = rand(5, 15);
$username = 'emqx_user';
$password = null;
$clean_session = false;
$connectionSettings = new ConnectionSettings();
$connectionSettings
->setUsername($username)
->setPassword(null)
->setKeepAliveInterval(60) // Last Will settings
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId);
$mqtt->connect($connectionSettings);
// publish (topic_name, content)
$mqtt->publish('Rocky', 'Hello World!', 0);
// subscribe (topic_name)
$mqtt->subscribe('Rocky', function ($topic, $message) {
printf("Received message on topic [%s]: %s\n", $topic, $message);
}, 0);
// keep listening
$mqtt->loop(true);
// release the connection
$mqtt->disconnect();
```
- demo :

- Tutorials
- https://tw511.com/a/01/38137.html
- https://github.com/php-mqtt/client
## use in codeigniter
- Follow the installation above first, and make sure a test MQTT script runs under CI
- code :
- backend api :
- `sudo vi controllers/Test.php`
```php=
<?php
// if publish and subscribe live in the same controller, the use statements only need to be written once, at the top
use \PhpMqtt\Client\MqttClient;
use \PhpMqtt\Client\ConnectionSettings;
class Test extends CI_Controller {
public function __construct() {
parent::__construct();
}
public function index() { // get view
$this->load->view('mqtt_main');
}
// publish
public function sendMqtt() {
require('vendor/autoload.php');
try {
//$requestData = json_decode(file_get_contents('php://input'), true);
$test_a = $this->input->post('a');
//$all_input = $this->input->post();
$server = '172.20.10.8';
$port = 1883;
$clientId = rand(5, 15);
$username = 'emqx_user';
$password = null;
$clean_session = false;
$connectionSettings = new ConnectionSettings();
$connectionSettings
->setUsername($username)
->setPassword(null)
->setKeepAliveInterval(60)
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId);
$mqtt->connect($connectionSettings, $clean_session);
//printf("client connected\n");
$mqtt->publish('Rocky', $test_a, 0);
$mqtt->disconnect();
var_dump($test_a);
}
catch (\Exception $e) {
var_dump($e);
}
}
// subscribe, get message from mqtt
public function getMqtt() {
require('vendor/autoload.php');
try {
$server = '172.20.10.8';
$port = 1883;
$clientId = rand(5, 15);
$username = 'emqx_user';
$password = null;
$clean_session = false;
$connectionSettings = new ConnectionSettings();
$connectionSettings
->setUsername($username)
->setPassword(null)
->setKeepAliveInterval(60)
->setLastWillTopic('emqx/test/last-will')
->setLastWillMessage('client disconnect')
->setLastWillQualityOfService(1);
$mqtt = new MqttClient($server, $port, $clientId);
// if no message arrives from the mqtt server within 10 seconds, end this connection first, otherwise the request will hit a gateway timeout
$mqtt->registerLoopEventHandler(function (MqttClient $mqtt, float $elapsedTime) {
if ($elapsedTime >= 10) {
//print "Killing since we never got a topic!";
$mqtt->interrupt();
}
});
$mqtt->connect($connectionSettings, $clean_session);
// subscribe to the topic
$mqtt->subscribe('Rocky', function ($topic, $message) use ($mqtt) {
// print the received message
printf($message);
// stop after the first message so the received content can be returned
exit();
}, 0);
$mqtt->loop(true); // keep listening to the mqtt server
$mqtt->disconnect();
}
catch (\Exception $e) {
var_dump($e);
}
}
}
?>
```
- frontend
- `sudo vi views/mqtt_main.php`
```javascript=
<html>
<div id = 'mqtt_data_pos'></div>
<input type = 'text' id = 'pass_mqtt_msg'></input>
<button id = 'send_mqtt_bt'>send_mqtt</button>
<script src = 'https://cdnjs.cloudflare.com/ajax/libs/axios/0.27.2/axios.min.js' ></script>
<script>
document.getElementById('send_mqtt_bt').addEventListener('click', postMqttData);
function getId(id) {
return document.getElementById(id);
}
// send a message to the mqtt server
async function postMqttData() {
// use the FormData format so the backend can parse the data more easily
var test = new FormData();
var pass_data = getId('pass_mqtt_msg').value;
test.append('a', pass_data);
var a = await axios.post('/test/sendMqtt', test);
console.log(a.data);
}
// fetch the mqtt server's data from the api
async function getMqttData() {
var mqtt_msg = await axios.get('/test/getMqtt');
if (mqtt_msg.data != '\n') {
getId('mqtt_data_pos').innerHTML += mqtt_msg.data + '<br/>';
}
}
// loop to get the data from mqtt
async function getDataLoop() {
while (true) {
await getMqttData();
}
}
getDataLoop();
</script>
</html>
```
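One caveat about the `getDataLoop()` above: it fires the next request the instant the previous one returns. A paced variant can be sketched in plain JavaScript (`pollFn` stands in for `getMqttData`; the names here are illustrative):

```javascript
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Poll pollFn forever by default, pausing between calls; maxPolls makes it testable.
async function pollLoop(pollFn, { intervalMs = 1000, maxPolls = Infinity } = {}) {
  for (let i = 0; i < maxPolls; i++) {
    await pollFn();
    await delay(intervalMs);
  }
}

// Example: three paced polls.
let polls = 0;
pollLoop(async () => { polls++; }, { intervalMs: 10, maxPolls: 3 })
  .then(() => console.log(polls)); // → 3
```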
- open 127.0.0.1/test on browser
- demo

# Kafka
## Installation
- Installation tutorial : https://www.atlantic.net/vps-hosting/how-to-install-apache-kafka-on-rocky-linux-8/
- Install :
- `sudo dnf update -y`
- `sudo dnf install java-11-openjdk-devel -y`
- `wget https://dlcdn.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz`
- Check the current version yourself at : https://dlcdn.apache.org/kafka
- `tar -xvzf kafka_2.12-3.3.2.tgz`
- `mv kafka_2.12-3.3.2/ /usr/local/kafka`
- `sudo vi /etc/systemd/system/zookeeper.service` : note the kafka paths, which may differ; check with `sudo find / -name kafka`
```config=
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/bin/bash /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/bash /usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
```
- `sudo vi /etc/systemd/system/kafka.service` : note the kafka and JAVA_HOME paths, which may differ; check with `sudo find / -name kafka` and `ls /usr/lib/jvm`
```config=
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/jre-11-openjdk"
ExecStart=/usr/bin/bash /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/bash /usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
```
- `sudo systemctl daemon-reload`
- Start and enable the services :
```
systemctl start zookeeper
systemctl start kafka
systemctl enable zookeeper
systemctl enable kafka
```
- `sudo systemctl status zookeeper kafka`
- Usage :
- `cd /usr/local/kafka/`
- Publish :
- `bin/kafka-console-producer.sh --broker-list localhost:9092 --topic event1`
- `>Hi, this is my first event`
- Subscribe :
- `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic event1 --from-beginning`
- Result
- 
## Concepts
- A message queue for events, developed at Apache; supports subscribing to and consuming data at very large scale
- In the old days, data was usually processed in batches (e.g. analyzed once a month). Nowadays we often need to process endless streams of data in <b>real time</b> (e.g. streaming GPS positions, or analyzing data live to track the latest state).
- Differences from RabbitMQ
- Kafka has higher throughput
- RabbitMQ offers better stability
## How it works

- broker : one server running Kafka is one broker
- cluster : made up of 1..n brokers; the processing Kafka does can be spread across the brokers in the cluster.
- record : a piece of data received by Kafka
- Three fields
1. `key`
2. `value`
3. `timestamp`
- topic : like a category; different kinds of data are separated by topic
- partition : a topic can have multiple partitions; each partition stores a different subset of the same topic's data, and different partitions usually sit on different brokers.
- Benefits
- horizontal scaling : data is stored across different brokers
- load balancing : consumers in the same group process data in parallel
- multiple partitions can be read independently by the consumers of one group
- single partition
- ref
- https://blog.csdn.net/justlpf/article/details/107400534
- replication
- each partition can be configured with a number of replicas; all replicas of a partition hold the same data
- among the replicas there is a leader and followers
- the leader handles reads and writes; followers synchronize the data
- Benefit
- high availability
- zookeeper
- manages the relationship between leaders and followers
- elects the leader and follower replicas
- the principle is to spread replicas across different nodes as much as possible
- consumers and producers ask zookeeper for the data's location, and zookeeper directs them to the leader replica
- coordinates data synchronization between replicas
- zookeeper has been superseded by functionality built into Kafka, which is more efficient; zookeeper is also too heavyweight, being a separate piece of software
- group_id
- Kafka records each group_id's offset, so after a disconnect a consumer can resume from where it left off
- a consumer commits an updated offset after each read
- High throughput
- data is handled with sequential rather than random disk access
- data is processed in batches
- consumers wait until a certain amount of data has accumulated before transferring it
- ref
- https://myapollo.com.tw/blog/python-kafka-part-2/
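The batching idea above can be sketched as a small buffer that only hands data off once enough records have accumulated (illustrative JavaScript, not Kafka's actual implementation):

```javascript
// Illustrative batcher: collect records and hand them off in batches,
// mimicking how Kafka clients trade a little latency for throughput.
class Batcher {
  constructor(batchSize, flushFn) {
    this.batchSize = batchSize;
    this.flushFn = flushFn;
    this.buffer = [];
  }
  add(record) {
    this.buffer.push(record);
    if (this.buffer.length >= this.batchSize) this.flush();
  }
  flush() {
    if (this.buffer.length === 0) return;
    this.flushFn(this.buffer); // one transfer for many records
    this.buffer = [];
  }
}

const batches = [];
const b = new Batcher(3, (batch) => batches.push(batch));
['a', 'b', 'c', 'd'].forEach((r) => b.add(r));
console.log(batches.length); // → 1 (the first three records went out as one batch)
console.log(b.buffer);       // → [ 'd' ] still waiting for a full batch
```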
## Advantages
- Can absorb very heavy traffic : handles up to billions of streaming events per minute
- Can connect many different systems
- Without Kafka
- complex data flows
- every pair of systems has to integrate against each other's APIs
- if the receiving side goes down, data may be lost
- With Kafka
- Highly scalable : processing can be spread across Kafka servers on different machines, raising Kafka's overall throughput.
- Highly available : one machine going down does not take out the whole system.
## Disadvantages
- Managing Kafka across several machines is harder
## confluent kafka
- A Kafka platform built on top of Apache Kafka; the difference is that it provides many more connectors between other systems and Kafka (ex. an MQTT connector).
- ref
- https://zhuanlan.zhihu.com/p/59615361
## ref
- https://www.readfog.com/a/1635090175644241920
- https://cloud.google.com/learn/what-is-apache-kafka?hl=zh-tw
- https://medium.com/@chihsuan/introduction-to-apache-kafka-1cae693aa85e
# pip3
- on rocky linux : `sudo dnf install python3-pip`
# nodejs side (including the React frontend)
## Concepts
- ref : https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example
- device : an MQTT producer, ex. a sensor
- mqtt broker : the MQTT server, such as a mosquitto server
- mqtt connector : links the mqtt broker and the kafka broker; converts MQTT payloads into a form Kafka can ingest, and also provides high availability
- kafka broker : the Kafka server
- kafka consumer : whatever subscribes to a kafka topic.
- ex. a nodejs server
## Installation
- Make sure the earlier installation works : https://hackmd.io/_w1tfOYZRHm5DU0UrHezjg?view
- container
- `docker ps`
- mqtt
- `sudo systemctl status mosquitto`
- redis
- `sudo systemctl status redis`
- Install the connector linking mqtt and kafka inside the connect container
- Purpose : the mqtt connector can run multiple tasks (each subscribing to topics on multiple mqtt servers at the same time), and Kafka then pulls the data from the mqtt connector.
- The number of mqtt connector tasks is configurable via the `tasks.max` parameter.
- Enter the container : `docker exec -it connect bash`
- Install : `confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:latest`
- Leave : `exit`
- Restart the container : `docker restart connect`
- Download the provided deliverwebForStudent.7z
- Copy it onto Rocky Linux : `scp deliverwebForStudent.7z xxx@xxx:/tmp`
- Extract : `7z x deliverwebForStudent.7z`
- Install 7zip : `sudo yum install p7zip p7zip-plugins`
- `cd deliverwebForStudent`
- Install the required node packages
- `sudo yum install python3 gcc make gcc-c++`
- `npm install`
- `npm install pm2`
- Start the service
- `vi ecosystem.config.js`
```config=
module.exports = {
apps : [{
name: 'server',
cwd: './',
script: 'index.js',
instances: 1,
autorestart: true,
watch: false,
log_date_format : 'YYYY-MM-DD HH:mm:ss',
exec_mode: "cluster",
env: {
NODE_ENV: 'production',
TZ: 'Asia/Taipei'
}
}]
};
```
- run it : `pm2 start ecosystem.config.js`
- If you instead want to serve on a port below 1024
- `vi index.js`
- `sudo pm2 start ecosystem.config.js`
- ports below 1024 require root privileges
- https://stackoverflow.com/questions/9164915/node-js-eacces-error-when-listening-on-most-ports/9947222#9947222
- Subscribe to the punch topic
- `vi mqtt.json` : change the uri to the local ip
```json=
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://[server的對內ip]:1883",
"mqtt.topics": "app/room1",
"kafka.topic": "test-topic",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1
}
}
```
- `curl -d @mqtt.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- To delete : `curl -X DELETE 'http://127.0.0.1:8083/connectors/{connectors_name}'`
- Subscribe to the gps topic
- `vi mqtt2.json` : change the uri to the ip of the local docker bridge interface
```json=
{
"name": "mqtt-source2",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://[server的對內ip]:1883",
"mqtt.topics": "app/room2",
"kafka.topic": "test-topic-room1",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1
}
}
```
- Use the docker bridge interface's ip, because the container must be able to reach the local mqtt server : in this example that is 172.17.0.1
- 
- `curl -d @mqtt2.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- 
- Check that it is receiving properly
- `curl localhost:8083/connectors`
- 
- note
- If there are problems, check the container log : `docker logs connect`
- If the problem is in connect, the connection to mqtt is probably at fault :
- You can subscribe to an arbitrary topic locally, using the same ip as in the config, to check whether the connection works :
- If that fails, go back and check whether the extra listener ip for mqtt was configured above
- Or check whether the container is healthy : `docker ps`
- `Failed to find any class that implements Connector and which name matches io.confluent.connect.mqtt.MqttSourceConnector`
- You probably forgot to run the confluent-hub install in the connect container
- `curl: (56) Recv failure: Connection reset by peer`
- connect needs roughly one or two minutes after starting before it responds.
- If it still fails, enter the connect container and check port 8083; if no service is running there, something is wrong (you can run `/etc/confluent/docker/run` inside connect)
- 
- `java.net.unknownhostexception: api.hub.confluent.io error: unknown error`
- Inside the connector container, check whether `curl https://api.hub.confluent.io/api/plugins` returns JSON; if not, DNS resolution is broken
- Possibly a side effect of an earlier docker prune
- `sudo systemctl restart docker`
## Implementation
- Create the db and table
- Create the db user
- Point the nodejs server at the db user : `vi model/db.js`
- In two places in `index.js`, first stuff in a placeholder value so the nodejs server doesn't error
- `dys09 undefined` : add a `let dys09`
- Or comment out the post to the backend
- Restart the server : `pm2 restart ecosystem.config.js`
- Test MQTT message format
- `mosquitto_pub -h 172.17.0.1 -t app/room1 -m "23,121,1,1,12345,1,1"`
- [latitude, longitude, ?, rehSNum, dpSNum]
- rehSNum : route primary key (in table route)
- dpSNum : delivery person primary key (in table delivery_person)
- On the nodejs side, the message finally received from kafka (ex. 23,121,0,1234,12345 above) is read with message.value.toString() or String(message.value) to recover the content sent over mqtt.
- Check the pm2 log for problems
- `cat ~/.pm2/logs/server-out-0.log`
- If everything is fine, the data shows up in the database
- `select * from gps;`
- 
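The conversion mentioned above can be tried with a plain Buffer, no Kafka needed; both forms recover the original MQTT string:

```javascript
// A Kafka message value arrives as a Buffer; simulate one here.
const value = Buffer.from('23,121,0,1234,12345');

// Both conversions mentioned above produce the original MQTT payload string.
console.log(value.toString()); // → 23,121,0,1234,12345
console.log(String(value));    // → 23,121,0,1234,12345

// From there the comma-separated fields can be split out.
console.log(value.toString().split(',')[0]); // → 23 (the first field)
```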
## Testing
### in host of 3G RAM and 2 CPU : 163.22.17.184
- send request with `mosquitto_pub -h 172.17.0.1 -t app/room2 -m "23,121,0,1234,12345"`
- single thread
- code
- 
- result
- total 50 with no sleep
- |interval| = 1 s
- total 500 with no sleep
- |interval| = 9,9,8 s
- total 5000 with no sleep
- |interval| = 98 s
- multi thread : 2
- cpu num
- 
- code
- 
- result
- total 100, each thread 50, with no sleep
- |interval| = 3 s
- total 1000, each thread 500, with no sleep
- |interval| = 22 s
- total 10000, each thread 5000, with no sleep
- |interval| = 239 s
- db
- I used mariadb
- `sudo yum install mariadb-server mariadb-client`
- note
- Sending requests very frequently within a short period makes the server's intake unstable
- At first, sending 5000 requests on a single thread was fine; after sending 10000 with multiple threads, even 500 on a single thread had problems for a short while (1 min); after another 1-2 min, 500 was normal again, but 5000 still came up short.
### in host of 3G RAM and 4 CPU : 163.22.32.40:6681
- send request with `mosquitto_pub -h 172.17.0.1 -t app/room2 -m "23,121,0,1234,12345"`
- `mosquitto_pub -h 172.17.0.1 -t app/room2 -m "23,121,林阿美,1,12345,1,1"`
- longitude, latitude, the reh_s_num inserted into gps_list, the reh_s_num received by public/js/index.min.js, ?, ?, ct_s_num
- single thread
- code
- 
- result
- total 50 with no sleep
- execution time = 0.11 s
- |interval| = 1 s
- total 500 with no sleep
- execution time = 1.22 s
- |interval| = 5 s
- total 5000 with no sleep
- execution time = 11.85 s
- |interval| = 54 s
- multi thread : 2
- cpu num
- 
- code
- 
- result
- total 100, each thread 50, with no sleep
- execution time = 0.23 s
- |interval| = 1 s
- total 1000, each thread 500, with no sleep
- execution time = 2.14 s
- |interval| = 12 s
- total 10000, each thread 5000, with no sleep
- execution time = 21.35 s
- |interval| = 145 s
# vm version
## Installation
- java : `sudo dnf install java-11-openjdk-devel -y`
- confluent-hub :
- confluent-hub client
- `wget https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz`
- `tar xvf confluent-hub-client-latest.tar.gz`
- `mv confluent-hub-client-latest /usr/bin`
- Verify : `confluent-hub -v`
- confluent-hub platform
- First go to https://www.confluent.io/download/ and sign in for the free version
- Copy the link behind the tar button
- Fetch it : `curl -L -o confluent.tar <the link you just copied>`
- Extract : `tar xvf confluent.tar`
- Put confluent into the environment variables
- `export CONFLUENT_HOME=/home/abc/confluent-7.0.0`
- Replace `/home/abc/confluent-7.0.0` with the path you just installed to
- `export PATH=$CONFLUENT_HOME/bin:$PATH`
- `export PATH=$CONFLUENT_HOME:$PATH`
- Download the kafka mqtt connector
- `cd confluent-<version>/bin`
- `./confluent-hub install confluentinc/kafka-connect-mqtt:latest`
- Check the status of the services just installed
- `./confluent local services status`
- Start the services just installed
- `./confluent local services kafka start`
- `./confluent local services connect start`
- Check that they started properly
- Create the mqtt.json and mqtt2.json from above
- Check
- `mqtt.json`
- `curl -d @mqtt.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- `mqtt2.json`
- `curl -d @mqtt2.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- Combined
- `curl localhost:8083/connectors`
## Test mqtt -> mqtt connector -> kafka
- `cd confluent-x.x.x/bin`
- Start the ksql-server : `./confluent local start ksql-server`
- Create a kafka topic : `./kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature`
- Configure the mqtt connector : `vi kafka-mqtt-quickstart.properties`
```
topic.regex.list=temperature:.*temperature
listeners=0.0.0.0:1884
bootstrap.servers=PLAINTEXT://localhost:9092
confluent.topic.replication.factor=1
```
- topic.regex.list : kafka_topic:mqtt_topic, maps the different kafka and mqtt topics onto each other
- ex. `topic.regex.list=test-topic-room1:app/room2`
- Run the mqtt connector on a different port (1884 picked arbitrarily here), otherwise it will collide in a moment with the mqtt server's port (the mqtt server defaults to 1883)
- Start the mqtt connector : `./kafka-mqtt-start kafka-mqtt-quickstart.properties`
- Start with daemon : `sudo ./control-center-start -daemon /home/tommygood/confluent-7.4.0/bin/kafka-mqtt-quickstart.properties`
- By default, connector status can be monitored on port 9021
- But `./kafka-mqtt-start kafka-mqtt-quickstart.properties` has to be run first before it can open
- Once opened, though, it bogs everything down
- Start control-center another way
- `control-center-start -daemon /etc/confluent-control-center/control-center.properties`
- https://lengxiaobing.github.io/2019/02/23/Confluent-%E6%90%AD%E5%BB%BA%E6%96%87%E6%A1%A3/
- With only 1 broker, add the settings below, since 3 brokers are used by default :
```=
confluent.controlcenter.internal.topics.replication=1
confluent.controlcenter.command.topic.replication=1
confluent.monitoring.interceptor.topic.replication=1
confluent.metrics.topic.replication=1
```
- ref : https://stackoverflow.com/questions/61023514/how-to-configure-single-broker-for-confluent-control-center
- Use ksql : `./ksql http://localhost:8088`
- Register the sensor topic's schema with KSQL : `CREATE STREAM carsensor (eventid integer, sensorinput varchar) WITH (kafka_topic='temperature', value_format='DELIMITED');`
- Set a continuous query running in KSQL : `SELECT EVENTID, SENSORINPUT FROM CARSENSOR EMIT CHANGES;`
- In another terminal, send a message to the mqtt connector's port : `mosquitto_pub -h 0.0.0.0 -p 1884 -t car/engine/temperature -q 2 -m "99,2.10#"`
- Check whether ksql receives it
- ref
- https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot/blob/master/live-demo-ksql-udf-deep-learning-mqtt-iot.adoc
- https://www.youtube.com/watch?v=L38-6ilGeKE&t=183s
### Problems encountered
- npm version too old
- Check the version : `npm -v`
- Mine was 6.14, which is too low and made npm fail while installing pm2 and ioredis with `error: ‘memcpy’ was not declared in this scope`
- node must be updated before updating npm
- Check the version : `node -v`
- Mine was 10.x, which kept npm from updating from 6.14 to 9.6.6
- Download the official tarball
- `wget https://nodejs.org/dist/v18.16.0/node-v18.16.0-linux-x64.tar.xz`
- `tar xvf node-v18.16.0-linux-x64.tar.xz`
- Remove the original node
- Find the original node's location : `whereis node`
- In my case : `mv /usr/bin/node /usr/bin/node1`
- Switch to the new node
- `ln -s /path/to/node-v18.16.0-linux-x64/bin/node /usr/bin/node`
- Update npm
- `npm install npm -g`
- Error: could not find share/java/confluent-common/common-config-*.jar in CONFLUENT_HOME
- Add the confluent home to the environment variables
```conf=
export CONFLUENT_HOME=/path/to/confluent-7.4.0
export PATH=$CONFLUENT_HOME:$PATH
```
## Experiment: sending requests
- send request with `mosquitto_pub -h 0.0.0.0 -p 1884 -t app/room2 -q 2 -m "23,121,0,1234,12345"` continuously : 6680
- single thread
- result
- total 50 with no sleep
- execute time = 0.16 s
- |interval| = 0 s
- total 500 with no sleep
- execute time = 1.88, 1.53 s
- |interval| = 9,7,8,10,9 s
- total 5000 with no sleep
- execute time = 15.02 s
- |interval| = 74 s
- multi thread : 2
- cpu num
- 
- code
- 
- result
- total 100, each thread 50, with no sleep
- execute time = 0.21 s
- |interval| = 3 s
- total 1000, each thread 500, with no sleep
- execute time = 2.15 s
- |interval| = 18 s
- total 10000, each thread 5000, with no sleep
- execute time = 22.16 s
- |interval| = 184 s
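The numbers above imply a rough publish rate (messages divided by execute time). This is only a back-of-the-envelope helper, not part of the original measurement scripts:

```javascript
// Approximate message rate implied by the measurements above.
const throughput = (total, seconds) => +(total / seconds).toFixed(1);

console.log(throughput(5000, 15.02));  // single thread, 5000 messages
console.log(throughput(10000, 22.16)); // two threads, 10000 messages total
```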
## Notes
- Looking at the kafka logs, kafka actually receives the mqtt data very quickly, so the bottleneck is most likely the nodejs server receiving the data and writing it into the db.
- Tutorials
- https://www.youtube.com/watch?v=L38-6ilGeKE&t=183s
- https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc
# Topic brainstorming
- Architecture
- What exactly should we build
- Besides meal delivery, what else should the system do
- First clarify the system's scope
- Target users : only the elderly, elderly living alone, or also other disadvantaged groups (ex. new immigrants)
- Features : meal delivery only, or other functions too (ex. long-term care)
- If the target users are the elderly, start from the problems the elderly face
- Loneliness : build a social app for the elderly (?) that serves a different AI-generated "elder meme" image every day
- Services 佛傳 provides
- Meal delivery for elderly living alone
- Long-term care services
- Classes for the elderly
- Food bank
- Training for new immigrants
- Features we could add
1. Delivery-route optimization
2. Analyze the organization's financial health from its financial reports
3. Use a model to analyze the delivery person's status
- ex. a delivery person staying at the same spot for too long may indicate a problem
4. Delivery staff report the elder's condition (physical, mental) when delivering
5. How do the current meal-rating system and the rating-based adjustments work
- Is there room for improvement
# Backend map content
## mqtt request
- Notes
- The payload is packed as a single string, so there must be no spaces between fields; a space is treated as part of the data
- ex. `-m "1,2,3"`
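A minimal sketch of that rule (the helper name is hypothetical): the payload is simply split on commas, so any stray space survives inside a field:

```javascript
// Split a comma-delimited MQTT payload into its fields.
const parsePayload = (msg) => msg.toString().split(',');

// Clean payload: every field comes out as expected.
console.log(parsePayload('1,2,3'));     // [ '1', '2', '3' ]
// A space after a comma becomes part of the next field.
console.log(parsePayload('1, 2,3')[1]); // ' 2', not '2'
```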
- `app/room2` : punch (delivered; maps to kafka's `test-topic`)
- `mosquitto_pub -h 172.17.0.1 -t app/room2 -m "23,121,1,1,12345,1,1"`
- `longitude at delivery`
- `latitude at delivery`
- `reh_s_num` (name of the route the delivered customer is on)
- (purpose unknown)
- `ph_inorout` (purpose unknown)
- `ph_wifi` (probably whether wifi was available at delivery)
- `ct_s_num` (id of the delivered customer)
- Writes one record into table `punch`
- `app/room1` : gps (maps to kafka's `test-topic-room1`)
- `mosquitto_pub -h 172.17.0.1 -t app/room1 -m "23,121,林阿美,1,12345"`
- `longitude`
- `latitude`
- `delivery person name`
- Must match exactly; is name used as the pk?
- `reh_s_num`
- `dp_s_num` : delivery person id
- token, purpose unknown, in `public/js/index.min.js`
- Writes one record into table `gps_list`
- Writes one record into table `gps`
## Frontend
- path : `deliverwebForStudent/public/`
- js script path : `deliverwebForStudent/public/js`
### Additions
- topic
1. gps
- mqtt : `app/room2` --> kafka : `test-topic-room1`
- Backend
- After receiving, the backend forwards the data to the frontend under the topic `const topic = 'sensor/Test/room1';`
- Frontend
- In theory the frontend receives data on the backend's topic (`sensor/Test/room1`)
- But originally it was written differently
- So `app/room2` was changed to `sensor/Test/room1` (as above)
2. punch (clocking in after delivering a meal)
- mqtt: `app/room1` --> kafka: `test-topic`
- Backend
- After receiving, the backend forwards the data to the frontend under the topic `const topic = 'sensor/Test/room2';`
- Frontend
- Handled directly in an `else`, since there are only two topics and this one is not the `sensor/Test/room1` above
- Enable all map layers
- Otherwise only the first layer's icons are visible
- in `public/js/index.min.js` : line 12x
- The default code only enables the first layer :
- `markerGroups['路線: ' + allRoute[0].reh_name + ' ' + allRoute[0].name].addTo(map);`
- Changed to enable all layers
```js=
for (let i = 0;i < allRoute.length;i++) {
markerGroups['路線: ' + allRoute[i].reh_name + ' ' + allRoute[i].name].addTo(map);
}
```
- Fix a bug in the punch topic
- The key of the json object can only be read after casting to int : `reh_s_num = parseInt(paylod[2]);`
- Same logic in another place : around line 27x
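One plausible reproduction of why the `parseInt` cast matters (an assumption; names are hypothetical): object keys are matched as exact strings, so a raw payload field carrying stray characters misses the key, while `parseInt` normalizes it:

```javascript
const routes = { 1: 'route A' };        // numeric-looking keys are stored as strings
const raw = ' 1';                       // a payload field may carry stray whitespace

console.log(routes[raw]);               // undefined: the key ' 1' is not '1'
console.log(routes[parseInt(raw, 10)]); // 'route A': parseInt normalizes ' 1' to 1
```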
- Bug on a delivery person's first gps report of the day
- The first gps report of the day fails to draw the red line; the page must be reloaded to see it
- To reproduce
- Clear the gps data (today's is enough, since only today's is shown) : `delete from gps_list`
- Cause
- The polyline content is only defined when the previously travelled route (gps_list) is non-empty
- Fix
- Add an `else` that defines the content but leaves it empty
- Change `all` to `[]`
- Total kilometers and total meal count are not fetched on initial load
- `public/js/index.min.js`
```js=
const insertTotalInfo = (total_km, total_meal_count) => {
// total kilometers
if (total_km != undefined) {
document.getElementById('totalkm').textContent = '總公里: ' + Number(total_km).toFixed(2) + ' km';
}
// total meal count
if (total_meal_count != undefined)
document.getElementById('totalMealCount').textContent = '總送餐數: ' + Number(total_meal_count);
}
// fetch the total number of meals to deliver today
// first, load the total kilometers and the total meal count
const total_km = await fetchTotalKm();
const total_meal_count = await fetchTotalMealCount();
insertTotalInfo(total_km, total_meal_count);
```
## Backend
### Additions
- Bug when fetching total kilometers and total meal count on initial load
## Database
- path : `deliverwebForStudent/model`
### Table overview
- `daily_shipment` : the daily delivery master table
- Apparently one record per customer (`client`) to be served
- Test data :
```
insert into daily_shipment(`dp_s_num`, `ct_s_num`, `sec_s_num`, `reh_s_num`, `ct_name`, `dys01`, `dys02`, `dys03`, `dys04`, `dys05`, `dys05_type`, `dys06`, `reh_name`, `ct_order`, `dys09`, `dys10`, `dys11`) values(1, 1, 1, 1, '吳阿花', '2023-6-9', 'a', 'b', 'c', 'd', 'e', 123, '中正路', 1, 1, 'Y', 1);
```
- Columns
- `dp_s_num` : delivery person id
- `ct_s_num` : customer id
- `sec_s_num` : ?
- `reh_s_num` : route id
- `ct_name` : customer name
- `dys01` : delivery date
- `dys02` : ?
- `dys03` : food name
- `dys05` : ?
- `dys05_type` : ?
- `dys06` : ?
- `reh_name` : route name
- `ct_order` : probably this customer's lunchbox count
- `dys09` : ?
- `dys10` : must be set to `Y`, since `model/map.js` only selects `Y`; reason unknown
- `dys11` : ?
- `client_route` : customer routes
- Not sure what this is for; why is it needed when `daily_shipment` exists
- Test data :
- 測試資料 :
```
insert into client_route(`ct_s_num`, `ct_order`, `reh_s_num`) values(1, 5, 1);
```
- `ct_s_num` : customer id
- `ct_order` : probably this customer's lunchbox count
- `reh_s_num` : id of the route this customer is on
- `client` : customer data
- Test data :
```
insert into client values(1, 'tommy', 'good', 'M', '163.22.32.40', '120.5', '23.5', 1);
```
- `s_num` : id; why isn't it auto increment?
- `ct_name` : customer surname
- `ct_lastname` : customer given name
- `ct_gender` : M - male, anything else is female
- `ct_address` : customer address
- `ct_lon` : longitude of the customer's address
- `ct_lat` : latitude of the customer's address
- `status` : ?
- `route` : route data
- Test data :
```
insert into route values(1, "中正路", 1, 1, 0);
```
- `s_num` : id; why isn't it auto increment
- `reh_name` : route name
- `reh_category` : route category (1 = mountain line, 2 = coast line, 3 = Tun line)
- `reh_time` : route time slot (1 = lunch / lunch+dinner, 2 = dinner)
- `status` : int(1) DEFAULT '0', ?
- `delivery_person` : delivery person data
- Test data : in the `app/room2` test data, the delivery person name must match surname + given name here (ex. 林阿美)
```
insert into delivery_person values(1, '小美', '', '喜歡送便當', '林', '阿美', '做過 foodpanda', 0);
```
```
insert into delivery_person values(2, '老吳', '', '喜歡騎車', '陳', '小吳', '做過 uber eats', 0);
```
- `s_num` : id; why isn't it auto increment
- `dp_nickname` : nickname
- `dp_img` : avatar path
- `dp_reason` : reason for joining
- `dp01` : surname
- `dp02` : given name
- `dp_experience` : experience
- `status` : int(1) DEFAULT '0', ?
- `gps_list` :
- Notes
- No need to insert manually; a row is added automatically when gps data comes in
- A route (`reh_s_num`) has only one delivery person per <b>day</b>; that person's gps data for the day is recorded in both gps and gps_list.
- `gpsstring` :
- One gps record contains longitude and latitude, separated by `@`.
- Records are separated by `,`
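Based on the format described above, a parsing sketch (the lon/lat field order is an assumption; the note only says the two coordinates are separated by `@`):

```javascript
// gpsstring: records separated by ',', coordinates inside a record separated by '@'.
const parseGpsString = (s) =>
  s.split(',').filter(Boolean).map((rec) => {
    const [lon, lat] = rec.split('@').map(Number);
    return { lon, lat };
  });

console.log(parseGpsString('120.5@23.5,120.6@23.6'));
// [ { lon: 120.5, lat: 23.5 }, { lon: 120.6, lat: 23.6 } ]
```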
# Multi-organization architecture
## Concept
- Built as a docker swarm cluster; the different containers (mqtt broker, mqtt connector, kafka broker...) are deployed on different nodes
- Highly available, fast to deploy, and able to use the resources of multiple nodes
- Can be made of 3 manager nodes
- kafka broker can run as multiple containers forming a cluster
- redis can also be clustered
- https://ithelp.ithome.com.tw/articles/10285566?sc=rss.qu
## Backend changes
- db connection limits
- connection count : the db originally has 6 connections, and each node server takes 100 connections
- Check current usage : `show full processlist;`
- Set the limit accordingly : `SET GLOBAL max_connections = 200;`
- The default is 150; check the current limit : `show variables like "max_connections";`
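The arithmetic behind picking that limit can be sketched as follows (the 6-connection baseline and 100 connections per node server come from the note above; the 20% headroom is an arbitrary assumption):

```javascript
// Required max_connections = baseline + per-server pool * number of node servers,
// rounded up with some headroom.
const requiredMaxConnections = (servers, base = 6, perServer = 100, headroom = 1.2) =>
  Math.ceil((base + perServer * servers) * headroom);

console.log(requiredMaxConnections(1)); // 128; the note simply sets 200
console.log(requiredMaxConnections(2)); // 248
```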
- pm2 server name
- `vi ecosystem.config.js`
```=
name: '{new_server_name}'
```
- mqtt, kafka topic
- Fields to change
- `name`
- `mqtt.topics`
- `kafka.topic`
- `vi mqtt.json`
```json=
{
"name": "server-mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://172.17.0.1:1883",
"mqtt.topics": "server/app/room1",
"kafka.topic": "server-test-topic",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1
}
}
```
- `curl -d @mqtt.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- `vi mqtt2.json`
```json=
{
"name": "server-mqtt-source2",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://172.17.0.1:1883",
"mqtt.topics": "server/app/room2",
"kafka.topic": "server-test-topic-room1",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": 1
}
}
```
- `curl -d @mqtt2.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors`
- Result
- `curl localhost:8083/connectors`
- db
- create a new database with sql
- node server settings
- `vi index.js`
- kafka consume topic
- `groupId`
- `topic`
- port
- pm2 cannot access some ports (ex. 80) without privileges, so run pm2 as root : `sudo pm2 start xxx`
- `vi model/db.js`
- `database`
```=
database: "{new_db_name}"
```
- start with pm2
- `pm2 start ecosystem.config.js`
## Test
- `mosquitto_pub -h 172.17.0.1 -t server/app/room2 -m "23,121,0,1234,12345"`
- Change the value after `-t` to your own mqtt topic
- Result
- The new node server's db receives the data
- The old one still does too
## docker-compose yaml
### Overview
- connect
- command
- Purpose : install the mqtt connector inside this container. Steps 2 and 3 must not be swapped
1. It depends on broker and schema-registry, so the command must first wait for those two containers to be up
2. Install the needed mqtt connector, then sleep for a while until the installation finishes
3. Finally run the executable connect would normally run (`/etc/confluent/docker/run`), because specifying a command overrides it
## redis
- Because of the volume, `/var/redis` must be created on every node
- Should be replaced with a better approach later
- `vi docker-compose.yml`
```yml=
---
version: "3.6"
services:
master:
image: "redis:6.2.3"
ports:
- 6379:6379
volumes:
- /var/redis:/data
```
- The senior student used redis in two places : one in `index.js`, one in `module/cache.js`
- And `module/cache.js` hard-codes the redis server as 127.0.0.1
## mqtt
- Because of the volumes, `/var/mosquitto/mosquitto.conf`, `/var/mosquitto/log/`, and `/var/mosquitto/data` must be created on every node
- `vi /var/mosquitto/mosquitto.conf`
```conf=
listener 1883 0.0.0.0
allow_anonymous true
```
## kafka
- In the compose file, `KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092`
- must be changed to `KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://kafka:9092`
### file content
```yml=
---
version: '3.9'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.1.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.1.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:latest && \
sleep 20 && \
/etc/confluent/docker/run && \
tail -f /dev/null'"
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.1.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:7.1.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.1.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.1.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.1.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.1.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
mosquitto:
image: eclipse-mosquitto:latest
build:
context: .
volumes:
- type: bind
source: /var/mosquitto/mosquitto.conf
target: /mosquitto/config/mosquitto.conf
- type: bind
source: /var/mosquitto/log/
target: /mosquitto/log/
- type: volume
source: data
target: /mosquitto/data/
ports:
- target: 1883
published: 1883
protocol: tcp
mode: ingress
- target: 9001
published: 9001
protocol: tcp
mode: ingress
deploy :
placement:
constraints:
- node.role == manager
volumes:
data:
name: "mqtt-broker-data"
```
## Test
- `curl {cluster_node_ip}:8083/connectors` : curl using the ip of a vm inside the cluster; localhost or 127.0.0.1 will not work
- Returns an empty set
- mqtt to kafka
### Result
## Dockerfile
- build : `sudo docker build -t backend_node .`
- Some files may be cached and not rebuilt : add `--no-cache`
- upload
- `docker tag backend_node tommygood/node_server`
- `docker push tommygood/node_server`
- The first push takes longer
- `vi Dockerfile`
```Dockerfile=
FROM node:18
# encode
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
# Create app directory
WORKDIR /usr/src/app
# Install app dependencies
# A wildcard is used to ensure both package.json AND package-lock.json are copied
# where available (npm@5+)
COPY package*.json ./
RUN npm install
# If you are building your code for production
# RUN npm ci --omit=dev
# install apt plugin
RUN apt update
RUN apt install curl
# Bundle app source
COPY . .
EXPOSE 3005
#CMD [ "node", "index.js" ]
```
- `ENV LC_ALL en_US.UTF-8` : this line is probably unnecessary
- ref
- https://nodejs.org/en/docs/guides/nodejs-docker-webapp
- https://www.itread01.com/article/1481612743.html
## Problems
- Watch the memory usage
- Killing a backend_kafka stack that had been up for 3 days freed 21 GB at once
- Problems occur when some kafka-related containers are not on the same host
- sol
- The VMs were built with VMWare, and VMWare also uses `4789/udp`, the port docker swarm uses for multi-node communication; this breaks communication between containers on different nodes (they can ping each other but cannot reach each other's services)
- So just run docker swarm's multi-node communication on a different port
- `docker swarm init --data-path-port=7788`
- Remember to open this port on every node (I opened both udp and tcp)
- ref
- https://stackoverflow.com/questions/75985365/docker-swarm-cant-reach-services-in-another-host
# New features
- Menu query
- `vi public/js/auth.min.js`
```js=
<li>
<a class="nav-link <%- url == 'menuQuery' ? 'active' : '' %>" href="/menuQuery" id="menuQuery">
<i class="material-icons">fastfood</i>
<span class="mx-2">查詢菜單</span>
</a>
</li>
```
- `vi public/menuQuery.ejs`
- `vi module/pageRouter.js`
```js=
const PAGE = ['index','info','introduce','allintroduce', 'menuQuery']; // all pages
```
- open cors
- Allow the angular map site to call the api
- `index.js`
```js=
// socket io
const { Server } = require("socket.io");
const io = new Server(server, {
cors: {origin : '*'}
});
// cors
cors = require('cors');
app.use(cors());
```
# Data sources
- Carbon emission calculation
- https://data.gov.tw/dataset/33215
# galera
- install
- `sudo yum install mariadb-server mariadb`
- `sudo dnf install -y mariadb-server-galera`
- `sudo systemctl stop mariadb`
- stop it on every node first
- config (on every node)
- `sudo vi /etc/my.cnf.d/mariadb-server.cnf`
```conf=
[galera]
binlog_format=ROW
default_storage_engine=InnoDB
innodb_autoinc_lock_mode=2
bind-address=0.0.0.0
wsrep_on = ON
wsrep_provider=/usr/lib64/galera/libgalera_smm.so
wsrep_cluster_address="gcomm://192.168.10.110,192.168.10.71" # ips of all nodes
wsrep_cluster_name="galera_cluster"
wsrep_sst_method=rsync
# these two lines must differ on each node
wsrep_node_address="192.168.10.110"
wsrep_node_name=node1
```
- open port
- `3306,4567,4568,4444/tcp`
- `4567/udp`
- `sudo galera_new_cluster`
- Run on the first node only
- Copies the first node's data to the other nodes
- `sudo systemctl start mariadb`
- Start from the first node, then each of the others
- `show status like 'wsrep_cluster_size';`
# haproxy
- `sudo yum install haproxy`
- `sudo vi /etc/haproxy/haproxy.cfg` : add at the top
```conf=
listen galera
bind 192.168.33.71:3307 # the new load-balanced mariadb entry point; must not clash with the local mariadb
balance roundrobin # load balancer policy
mode tcp # (tcp = layer 4, http = layer 7)
option tcpka # enable keepalive to maintain tcp connection
option mysql-check user haproxy # enable database server check
server node1 192.168.10.71:3306 check weight 1
server node2 192.168.10.110:3306 check weight 1
server node3 192.168.10.111:3306 check weight 1
# status-monitoring site
listen stats
bind 0.0.0.0:9000
mode http
stats enable # enable the stats page
stats uri /stats # url path
stats realm HAProxy\ Statistics
stats auth howtoforge:howtoforge # set username and password
stats admin if TRUE # logged-in user gets admin rights
stats refresh 30s # refresh the monitoring page every 30 seconds
```
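To illustrate the `balance roundrobin` policy above, a toy selector over the same three backends (pure illustration of the policy, not how haproxy itself is implemented):

```javascript
// Cycle through the backends in order, wrapping around after the last one.
const backends = ['192.168.10.71:3306', '192.168.10.110:3306', '192.168.10.111:3306'];
let next = 0;
const pick = () => backends[next++ % backends.length];

// Four picks in a row: node1, node2, node3, then back to node1.
console.log([pick(), pick(), pick(), pick()].join('\n'));
```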
- `sudo systemctl restart haproxy`
- If "cannot bind socket (Permission denied)"
- `sudo setenforce 0`
- `sudo sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config`
- `sudo mysql`
- `create user 'haproxy'@'192.168.10.71';`
- `flush privileges;`
- Give the three mariadb instances different labels
- ex. `SET GLOBAL server_id=1;`
- `mysql -u fuboat -h 192.168.10.71 -P 3307 -p`
- Check whether the label differs on each connection
- ref
- https://gary840227.medium.com/mariadb-cluster-f7220e9eaac8