# 2021q1 Homework4 (quiz4)
contributed by < `ilkclord` >
## Problem `1`
### Struct
* **threadtask_t**
Stores one task: the function to execute (`func`), its argument (`arg`), the future that receives the result, and the next task in the queue.
```clike
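typedef struct __threadtask {
    void *(*func)(void *);         /* function the worker runs */
    void *arg;                     /* argument passed to func */
    struct __tpool_future *future; /* where the result is delivered */
    struct __threadtask *next;     /* next task in the job queue */
} threadtask_t;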
```
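* **jobqueue_t**
A singly linked job queue. The mutex `rwlock` ensures that only one thread modifies the queue at a time, and the condition variable `cond_nonempty` lets idle workers sleep until a task arrives.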
```clike
typedef struct __jobqueue {
    threadtask_t *head, *tail;     /* push at head, pop at tail */
    pthread_cond_t cond_nonempty;  /* signalled when a task is added */
    pthread_mutex_t rwlock;        /* protects head and tail */
} jobqueue_t;
```
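* **__tpool_future**
`result` stores the value returned by the task
`flag` records the task state as a set of bits (e.g. running, cancelled, destroyed)
`mutex` and `cond_finished` let the caller wait until the task has finished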
```clike
struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};
```
* **__threadpool**
`count` records the number of workers (threads)
`workers` holds the threads that execute the tasks
`jobqueue` is the linked list of pending tasks
```clike
struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};
```
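The flag constants and the helpers `tpool_future_create` / `tpool_future_destroy` are used by the functions in this note, but their definitions are not shown. Below is a minimal sketch that is consistent with how `jobqueue_fetch` and `jobqueue_destroy` treat the flags. The numeric flag values and the `int` return value of `tpool_future_destroy` (1 = freed now, 0 = left to the worker) are assumptions for illustration. The idea is that whichever side is last to touch the future, the caller or the worker, frees it. Note that with this logic a future that was never queued (flag still 0) is only marked, not freed.

```c
#include <pthread.h>
#include <stdlib.h>

/* Assumed values: each state is a distinct bit so states can be OR-ed together. */
enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_cond_init(&future->cond_finished, NULL);
    }
    return future;
}

/* Hypothetical return value: 1 if freed here, 0 if the worker will free it. */
static int tpool_future_destroy(struct __tpool_future *future)
{
    if (!future)
        return 0;
    pthread_mutex_lock(&future->mutex);
    if (future->flag & (__FUTURE_FINISHED | __FUTURE_CANCELLED)) {
        /* No worker will touch this future again: release it now */
        pthread_mutex_unlock(&future->mutex);
        pthread_mutex_destroy(&future->mutex);
        pthread_cond_destroy(&future->cond_finished);
        free(future);
        return 1;
    }
    /* Task still queued or running: mark it so the worker frees it */
    future->flag |= __FUTURE_DESTROYED;
    pthread_mutex_unlock(&future->mutex);
    return 0;
}
```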
### Code flow
`tpool_create` starts 4 worker threads in the background. Each one runs `jobqueue_fetch` (shown as `jobfetch` in the graphs).
```graphviz
Digraph G{
node[shape = box];
tpool_create -> jobfetch
tpool_create -> jobfetch2
tpool_create -> jobfetch3
tpool_create -> jobfetch4
rankdir = LR
}
```
`tpool_apply` adds tasks to the queue.
Each `jobfetch` worker pops a task and runs it.
```graphviz
Digraph G{
node[shape = box]
jobqueue->jobfetch
jobqueue->jobfetch2
jobqueue->jobfetch3
jobqueue->jobfetch4
tpool_apply->jobqueue
rankdir = LR
}
```
Finally, `tpool_join` joins all the threads and releases the memory.
```graphviz
Digraph G{
node[shape = box]
jobfetch->tpool_join
jobfetch2->tpool_join
jobfetch3->tpool_join
jobfetch4->tpool_join
tpool_join->end
rankdir = LR
}
```
### Function explanation
* **jobqueue_create**
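Allocates and initializes an empty `jobqueue`: `head` and `tail` are set to `NULL`, and the mutex and condition variable are initialized. It returns `NULL` if `malloc` fails.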
```clike
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
```
* **jobqueue_destroy**
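Releases the memory used by `jobqueue`. Each remaining task is checked in turn. If the caller has already destroyed its future (`__FUTURE_DESTROYED`), the future is freed here. Otherwise the future is marked `__FUTURE_CANCELLED` and left for the caller to free.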
```clike
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            /* The caller gave up on this future: free it here */
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            /* The caller still owns the future: only mark it cancelled */
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }
    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
```
* **__jobqueue_fetch_cleanup**
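Cleanup handler that unlocks the mutex passed in `arg`. `jobqueue_fetch` registers it with `pthread_cleanup_push`, so a worker that is cancelled while holding `rwlock` still releases the lock.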
```clike
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}
```
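The following sketch shows why the cleanup handler matters. The names `waiter` and `cancel_waiter_and_check_lock` are hypothetical. The sketch cancels a thread that is blocked in `pthread_cond_wait`. POSIX specifies that the mutex is re-acquired before the first cleanup handler runs, so without a handler that unlocks it, the mutex would stay locked by a thread that no longer exists. With the handler in place, `pthread_mutex_trylock` in the calling thread succeeds after `pthread_join`. Compile with `-pthread`.

```c
#include <pthread.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;

/* Same shape as __jobqueue_fetch_cleanup: unlock the mutex passed in arg */
static void unlock_cleanup(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}

/* Waits forever on demo_cond; the only way out is cancellation */
static void *waiter(void *unused)
{
    (void) unused;
    pthread_cleanup_push(unlock_cleanup, &demo_lock);
    pthread_mutex_lock(&demo_lock);
    for (;;)
        pthread_cond_wait(&demo_cond, &demo_lock); /* cancellation point */
    pthread_cleanup_pop(0); /* never reached, but must pair with push */
    return NULL;
}

/* Returns 0 if demo_lock is free again after cancelling the waiter */
static int cancel_waiter_and_check_lock(void)
{
    pthread_t t;
    if (pthread_create(&t, NULL, waiter, NULL))
        return -1;
    pthread_cancel(t);
    pthread_join(t, NULL);
    int rc = pthread_mutex_trylock(&demo_lock); /* 0 means we got the lock */
    if (rc == 0)
        pthread_mutex_unlock(&demo_lock);
    return rc;
}
```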
* **jobqueue_fetch**
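This is the worker function of the program. It first calls `pthread_cleanup_push` to register `__jobqueue_fetch_cleanup`. If the thread is then cancelled while holding `rwlock`, the mutex is still unlocked. This includes cancellation inside `pthread_cond_wait`, which re-acquires the mutex before the cleanup handlers run.

In the main loop, the worker locks `rwlock`. While the queue is empty, it sleeps in `pthread_cond_wait` until `tpool_apply` wakes it through `cond_nonempty`, and `pthread_cond_wait` re-acquires the mutex before it returns. Cancellation is enabled only during this wait and is disabled again before a task is handled. Still holding the lock, the worker pops the task at the tail, then unlocks and runs `task->func`. The result is then stored in the future, the future is marked finished, and any thread waiting on it is woken. A task whose `func` is `NULL` is the stop signal sent by `tpool_join`, and it makes the worker leave the loop.
Each worker repeats this cycle: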
```graphviz
Digraph G{
Waiting->"get the task" ->"do the task" ->"Waiting"
rankdir = LR ;
}
```
```clike
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    /* Unlock rwlock if this thread is cancelled while holding it */
    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        /* Cancellation is only allowed while waiting for work */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();
        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

        /* Pop the oldest task, which sits at the tail */
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            /* Walk to the node just before tail: O(n) under the lock */
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }
            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                /* The caller no longer wants the result: free the future */
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED; /* blank KKK */
                task->future->result = ret_value;
                /* blank LLL: wake every thread waiting for this result */
                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            /* NULL func: stop signal pushed by tpool_join */
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }
    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```
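`tpool_apply` pushes at `head` and `jobqueue_fetch` pops at `tail`, which gives FIFO order. However, each pop has to walk from `head` to the node before `tail`. The sketch below isolates this queue logic on a hypothetical `int` queue with no locking, so that the order can be checked directly. The names `queue_t`, `queue_push` and `queue_pop` are illustrative.

```c
#include <stdlib.h>

/* Hypothetical node type carrying an int instead of func/arg/future */
typedef struct node {
    int value;
    struct node *next;
} node_t;

typedef struct {
    node_t *head, *tail;
} queue_t;

/* Push at head, as tpool_apply does */
static int queue_push(queue_t *q, int value)
{
    node_t *n = malloc(sizeof(node_t));
    if (!n)
        return -1;
    n->value = value;
    n->next = q->head; /* NULL when the queue is empty */
    q->head = n;
    if (!q->tail)
        q->tail = n;
    return 0;
}

/* Pop at tail, as jobqueue_fetch does: O(n) walk to the node before tail */
static int queue_pop(queue_t *q, int *out)
{
    if (!q->tail)
        return -1;
    node_t *task;
    if (q->head == q->tail) {
        task = q->tail;
        q->head = q->tail = NULL;
    } else {
        node_t *tmp;
        for (tmp = q->head; tmp->next != q->tail; tmp = tmp->next)
            ;
        task = tmp->next;
        tmp->next = NULL;
        q->tail = tmp;
    }
    *out = task->value;
    free(task);
    return 0;
}
```

A possible alternative, not what the original code does, is to push at `tail` and pop at `head`. With a `tail` pointer, both operations become O(1) and the lock is held for less time.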
* **tpool_create**
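Creates the thread pool and starts `count` worker threads, each running `jobqueue_fetch`. If any `pthread_create` call fails, the workers already started are cancelled and joined, and all the memory is released.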
```clike
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }
    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                /* Roll back: stop and reap the workers already started */
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }
    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```
* **tpool_apply**
This is the producer function. It wraps `func` and its argument `arg` in a new task, pushes the task at the head of the queue, and returns the future that will hold the result.
```clike
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        /* jobqueue_destroy follows next on every node, so never leave it unset */
        new_head->next = NULL;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            /* blank HHH: the queue was empty, wake the sleeping workers */
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```
* **tpool_join**
Shuts the pool down. It pushes one task with a `NULL` function per worker as a stop signal, waits for every worker to exit with `pthread_join`, and then frees the memory used by the pool.
```clike
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    /* One NULL-func task per worker: each worker exits after taking one */
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```
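`tpool_join` stops the workers by queuing one "poison pill" per worker, i.e. a task whose `func` is `NULL`. Because the queue is FIFO, every real task is dequeued before any pill, and each worker exits after taking exactly one pill. The sketch below shows the same shutdown protocol on a hypothetical bounded array of `int`s, where a negative value plays the role of the `NULL` task. Names such as `run_pool`, `NWORKERS` and `QCAP` are illustrative, and only one `run_pool` call may run at a time. Compile with `-pthread`.

```c
#include <pthread.h>
#include <stddef.h>

#define NWORKERS 4
#define QCAP 128

static int queue[QCAP];
static size_t q_head, q_tail; /* pop at q_head, push at q_tail (no wrap-around) */
static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_nonempty = PTHREAD_COND_INITIALIZER;
static long total; /* protected by q_lock */

static int push(int v)
{
    pthread_mutex_lock(&q_lock);
    if (q_tail == QCAP) { /* full: this sketch does not reuse slots */
        pthread_mutex_unlock(&q_lock);
        return -1;
    }
    queue[q_tail++] = v;
    pthread_cond_broadcast(&q_nonempty);
    pthread_mutex_unlock(&q_lock);
    return 0;
}

static void *worker(void *unused)
{
    (void) unused;
    for (;;) {
        pthread_mutex_lock(&q_lock);
        while (q_head == q_tail)
            pthread_cond_wait(&q_nonempty, &q_lock);
        int v = queue[q_head++];
        if (v < 0) { /* poison pill: leave the loop */
            pthread_mutex_unlock(&q_lock);
            break;
        }
        total += v; /* the "work", done under the lock for simplicity */
        pthread_mutex_unlock(&q_lock);
    }
    return NULL;
}

/* Sums 1..n on NWORKERS threads, then shuts down with one pill per worker */
static long run_pool(int n)
{
    pthread_t t[NWORKERS];
    if (n < 0 || n > QCAP - NWORKERS)
        return -1;
    q_head = q_tail = 0;
    total = 0;
    int started = 0;
    while (started < NWORKERS &&
           pthread_create(&t[started], NULL, worker, NULL) == 0)
        started++;
    for (int i = 1; i <= n; i++)
        push(i);
    for (int i = 0; i < started; i++) /* like tpool_join */
        push(-1);
    for (int i = 0; i < started; i++)
        pthread_join(t[i], NULL);
    return total;
}
```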
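* **bpp**
Computes the `k`-th term of the Bailey-Borwein-Plouffe (BBP) series for π. Each task returns its term in a `malloc`ed `double`, and adding the terms from all tasks approximates π.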
```clike
#include <math.h>   /* pow */
#include <stdlib.h> /* malloc */

static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum; /* k-th BBP term */
    return (void *) product;             /* caller frees */
}
```
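### Timeout
Running the timeout version (`pi_time`) under Valgrind reports invalid reads in `pthread_mutex_unlock`. The call comes from `jobqueue_fetch`, and it touches a `__tpool_future` block that `main` has already freed through `tpool_future_destroy`. In other words, the worker still accesses the future's mutex after the future was freed.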
```
==14488== Memcheck, a memory error detector
==14488== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==14488== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==14488== Command: ./pi_time
==14488==
==14488== Thread 2:
==14488== Invalid read of size 4
==14488== at 0x486D7B4: __pthread_mutex_unlock_usercnt (pthread_mutex_unlock.c:41)
==14488== by 0x486D7B4: pthread_mutex_unlock (pthread_mutex_unlock.c:357)
==14488== by 0x109C48: jobqueue_fetch (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x4869608: start_thread (pthread_create.c:477)
==14488== by 0x4AF4292: clone (clone.S:95)
==14488== Address 0x4bc69d0 is 32 bytes inside a block of size 104 free'd
==14488== at 0x483CA3F: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488== by 0x1095CD: tpool_future_destroy (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x10A3F2: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== Block was alloc'd at
==14488== at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488== by 0x1094CD: tpool_future_create (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x109F8D: tpool_apply (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x10A34C: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==
==14488== Invalid read of size 4
==14488== at 0x486D300: __pthread_mutex_unlock_full (pthread_mutex_unlock.c:103)
==14488== by 0x109C48: jobqueue_fetch (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x4869608: start_thread (pthread_create.c:477)
==14488== by 0x4AF4292: clone (clone.S:95)
==14488== Address 0x4bc69d0 is 32 bytes inside a block of size 104 free'd
==14488== at 0x483CA3F: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488== by 0x1095CD: tpool_future_destroy (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x10A3F2: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== Block was alloc'd at
==14488== at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488== by 0x1094CD: tpool_future_create (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x109F8D: tpool_apply (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488== by 0x10A34C: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==
```
### Problem
For ideal parallelism, every worker should be kept busy, since any time a worker spends idle is wasted capacity. To check this, I recorded, each time a worker picks up a task, whether every worker is busy. The result linked below shows that more than 75% of the workers are idle. This suggests that tasks are pushed into the queue much more slowly than they are executed, so speeding up the pushing process should improve efficiency.
[code](https://github.com/ilkclord/quiz4/blob/master/pi_debug.c)
[result](https://github.com/ilkclord/quiz4/blob/master/result.txt)
A likely cause is that push (`tpool_apply`) and pop (`jobqueue_fetch`) lock the same `rwlock`, so the producer is delayed whenever a worker holds it. The pop also holds the lock while it walks the list from `head` to find the node before `tail`, and that walk takes time proportional to the queue length.
#### Improvement
##### **Adding more job queues**
```graphviz
Digraph G{
queue1->Worker1
queue2->Worker2
queue3->Worker3
queue4->Worker4
rankdir = LR
}
```
To reduce the contention between pop and push, every worker gets its own queue. The producer then no longer competes with all the workers for a single lock.
[4-queue version](https://github.com/ilkclord/quiz4/blob/master/pi_v2.c)
[result_v2](https://github.com/ilkclord/quiz4/blob/master/result_v2)
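The sketch below shows only the producer side of the multi-queue design. It assumes a single producer that hands out tasks round-robin; the distribution policy in `pi_v2.c` may differ. Each queue has its own lock, so a push only contends with the one worker that owns that queue. Names such as `multi_queue_t`, `mq_init` and `mq_push` are hypothetical, and the worker (pop) side is omitted. Compile with `-pthread`.

```c
#include <pthread.h>
#include <stddef.h>

#define NQUEUES 4
#define QCAP 64

/* Hypothetical per-worker storage with its own lock */
typedef struct {
    int items[QCAP];
    size_t len;
    pthread_mutex_t lock;
} worker_queue_t;

typedef struct {
    worker_queue_t q[NQUEUES];
    size_t next; /* round-robin cursor, touched only by the single producer */
} multi_queue_t;

static void mq_init(multi_queue_t *mq)
{
    for (size_t i = 0; i < NQUEUES; i++) {
        mq->q[i].len = 0;
        pthread_mutex_init(&mq->q[i].lock, NULL);
    }
    mq->next = 0;
}

/* Pushes v to the next queue in turn; returns that queue's index, or -1 if full */
static int mq_push(multi_queue_t *mq, int v)
{
    size_t i = mq->next;
    mq->next = (mq->next + 1) % NQUEUES;
    worker_queue_t *wq = &mq->q[i];
    pthread_mutex_lock(&wq->lock); /* contends only with worker i */
    int ok = wq->len < QCAP;
    if (ok)
        wq->items[wq->len++] = v;
    pthread_mutex_unlock(&wq->lock);
    return ok ? (int) i : -1;
}
```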
This version is still incomplete, and a few issues remain:
* Achieve a fully loaded pool, i.e. the busy state `1 1 1 1` (all four workers busy)
* Rewrite the timeout handling
##### **Switching**
Idle workers could also be put to use by letting them do the pushing.
*Still trying to finish this program.*