---
tags:
---
# Desktop Search and PageRank Parallelization
## 概念發想
原先想要做的是有關於 Linux Kernel 內 cmwq 方面的探討,研究相關機制,精簡化後實作出 [Thread Pool,然後應用在某個場景](https://hackmd.io/@qJEZpNvuSHe0kzJAmMSeQg/Hkvz5vMVs)。後來與老師討論,不知道這個 Thread Pool 究竟要應用在哪個地方,之後就把腦筋動到了 PageRank 演算法上。
### PageRank 演算法的探討
PageRank 計算會用到的方法有 iterative method。
:::info
我們可以從線性代數方面去理解:
[Linear Algebra with Applications:Eigenvalues and Page Rank](https://dm561.github.io/assets/dm561-pagerank.pdf)
:::
就如同大學一年級大家都學過,透過算出 eigenvalues 可以得到最終 PageRank 的值。我們無法實際上去這樣子算 PageRank,算出大型矩陣的 eigenvalues 非常耗時。
>[Powers of Matrices and Markov Matrices - by Gilbert Strang](https://www.youtube.com/watch?v=xtMzTXHO_zA&list=LL&index=2&t=655s&ab_channel=MITOpenCourseWare)
同時,老師介紹了 Hadoop's 對於 PageRank 平行化的做法。
:::info
**Hadoop's implementation**
[**MapReduce in C from Scratch Using Threads: Map**](https://towardsdatascience.com/mapreduce-in-c-from-scratch-using-threads-map-c6b1b01f020c)
:::
所以比較可行的實作是 Hadoop's 的做法,map-reduce 與平行化。所以,從以上資料,我認為,如果只是單純加速 PageRank 的運算,那不需要用到 Thread Pool(如同 [2021q1 Homework4 (quiz4)](https://hackmd.io/qLfC4JoBQ4qDNnaDW0Smig?view) 一樣,只用來計算 pi。
### 應用場景
老師提到了關鍵字:檔案總管,本地端搜尋引擎,[Desktop Search](https://en.wikipedia.org/wiki/Desktop_search)。
還有以下的資料:
:::success
概念可參照 Microsoft 相當創新 (但未能商業化) 的 **[WinFS](https://en.wikipedia.org/wiki/WinFS)**,不精準來說,WinFS 是 Dropbox 和關聯性資料庫的混合體 —— 允許你隨時檢索本地端 (或你可存取的有效裝置) 的檔案內容,就像在網際網路的體驗。
在 GNU/Linux 上,儘管有些開放原始碼的成果,但層次侷限在 file indexing,如:
* [Recoll](https://www.lesbonscomptes.com/recoll/pages/index-recoll.html): Full-text search for your desktop
* [Tracker](https://wiki.gnome.org/Projects/Tracker)
:::
所以應用 PageRank 在本地端,監視本地端檔案的桌面搜尋引擎,這樣的一個初期概念就有了。這就是我們要做的東西。
原先非常初步的討論:
:::success
linux kernel module: IO ---> 切割資料,資料有變動
演算法:user space nmap, mmap
poc - inotify 追蹤檔案系統變化:一邊讀取檔一邊建 graph
:::
---
### PageRank 的進一步探討
所以我們決定用建立 Graph 的方式,進一步計算 PageRank,而非使用矩陣。不過一些演算法的方面可以透過矩陣運算去理解。例如 Edge weighted PageRank:
:::info
透過 Edge Weighted PageRank,我們可以讓比較頻繁被觸擊到的 node,PageRank 的值較高:

例如,node0 被 node1 access 了一次,就要找尋 node1 -> node0 的 edge 然後將 reference count +1,所以 edgeweight +1;如果是增加一個 node,也需要將 node 放上 graph,然後更新該新的 node 對於其他所有 node 包含連向它自己的本身的 edge,包含 edge weight 的數值。
更新 graph 了之後就會需要重新計算 PageRank。
:::
不過在我第一次實作的過程中,發現 PageRank 在計算過程中,在一次的 iteration 內,幾乎沒有辦法進行資料分割後,做並行的處理。否則計算出來的結果不會正確。所以 PageRank 要平行化,只能在一次 iteration 內做,分割資料,處理計算,然後 merge 完所有資料後,在下一次 iteration 內,重複進行上述動作。也因為這樣的特質,PageRank 的平行化難以實作,或是以比較優美的方式去實作。
之後又參考了以下有關於如何有效率的計算 PageRank 的平行化演算法。
:::info
**Computing PageRank for each vertex**
[Parallel PageRank: An overview of algorithms and their performance](https://medium.com/analytics-vidhya/parallel-pagerank-an-overview-of-algorithms-and-their-performance-ce3b2a2bfdb6)
:::
### Map-reduce
進一步探討 [map-reduce](https://github.com/sysprog21/concurrent-programs/tree/master/map-reduce) 機制並著手改寫。
以下內容在後來的實作中,幾乎全部被捨棄,不過幫助了我思考 PageRank 演算法與如何實作平行化,所需要的資料結構以及 map-reduce 原理。
---
#### Parallel Read file
先利用 mmap 映射整塊記憶體,讀取,之後再分割 buffer 大小讀取。
(for workers)
```c
/* Initialize file access for worker threads.
* Should be called by main thread. Return 0 on success.
*/
static inline int fa_init(char *file, uint32_t n_threads, off_t *fsz)
{
/* Opening file */
if ((fd = open(file, O_RDONLY)) < 0) {
perror("open");
return -1;
}
/* Get file size */
if ((file_size = lseek(fd, 0, SEEK_END)) < 0) {
perror("lseek");
return -1;
}
file_content = mmap(NULL, file_size, PROT_READ, MMAP_FLAGS, fd, 0);
if (file_content == MAP_FAILED) file_content = NULL;
*fsz = file_size;
return 0;
}
/* Initialize file read access.
* Should be called by worker threads. Return 0 on success.
*/
static inline int fa_read_init()
{
if (!file_content && !(worker_buffer = malloc(BUFFER_SIZE))) return -1;
return 0;
}
/* destroy fa_read_init allocated ressource */
static inline int fa_read_destroy()
{
free(worker_buffer);
return 0;
}
/* Read worker part of the file. Should be called by worker threads. */
static inline off_t fa_read(uint32_t id, char **buff, off_t size, off_t pos)
{
off_t size_read;
if (file_content) {
if (pos >= file_size) /* EOF */
return 0;
*buff = (char *) file_content + pos;
off_t end = pos + size;
size_read = (end > file_size) ? (end - file_size) : size;
return size_read;
}
off_t size_to_read = BUFFER_SIZE < size ? BUFFER_SIZE : size;
if ((size_read = pread(fd, worker_buffer, size_to_read, pos)) == -1) {
perror("pread");
return -1;
}
*buff = worker_buffer;
return size_read;
}
```
:::success
**why `pread()` instead of `read()`?**
reference: https://stackoverflow.com/questions/1687275/what-is-the-difference-between-read-and-pread-in-unix
:::
之後底下三個 function 用來讀記憶體資料內,完整的數字 (如:1032)或是文字 (如:hello)。
其中 `isdigit()` 可以判斷該 letter 是否為 digit。
可以在 `add_sep()` 做改寫,讓讀完 word 後,如果是數字,則 `atoi` 回傳數字出去。
在讀取 graph 的時候考慮 edge 的權重,from_id, to_id 等等變數,都會是數字,所以我覺得可以用這種方式去讀取資料。
```c
char *file_name;
uint32_t n_threads;
struct thread_info {
pthread_t thread_id; /* ID returned by pthread_create() */
int thread_num; /* Application-defined thread # */
};
#define BETWEEN(_wd, _min, _max) ((_wd >= _min) && (_wd <= _max))
#define IS_LETTER(c) (BETWEEN(*buff, 'A', 'Z') || BETWEEN(*buff, 'a', 'z'))
static __thread off_t worker_slice, worker_current;
static __thread uint32_t count = 0, wsize = 0;
static __thread char *word = NULL;
/* The next three funcitons handle a buffer of the file.
* Note that a buffer may end in the middle of word.
* The word will be completed on the next call to the func.
*/
static int add_letter(char c)
{
if ((count > wsize - 1) || !wsize) {
wsize += MAX_WORD_SIZE;
char *orig = word;
if (!(word = realloc(word, wsize))) {
free(orig);
return -1;
}
}
word[count++] = c;
return 0;
}
static inline int add_sep(uint32_t tid)
{
if (count) {
word[count] = '\0'; /* Add current word */
// if (wc_add_word(tid, word, count)) return -1;
count = 0;
}
return 0;
}
static int buff_proceed(uint32_t tid, char *buff, size_t size, char last)
{
for (; size; size--, buff++) {
if (!isdigit(*buff)) { // Support digit letter
if (add_sep(tid)) /* Not a letter */
return -1;
continue;
}
if (add_letter(*buff)) /* Is a letter */
return -1;
}
if (last) /* If this is the last buffer, end the word (if any) */
add_sep(tid);
return 0;
}
```
底下的函式跟,用來調整因為依據每個 worker 的 buffer 大小切割讀取範圍,過程中,將數字 (如:1032)或是文字 (如:hello) 破壞掉的情況,做細部調整。
:::warning
改進空間:
`add_letter` 會逐個字元複製,這可以改成紀錄 word 首尾在 file_content 的 `buff` 裡面的位置,還有 word 的 size,再一次複製過去。
:::
:::danger
實驗結果:無顯著差異(不確定是否為有效實驗結果,可能要重新測試)。
:::
```c
/* Configure the buffer slices of each worker */
static int buff_init(uint32_t tid)
{
if (fa_read_init()) return -1;
worker_slice = file_size / n_threads;
worker_current = worker_slice * tid;
/* Last thread handle remaining bytes */
if (tid == (n_threads - 1)) worker_slice += file_size % n_threads;
off_t worker_end = worker_current + worker_slice;
/* Balance worker_slice to include words at the ends.
* skip first letters if we are not the first thread.
*/
char *buff;
do {
if (tid == 0) break;
if (fa_read(tid, &buff, 1, worker_current) != 1) return -1;
if (!isdigit(*buff)) break; // Support digit letter
worker_current++;
worker_slice--;
} while (*buff);
/* add letters of the last word if we are not the last thread */
do {
if (tid == (n_threads - 1)) break;
if (fa_read(tid, &buff, 1, worker_end) != 1) return -1;
if (!isdigit(*buff)) break; // Support digit letter
worker_end++, worker_slice++;
} while (*buff);
return 0;
}
static int buff_destroy()
{
free(word);
if (fa_read_destroy()) return -1;
return 0;
}
/* Read a buffer from the file */
static int buff_read(uint32_t tid, char **buff, off_t *size, char *last)
{
if (!worker_slice) return 0;
off_t size_read = fa_read(tid, buff, worker_slice, worker_current);
if (size_read == -1) return -1;
*size = size_read;
worker_current += size_read, worker_slice -= size_read;
if (!worker_slice) *last = 1;
return 0;
}
```
之後考慮的就剩下,讀取什麼樣的資料,利用什麼樣的資料以及資料結構建立 graph。
原先想法:
:::warning
資料格式
```
100:81:2'\0'
from_id:to_id:edge_weight`\0`
```
```
from_id:to_id:to_id:...
```
一行的結尾加上辨識符號,可能是換行 `\n`。
所以說 `buff_proceed()` 這個函式要重寫,還有 `add_sep` 應該也要修改。
目前 naive 的實作想法:
```c
static __thread uint32_t args[3], idx;
static int buff_proceed(uint32_t tid, char *buff, size_t size, char last)
{
// args[0]: from_id,
// args[1]: to_id,
// args[2]: edge_weight
idx = 2;
for (; size; size--, buff++) {
if (!isdigit(*buff)) { // Support digit letter
if (add_sep(tid)) /* Not a letter */
return -1;
args[idx] = atoi(word);
if (!idx) {
if (graph_add_edge(args));
idx = 2;
memset(args, 0, sizeof(args));
}
--idx;
continue;
}
if (add_letter(*buff)) /* Is a letter */
return -1;
}
return 0;
}
```
然後就是讀檔案建立 graph 的 map-reduce。
然後考慮到 cpu affinity。
```c
#include <sched.h>
static __thread cpu_set_t cpuset;
void *mr_map(void *id)
{
uint32_t tid = ((struct thread_info *) id)->thread_num;
CPU_ZERO(&cpuset);
CPU_SET(tid % sysconf(_SC_NPROCESSORS_ONLN), &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
int ret = buff_init(tid);
if (ret) goto bail;
char *buff;
off_t size = 0;
char last = 0;
while (!(ret = buff_read(tid, &buff, &size, &last))) {
if (!size) break;
if (buff_proceed(tid, buff, size, last)) goto bail;
if (last) /* If this was the last buffer */
break;
}
if (buff_destroy()) goto bail;
/* wait for other worker before merging */
if (pthread_barrier_wait(&barrier) > 0) {
perror("barrier wait");
goto bail;
}
/* merge results (done in parrallel) */
bail:
return ((void *) (long) ret);
}
```
### pagerank graph 的資料結構
:::info
* **Hadoop's data structure:** adjacency list, key/value pairs, key is the name of the page, value is the out-links.
:::
:::warning
是否會用到 string hash?
:::
```c
typedef struct _vertex {
double p0;
double p1;
uint32_t outDegree;
// uint32_t inDegree;
// uint32_t *to_id;
// list to_id;
list from_id_list; // with edge weight
} vertex;
```
```c
typedef struct _graph {
uint32_t v_size;
// uint32_t e_size;
vertex *v;
} graph;
```
`graph` 裡面的 `vertex *v` 會是一個 array,所以 vertex 本身的 id 就是 vertex array 的 index。
:::warning
* With edge weight - 讓 access 的次數影響 initial pagerank value: [Application of Markov Chain in the PageRank Algorithm](https://www.researchgate.net/publication/256089617_Application_of_Markov_Chain_in_the_PageRank_Algorithm) at page 5
* 0.85 * (該 page 被 access 的次數 / 所有 page 被 access 的次數) + 0.15 * 1/number of pages
:::
### parallel PageRank
```c
static __thread uint32_t graph_slice, graph_current;
static __thread uint32_t graph_size;
static int pgrk(uint32_t tid)
{
graph_slice = file_size / n_threads;
graph_current = graph_slice * tid;
/* Last thread handle remaining vertices */
if (tid == (n_threads - 1)) graph_slice += graph_size % n_threads;
uint32_t graph_end = graph_current + graph_slice;
for (int i = graph_current; i < graph_end; i++) {
/*
* pagerank algorithm
*
*/
}
}
void *mr_pgrk(void *id)
{
uint32_t tid = ((struct thread_info *) id)->thread_num;
// pgrk(tid);
/* wait for other worker before merging */
if (pthread_barrier_wait(&barrier) > 0) {
perror("barrier wait");
goto bail;
}
/* merge results (done in parrallel) */
bail:
return ((void *) (long) -1);
}
```
這個 PageRank 的 map-reduce 要重複在迴圈內做,一直 map/reduce,直到結果在 tolerence 的範圍之內。或者是讓迴圈設定一個固定的次數。
---
### Kernel module to moniter events in file system
因為我們要做的事 Desktop Search,桌面檔案搜尋服務,所以原先想要讓監視桌面下所有 subdirectories 以及檔案生成,後建立 Graph 這樣子的程式在 Kernel Space 下運作,不過後來覺得可以先在 user space 實作這部分所以沒有在此做進一步探討,稍微感到可惜。
有關實作這部分,以下為關於 inotify、file tree 以及 IO multiplexing 方面的知識,以及相關的 library。
:::success
lkmpg: https://sysprog21.github.io/lkmpg/
epoll: https://man7.org/linux/man-pages/man7/epoll.7.html
inotify manual - https://man7.org/linux/man-pages/man7/inotify.7.html
stat.c - https://man7.org/linux/man-pages/man2/lstat.2.html
list directories - https://stackoverflow.com/questions/8436841/how-to-recursively-list-directories-in-c-on-linux
nftw - https://linux.die.net/man/3/nftw
:::
:::info
fanotify - reference: https://stackoverflow.com/questions/1835947/how-do-i-program-for-linuxs-new-fanotify-file-system-monitoring-feature
fanotify does not support removing, deleting, creating events. - https://www.linux-magazine.com/Issues/2017/194/Core-Technologies
:::
#### Relative implementation in user space
```c
#include "list.h"
typedef struct watched_dirs {
int wd;
char name[256];
struct list_head list;
} W_DIRS;
```
Data structure: watched directories.
用 `struct list_head` 來紀錄所有 inotify 監視的目標。
```c
static int wd_add(int fd, struct list_head *list, const char *path)
{
W_DIRS *new = malloc(sizeof(W_DIRS));
strcpy(new->name, path);
new->wd = inotify_add_watch(fd, new->name, IN_ALL_EVENTS);
list_add(&new->list, list);
return 0;
}
```
```c
static W_DIRS *wd_find(const struct inotify_event *e, struct list_head *list)
{
W_DIRS *wd;
struct list_head *entry;
list_for_each(entry, list) {
wd = list_entry(entry, W_DIRS, list);
if (wd->wd == e->wd) break;
}
return wd;
}
static void wd_find_path(char *buf, const struct inotify_event *e, struct list_head *list)
{
W_DIRS *wd = wd_find(e, list);
strcpy(buf, wd->name); /* strcpy unsafe function */
strcat(buf, "/");
strcat(buf, e->name);
strcat(buf, "\0");
}
```
找出事件發生的地方 (檔案路徑/目錄路徑)。
在 `IN_CREATE` 事件發生的時候,用來找出新生成的 directory 的路徑,從而能夠監視新生成的 directory。
```c
static int wd_watch(const char *fpath, const struct stat *sb,
int tflag, struct FTW *ftwbuf)
{
if ((tflag == FTW_D) || (tflag == FTW_DNR) || (tflag == FTW_DP)) {
wd_add(inotify_fd, &wds_list, fpath);
}
return 0;
}
```
`wd_watch` 是用來在一開始的時候透過 `nftw`,將所有子目錄納入 inotify 的監視目標中。在 `main` function 中的實作如下。
```c
/* Mark directories for all events */
int flags = 0;
for (i = 1; i < argc; i++) {
if (nftw((argc < 2) ? "." : argv[1], fwatch, 20, flags) == -1) {
perror("nftw");
exit(EXIT_FAILURE);
}
}
```
使用 Epoll 監聽事件發生。然後讀取 inotify input file descripter.
```c
static int epoll_add(int epoll_fd, int file_fd)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.fd = file_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, file_fd, &ev)) {
perror("epoll_ctr add file_fd failed.");
return 1;
}
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN | EPOLLOUT;
ev.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &ev)) {
perror("epoll_ctr add stdin failed.");
return 1;
}
return 0;
}
```
最重要的 function: event handler
```c
static void wd_handle_events(int fd, struct list_head *list)
{
char buf[4096]
__attribute__ ((aligned(__alignof__(struct inotify_event))));
const struct inotify_event *event;
for (;;) {
len = read(fd, buf, sizeof(buf));
if (len == -1 && errno != EAGAIN) {
perror("read");
exit(EXIT_FAILURE);
}
if (len <= 0)
break;
/* Loop over all events in the buffer. */
for (char *ptr = buf; ptr < buf + len;
ptr += sizeof(struct inotify_event) + event->len) {
event = (const struct inotify_event *) ptr;
/* Print event type. */
/* IS_DIR */
if (event->mask & IN_CREATE)
if (event->mask & IN_DELETE)
if (event->mask & IN_ACCESS)
if (event->mask & IN_OPEN)
if (event->mask & IN_CLOSE_NOWRITE)
if (event->mask & IN_CLOSE_WRITE)
/* Print the name of the watched directory. */
struct list_head *entry;
list_for_each(entry, list) {
W_DIRS *tmp = list_entry(entry, W_DIRS, list);
if (tmp->wd == event->wd) {
printf("%s/", tmp->name);
break;
}
}
/* Print the name of the file. */
if (event->len)
printf("%s", event->name);
/* Print type of filesystem object. */
if (event->mask & IN_ISDIR)
printf(" [directory]\n");
else
printf(" [file]\n");
}
}
}
```
### 被捨棄的想法
以下關於 PageRank 實作的資料結構與演算法的想法被捨棄。

* vertices are scheduled for thier in-degree
data copy 很大 -> kernel space
inotify 用途 -> web server,
* data structure of pgrk
* in-degree number to justify low-degree/high-degree vertex
*

* data structure of pagerank vertex
:::warning
* do we use linked list for the inlink and the outlink or we just use array?
* do we need `edge`?
* do we need to consider the weight of the edge?
*
:::
:::info
* vertices are range-partitioned or hash-partitioned
* use contiguous memory to speed up the process.
* swap data, reduce DRAM overhead
* so the `p0` `p1` variable in the data structure will be replaced by another data structure ( or not)
* high-degree nodes and low-degree nodes (in-degree)
* priority in scheduling
* high priority and low priority
* min-indegree
* do I need a min heap? no.
* use a linked list to store the partitioned graph vertices, so do not worry about the size of the partitioned graph
* or the partition vertecies need to be sorted first? right, use linked list to sort from min to max.
* concurrency? maybe not. we already have shortest job first approach design(min in-degree vertex first).
* same vertex, same thead, same partition
* same inDegree for each vertex & same list-order
* calculate weight for each edges. (push method)
* push vs pull? (in parallel environment)
Reference:
https://liuzhidan.github.io/files/2020-ICPADS-APPR.pdf
https://arxiv.org/pdf/2203.07937.pdf
:::
---
### Pushed Based PageRank
在 [PageRank 的近一步探討](https://hackmd.io/cvAPBfr1T6WF25XxDlGfmg?view#PageRank-%E7%9A%84%E9%80%B2%E4%B8%80%E6%AD%A5%E6%8E%A2%E8%A8%8E) 的 reference 有很好的介紹 PageRank 平行化演算法以及優劣。我們實作以 Pushed-based PageRank 為主。
:::info

Reference:
[Scalable Data-driven PageRank: Algorithms, System Issues, and Lessons Learned](https://www.cs.utexas.edu/~inderjit/public_papers/scalable_pagerank_europar15.pdf)
:::
如果用平行的方式去改善 PageRank 的效能,必須要平均分配工作量給予每個 Thread。
以 Pushed Based algorithm,對於每個 Vertex 而言,PageRank 計算的工作量取決於 outlinks 的數量,導致處理每個 Vertex 的工作量不相同,所以如果只是單純的根據 Thread 的數量平均分配處理計算各個 Vertices (hadoop's map reduce 的做法),那顯然這種做法太過於 Naïve。
但是如果要用 outlinks,來平均分配做法,那麼在進行 parallel 之前的前置作業 --- 平均分配工作,便會顯得十分繁瑣,難以進行一次 parallel 並且在做到 scan 整個 graph 一次的前提下做到。
不過,我們可以注意到,這個演算法的這個 function 本身是一個生產者也是一個消費者,使用 residual 計算 PageRank,並且在最後的 `if` 條件式為一個 filter,去決定這個 PageRank 最終 converge 的速度,所以後續如果要實作並行的程式設計,這個演算法有實作的潛力。
以下是 Push Based PageRank algorithm implemented in C
```c
static int first_iter(double alpha, uint32_t tid)
{
struct list_head *it;
struct g_vertex *v, *w;
list_for_each(it, &thread_worklists[tid]) {
v = list_entry(it, struct g_vertex, worklist);
for_each_in_neighbor(w, v) {
v->r += (double) 1 / (double) w->outDegree;
}
v->r = (1 - alpha) * alpha * v->r;
}
return 0;
}
#define for_each_out_neighbor(outlinks, vertex) \
for ((outlinks) = vertex->out[0]; \
(outlinks) < vertex->out[vertex->outDegree]; \
(outlinks)++)
static int pgrk_one_vertex(struct g_vertex *v, double alpha, double epsilon,
uint32_t tid)
{
double r_old;
struct g_vertex *w;
for_each_out_neighbor(w, v) {
r_old = w->r;
w->r += alpha * v->r / (double) v->outDegree;
if (w->r >= epsilon && r_old < epsilon)
list_add(&w->worklist, &thread_worklists[tid]);
}
}
static int pgrk_push_based (double alpha, double epsilon, uint32_t tid)
{
struct list_head *curr;
while (curr = list_pop(&thread_worklists[tid])) {
struct g_vertex *v = list_entry(curr, struct g_vertex, worklist);
v->p += v->r;
pgrk_one_vertex(v, alpha, epsilon, tid);
}
}
```
所以或許可以要回歸到使用 Thread Pool 的方式進行。
原因是因為,根據 [Scalable Data-driven PageRank: Algorithms, System Issues, and Lessons Learned](https://www.cs.utexas.edu/~inderjit/public_papers/scalable_pagerank_europar15.pdf) 這篇論文,PageRank 的計算無法實現並行的問題,可以在 Push-based PageRank 演算法解決。
否則原先沒有 Push-based PageRank 實行 Thread Pool 方案那麼其中會遇到一個問題:該如何讓所有 Threads 在提取完 worklist 的所有工作,一次計算完整個 Graph 的每個 vertex 的 PageRank 的值一次,停下來之後,再進行下一次的整個 Graph 的 PageRank 的計算?
更進一步來說,在計算 PageRank 的值的過程中,便會判斷檢查此次的某個 Vertex 的 PageRank 是否還需要進行下一次的 PageRank 計算,然後 Push 到 worklist 裡面去,但是我們不能確保執行順序,認為前一次計算 PageRank 的 worklist 在此時已經全部執行完畢,所以要避免在進行第一次 PageRank 的過程中計算到第二次 PageRank 的值,要讓所有 Threads 在執行完一次整個 Graph 的計算後全部同步。
* 是否需要一直 fork join 來解決這個問題?
* 用兩個 worklist
* 考量桌面端檔案的 Graph 結構
不過如果使用 Thread Pool,那勢必會有 Threads 一同競爭 Jobqueue 裡面的 task 的問題,劣勢在於,需要使用 lock 來確保以 linked list 為資料結構的 worklist 在被 worker thread 提取工作的時候不會發生衝突。
### Priority scheduling
以下內容日後再做探討。
large residual node first to converge faster.
:::info
https://en.wikipedia.org/wiki/Desktop_search
https://ieeexplore.ieee.org/document/494601
https://research.google/pubs/pub41344/
:::
---
### 實作討論
平行化計算回歸到 indexed based parallel method
* 不能保證 load-balancing
* 較為 Naïve 的實作方式
Hash Table/Tree structure 的實作
* 運用 nftw 建立 files 的樹狀對應關係
* 字串處理
### Hash Table Implementation
**Data Structure**
```c
typedef uint32_t hash_t;
struct ht_node {
hash_t hash;
struct hlist_node hlist;
};
/* user-defined functions */
typedef int hashfunc_t(void *key, hash_t *hash, uint32_t *bkt);
typedef int cmp_t(struct ht_node *n, void *key);
/* hash table */
struct htable {
hashfunc_t *hashfunc;
cmp_t *cmp;
uint32_t n_buckets;
uint32_t size;
struct hlist_head *buckets;
};
```
* 採用 `linux/list.h` hlist 實作 Hash Table。
* 其中 `hashfunc_t` Hash function 需要更好的實作,否則會出現很多碰撞,分布不均。
* Resizable HashTable 實作
*
### Graph Implementation
**Data Structure**
```c
struct g_vertex;
struct g_edge {
struct g_vertex *to;
uint32_t click;
};
struct g_vertex {
double r, p;
char name[256], *full_name; /* full name should contains base + filename */
struct g_vertex *in[20]; /* resizeable array will be better */
struct g_edge out[20];
uint32_t inDegree;
uint32_t outDegree;
struct list_head worklist;
struct ht_node node;
};
```
* `struct g_vertex *in[]` 的用意在於實作 Push-based 演算法需要用到 inlink 進行初次 residual 計算。
* `struct ht_node node` 為 Hash table,進行 Mapping 以及後續的資料分割平行化。
* `struct list_head worklist` 用來後續平行化進行 PageRank 計算。
### 走訪 Directory tree 建構 Graph 與 HashTable
經由 DFS 走訪一次。由 `nftw` 實作 treewalk 並且同時建構 Graph 的實作困難重重,必須要考慮到 directory 的 name 以及 basename,還有 directory 彼此之間連結關係。最後決定重寫一個 DFS。
不過 `nftw` 可以用來做 `inotify` API 監視的 directory list 是沒問題的。
```c
void g_tree(struct g_vertex *entry, char *name, char *base, int root) {
DIR *dir = opendir(base);
if (!dir) return;
char path[256];
struct dirent *dp;
while ((dp = readdir(dir)))
{
if (strcmp(dp->d_name, ".") && strcmp(dp->d_name, "..") && (dp->d_name[0] != '.')) {
strcpy(path, base);
strcat(path, "/");
strcat(path, dp->d_name);
if(dp->d_type != DT_REG) {
struct g_vertex *new = g_add_name(dp->d_name, strlen(dp->d_name));
entry->out[entry->outDegree].to = new;
entry->outDegree++;
new->in[new->inDegree] = entry;
new->inDegree++;
tree(new, dp->d_name, path, root + 1);
}
}
}
closedir(dir);
}
```
進行 hash 的 mapping。
```c
#include <sys/types.h>
#include <dirent.h>
struct g_vertex *g_add_name(char *name, uint32_t count)
{
struct g_vertex *g;
struct g_cache *cache = &main_cache;
struct ht_node *n;
if (!(n = ht_find(&cache->htable, name))) {
if (!(g = calloc(1, sizeof(struct g_vertex)))) return NULL;
if (count > (256 - 1)) g->full_name = calloc(1, count + 1);
w_strncpy(GET_NAME(g), name, count);
ht_insert(&cache->htable, &g->node, name);
} else
g = container_of(n, struct g_vertex, node); /* This else statement is redundant */
return g;
}
```
### 討論&後續開發
海報報告: https://docs.google.com/presentation/d/1pJtV6l28bdByKvjLeUvghjHuuGbu4j0GjmYurgRCrWc/edit
流程圖:

---
```graphviz
digraph ele_list {
node[shape=record];
h [label="監控目標目錄下的檔案事件", style="bold"];
h->e1 [label="Y/N"]
e1 [label="更新用來計算 PageRank 的 Graph", style="bold"];
e2 [label="計算 PageRank", style="bold"];
e1->e2
h0 [label="掃瞄目標目錄下的所有檔案", style="bold"];
h0->h1
h1 [label="依據目標目錄下的所有檔案關係建立 Graph", style="bold"];
}
```
TODO: 流程圖 embedded in Graphviz
TODO: 整理標頭檔 並且讓程式碼「乾淨」一點 放上 GitHub
TODO: 整理程式碼並且進行說明
TODO: 改進程式
:::success
主要初步改進空間:
1. 檔案命名用獨立的資料結構,解決 filepath:name+base 的空間浪費以及檔案重名問題。
2. Resizable 的資料結構,包含 HashTable,Graph 的 inlinks、outlinks 等等。
3. Hash function
4. Inverted Index Tree 與最終搜尋檔案的介面實作,這部分本身是一個比較大的問題,如果要在最後實作出合適的成果,這部分會是之後主要需要實作的點。我對於這部分沒有詳細的探討,需要更多時間或是別人來幫助我完成。
5. Parallel PageRank 的效能比較。
6. Pushed-based PageRank 的完善。
7.
:::
:::danger
服兵役中,八月底之後再來改。
* 先將 PageRank 平行化 與 Desktop full text Search 分成兩個專案。以檔案系統來說,用 PageRank 作為檔案系統 Ranking 似乎沒有那麼恰當,用 BM25 或是 TF-IDF 或許會是比較合理的作法。
* 試著用 bktree (symspell ? ) / trie / radix tree ... 取代 hash table,作為檔案查詢的資料結構。
:::