owned this note
owned this note
Published
Linked with GitHub
# [AIdrifter CS 浮生筆錄](https://hackmd.io/s/rypeUnYSb) : <br>Linux IPC And Mulit-thread
## Inter-Process Communication
```sequence
Note left of Process A: Process A thinks
Process A->Process B: Transfer Data
Note right of Process B: Process B thinks
Process B-->Process A: Signal
```
## 8 way for IPC
- File
- FIFO (is called Named pipe)
- Signal
- Semaphore
- Pipe
- Message queue
- Shared memory
- Socket
### File
一個 Process 把資料寫在檔案裡面
其它 Process(es) 讀取
- 優點:
- 就算關機,內容還是存在
- 可以一對多分享
- 簡單,大概沒有比這個更基本的方法了
- 缺點:
- 要自己做【通知】的動作,讀取者很難知道什麼時候有新資料產生了,或是去 hook file system ?!
- 要如何指定某個資料只給某個 reader ?
- 資料拿走之後要自行刪除
- 要自己處理 race condition
### FIFO (Named Pipe)
用 `mkfifo( )` 建立特殊格式 File
用 File I/O 操作達到資料傳輸 (一個讀,一個寫)
**最後記得 `remove()` / `unlink()` 這個 file***
- 優點:
- **Blocking**,可以做到寫入等待,直到對方讀取資料
- 簡單直接,畢竟可以用基本 File I/O
- 缺點:
- 一次只能對一個,資料被讀取,其它人讀不到
- 檔案名稱衝突問題
- 必須預先知道資料 size
- PID 001
```C
int nInput[2] = {3,5};
int fd = 0, result = 0;
mkfifo (fileA, 0666); // Create FIFO
/* send two int var to fileA ,wait write() until PID 002 recived.(non-blocking) */
fd = open(fileA, O_WRONLY);
write (fd, nInput, sizeof(int)*2); // blocking until PID 002 read
close (fd);
/* read result from fileB */
fd = open (fileB, O_RDONLY);
read (fd, &result, sizeof(int));
close (fd);
printf (“result = %d\n”, result);
// Don't forget to unlink() or remove() fifo
unlink (fileA);
unlink (fileB):
```
- PID 002
```C
int nInput[2], result=0, fd=0;
mkfifo (fileB, 0666); // Create FIFO
/* Read two int from fileA */
fd = open (fileA, O_RDONLY);
read (fd, nInput, sizeof(int)*2);
close (fd);
// Result of Sum
result = nInput[0] + nInput[1];
// write result to fileB
// wait write() until PID 001 received
fd = open (fileB, O_WRONLY);
write (fd, &result, sizeof(int); // blocking until PID 001 received
close (fd);
```
### Signal
只能用來通知,無法傳輸資料
Blocking mode 等待
- 優點:
- 一次可以等待多個 signals
- 一次可以通知多個 processes
- 可以用來傳送系統命令 (例如SIGKILL)
- 缺點:
- 不能傳輸資料
```C
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
void *aa(void *arg) {
pid_t ppid = 0;
// get parent pid
ppid = getppid();
// kill parent process , send signal
kill (ppid, SIGUSR1);
}
```
```C
main() {
pthread_t tid; sigset_t set;
int sig = 0;
// init set (wait SIGUSR1)
sigemptyset (&set);
sigaddset (&set, SIGUSR1);
sigprocmask (SIG_SETMASK, &set, NULL);
// create thread aa
pthread_create (&tid, NULL, aa, NULL);
// wait aa's signal(terminate)
sigwait (&set, &sig);
// after aa is finished , continue our code flow.
// …
}
```
### Semaphore
只能用来通知,無法傳輸data
適合用来解决 `PV problem` **<--WTH?????**
**Binary semaphore 可以当 Mutex 使用**
- **jserv 不這樣認為喔 0.0**
優點:
- 適合一對多,甚至多對多
缺点:
- 不能傳輸data
#### Semaphore Concept
- Cutomer use `sem_wait()` or `sem_trywait()`
- Customer 一次把 Resource count 減一
- When `Resource count = 0` => Customer Sleep()
- 用 `sem_post()` 當Producer
- Producer不會真的sleep
- 連上限都沒有
- Producer要睡覺,要自己想辦法
- 單純使用 `sem_init()` + `fork()`,==Parent 跟 Child將不會共用同一個 semaphore,要加上 shared memory 配合使用。這是個常見的陷阱==
- Reference: [Semaphores on linux - Sem-init() vs sem_open()](http://blog.superpat.com/2010/07/14/semaphores-on-linux-sem_init-vs-sem_open/)
#### Semaphore example
- Producer
```C
#define SEM_NAME “phantom_sem”
sem_t *sem = NULL;
// create semaphore
sem = sem_open (SEM_NAME, O_CREAT, S_IRWXU, 0);
// increase semaphore (+1)
sem_post (sem); // produce used it
// 離開前的釋放資源
sem_close (sem);
sem_unlink (SEM_NAME);
sem = NULL;
```
- Customer
```C
#define SEM_NAME “phantom_sem”
sem_t *sem = NULL;
// create semaphore
sem = sem_open (SEM_NAME, O_CREAT, S_IRWXU, 0);
// wait semaphore
sem_wait (sem);
// release resouce before levaing
sem_close (sem);
sem = NULL;
```
### PIPE
- 只能在 Parent 與 Child 之間,或是 Child 與 Child 之間互傳資料
- 讀取時 **Blocking Mode** 等待
- 單方向的 IPC 方式
- 優點:
- 不用處理 Race condition
- 缺點:
- 只適用與 Parent vs. Child 或是 Child vs. Child
- Hint
- **`Pipe[0]` : read**
- **`Pipe[1]` : write**
```C
int pPipe[2], cPipe[2], nInput[2], sum=0;
// pPipe is Child => Parent
pipe (pPipe);
// cPipe is Parent => Child
pipe (cPipe);
if ((pid = fork()) == 0)
{ // Child pcoess
read (cPipe[0], nInput, 8); // read two integers from parent
sum = nInput[0] + nInput[1];
write (pPipe[1], &sum, 4); // Write pPipe[1] Result to Parent
}
else
{ // Parent process
nInput[0] = 3; nInput[1] = 5;
write (cPipe[1], nInput, 8); // send two intgers to Child
read (pPipe[0], &sum, 4); // get result pPipe[0] from child
printf (“Result = %d”,sum);
}
```
### Message Queue
- 提供非同步模式來傳送跟接收資料
- `msgget`(create)
- `msgsnd`(send)
- `msgrcv`(receive)
- `msgctl`(release)
- 如果是使用 `IPC_NOWAIT`,將變成**non-blocking mode**
- 有些進階的 **MQ** 甚至可以跨伺服器
- 優點:
- 可以指定Receiver
- 可以asynchronization send and receive
- 缺點:
- Message queue 儲存容量有上限,單筆最大 **8192 bytes**,最多存放 **16384 筆**
- 最後要記得刪除臨時檔案
- **Hint**
- Data Structure is **Stack**
```C
struct msgBuf { int type; int data; };
key_t mqKey;
struct msgBuf msg;
int mqID=0;
// msgget(create)
mqKey = ftok(“msg.txt”, ‘b’); // create key
mqID = msgget(mqKey, 0660 | IPC_CREAT);
// send data and set type=1
// msgsnd(send) nInput[0]
msg.type = 1; msg.data = nInput[0];
msgsnd(mqID, &msg, sizeof(msg), 0);
// send data and set type=1
// msgsnd(send) nInput[1]
msg.type = 1; msg.data = nInput[1];
msgsnd(mqID, &msg, sizeof(msg), 0);
// Specified receive "type=2" data (return result)
// msgrcv(receive)
msgrcv (mqID, &msg, sizeof(msg), 2, 0);
result = msg.data;
// msgctl(release)
msgctl (mqID, IPC_RMID, NULL);
```
```C
struct msgBuf { int type; int data; };
key_t mqKey;
struct msgBuf msg;
int mqID = 0;
// msgget(create)
mqKey = ftok(“msg.txt”, ‘b’); create key
mqID = msgget(mqKey, 0660 | IPC_CREAT);
// Specified receive "type=1" data
msgrcv (mqID, &msg, sizeof(msg), 1, 0);
nInput[0] = msg.data;
msgrcv (mqID, &msg, sizeof(msg), 1, 0);
nInput[1] = msg.data;
result = nInput[0] + nInput[1];
// Specified send result and set type=2
msg.type = 2; msg.data = result;
msgsnd(mqID, &msg, sizeof(msg), 0);
```
### Shared Memory
- 大家共用**同一塊記憶體**來交換資料
- `shmget`(create)
- `shmdt`(send)
- `shmat`(receive)
- `shmctl`(releae)
- 優點:
- 可以用在多對多
- 速度最快的方式
- 缺點:
- 要自己處理所有的 race condition 問題
```C
key_t shmKey;
int shmID = 0, result = 0, nInput[2] = {3,5}, *pBuf;
// create key
shmKey = ftok (“msg.txt”, ‘b’);
// create shm size = 8, set to correspond pBuf
shmID = shmget (shmKey, 8, MODE_MASK);
// prepare to receive
pBuf = shmat (shmID, NULL, 0);
// write two integer to shm
memcpy (pBuf, nInput, 8);
// shmdt(send)
shmdt (pBuf);
sem_post (semA); // A notice B, two integers are ready
sem_wait (semB); // A wait B's notice result
// get result from shm (放在 pBuf)
pBuf = shmat (shmID, NULL, 0);
memcpy (&result, pBuf, 4); // get result
msgctl (mqID, IPC_RMID, NULL); // release shm
```
```C
key_t shmKey;
int shmID=0, result=0, nInput[2], *pBuf;
// create key
shmKey = ftok(“msg.txt”, ‘b’);
// create shm szie = 8, , set to correspond pBuf
shmID = shmget(shmKey, 8, MODE_MASK);
// prepare to receive
pBuf = shmat(shmID, NULL, 0);
sem_wait (semA); // wait A ‘s response
// get 8 bytes from shm
memcpy(nInput, pBuf, 8);
result = nInput[0] + nInput[1];
// write sum of result to shm
memcpy(pBuf, &result, 4);
// shmdt(send)
shmdt(pBuf);
sem_post(semB); // notice A we have already computed result.
```
### Socket Progrmming
- 透過網路傳輸資料的最基本方式
- 各種類型資料,大小都適用
- 優點:
- 可以一對一 (TCP) 或是一對多 (UDP)
- 可以跨伺服器
- 缺點:
- Socket Programming 稍微複雜一些
- Sample code
- To be Continued ...
## Final ConClusion
| IPC |Description | blocking? |Limit |Situation |
|:------ |:----------- |:----------- |:----------- |:-----------|
| File | 多對多 | ?? | 自行處理race condition | file exist after shutdown|
| FIFO | 一對一 | blocking | 先進先出 | 任意兩個process之間簡單傳資料|
| Signal | 一對多 | blocking | 無法傳資料 | 簡單的信號通知|
| Semaphore | 多對多 | ?? | 無法傳資料 | "PV" Problem|
| PIPE | 一對一<br>親緣Process | read(blocking) | 先進先出 | 父子進程,"兄弟進程"<--sibling??|
| Message Queue | 多對多<br>可指定接收者 | non-blocking<br>blocking | 數據量有限(8192 bytes) | 非同步,資料會被保存,可挑選自己要的資料|
| Shared Memory | 多對多 | ?? | 自行處理race condition | 速度最快,靈活度高|
| Socket Programming | 一對一(TCP)<br>一對多(UDP) | ?? |同台電腦內,大炮打小鳥 | 網路傳輸|