# Thread Pool
###### tags: `Construction`
## Description
This is the thread pool from the [OAI utils library](https://gitlab.flux.utah.edu/oai/openairinterface5g/-/blob/develop/common/utils/threadPool).
The thread pool is a work server made of a set of worker threads that can be mapped onto CPU cores.
Each worker loops, picking the jobs to do from the same input queue.
When a job is done, the worker sends a return message if a response queue is defined.
A selective abort cancels parallel jobs (use case: a client pushed several jobs, but the response to one of them makes the other linked jobs useless).
All the thread pool functions are thread safe. However, the working functions are implemented by the thread pool client, so the client has to handle the parallel execution of its own functions, called "processingFunc" hereafter.
* [reference](https://blog.csdn.net/qq_31985307/article/details/115742065)
## jobs
A job is a message (notifiedFIFO_elt_t).
Job messages can be created with newNotifiedFIFO_elt() and freed with delNotifiedFIFO_elt(), or their memory can be managed by the client.
## Queues of jobs
Queues are of type notifiedFIFO_t and must be initialized with init_notifiedFIFO().
No delete function is required: the creator only has to free the notifiedFIFO_t data.
push_notifiedFIFO() adds a job to the queue.
pull_notifiedFIFO() is blocking; poll_notifiedFIFO() is non-blocking.
abort_notifiedFIFO() lets the client delete all waiting jobs that match a key (see the key in the jobs definition).
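
The difference between a blocking pull and a non-blocking poll can be illustrated with a minimal sketch of a mutex and condition-variable queue. This is not the OAI implementation: `job_t`, `fifo_t`, and the `fifo_*` functions are hypothetical names chosen for illustration, and only the field names `outF`, `inF`, `lockF`, and `notifF` mirror the structures described in this note.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical simplified job and queue types (not the OAI definitions). */
typedef struct job_s {
  struct job_s *next;
  uint64_t key;
} job_t;

typedef struct {
  job_t *outF;            /* front: next job to be pulled */
  job_t *inF;             /* rear: last job pushed */
  pthread_mutex_t lockF;  /* protects outF and inF */
  pthread_cond_t notifF;  /* signalled when a job is pushed */
} fifo_t;

static void fifo_init(fifo_t *q) {
  q->outF = q->inF = NULL;
  pthread_mutex_init(&q->lockF, NULL);
  pthread_cond_init(&q->notifF, NULL);
}

/* Append a job at the rear and wake up every waiting consumer. */
static void fifo_push(fifo_t *q, job_t *j) {
  assert(j != NULL);
  pthread_mutex_lock(&q->lockF);
  j->next = NULL;
  if (q->outF == NULL)
    q->outF = j;
  else
    q->inF->next = j;
  q->inF = j;
  pthread_cond_broadcast(&q->notifF);
  pthread_mutex_unlock(&q->lockF);
}

/* Detach the front job. The caller holds the lock and the queue is not empty. */
static job_t *fifo_pop_locked(fifo_t *q) {
  assert(q->outF != NULL);
  job_t *j = q->outF;
  q->outF = j->next;
  if (q->outF == NULL)
    q->inF = NULL;
  return j;
}

/* Blocking: sleeps on the condition variable until a job is available. */
static job_t *fifo_pull(fifo_t *q) {
  pthread_mutex_lock(&q->lockF);
  while (q->outF == NULL)
    pthread_cond_wait(&q->notifF, &q->lockF);
  job_t *j = fifo_pop_locked(q);
  pthread_mutex_unlock(&q->lockF);
  return j;
}

/* Non-blocking: returns NULL immediately when the queue is empty. */
static job_t *fifo_poll(fifo_t *q) {
  pthread_mutex_lock(&q->lockF);
  job_t *j = (q->outF != NULL) ? fifo_pop_locked(q) : NULL;
  pthread_mutex_unlock(&q->lockF);
  return j;
}
```

The `while` loop around `pthread_cond_wait()` matters: with a broadcast, several consumers wake up, but only one of them finds a job, so the others must re-check the queue and go back to sleep.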
## initialization
Clients can create one or more thread pools with init_tpool().
The params string describes a list of cores, separated by ","; each entry runs one worker thread.
If the core exists on the CPU, the thread pool initialization sets the affinity between the thread and that core. Negative values are allowed: in that case the thread is never pinned to a specific core.
All threads use the Linux real-time scheduler, and their names are set automatically to "Tpool_coreID".
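
As an illustration of the params format, here is a minimal sketch that splits a comma-separated core list into integers. `parse_core_list` is a hypothetical helper written for this note, not a function of the OAI library, and the real parser may accept a different syntax.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical helper: parse a string such as "-1,0,2" into core IDs.
 * Returns the number of IDs stored in cores[], or -1 if the string is malformed.
 * A negative ID means "do not pin this worker to a core". */
static int parse_core_list(const char *params, int *cores, int max_cores) {
  assert(params != NULL && cores != NULL);
  int n = 0;
  const char *p = params;
  while (*p != '\0' && n < max_cores) {
    char *end;
    long v = strtol(p, &end, 10);
    if (end == p)
      return -1; /* no number found at this position */
    cores[n++] = (int)v;
    if (*end == ',')
      end++; /* skip the separator */
    else if (*end != '\0')
      return -1; /* unexpected character after the number */
    p = end;
  }
  return n;
}
```

With the string "-1,0,2", this sketch yields three workers: one left unpinned, one for core 0, and one for core 2.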
## Adding jobs
Clients create their job messages as notifiedFIFO_elt_t, then push them with pushTpool() (which internally calls push_notifiedFIFO()).
If they need a return, they have to create a response queue with init_notifiedFIFO() and set this FIFO pointer in the notifiedFIFO_elt_t before pushing the job.
## abort
An abort service, abortTpool(), aborts all jobs that match a key (see the jobs "key"). When the abort returns, it guarantees that no further response for a job matching the key will be posted on the response queues.
Nevertheless, jobs that were already completed before abortTpool() returns have been pushed to the response FIFO.
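
The key-matching part of an abort can be sketched as a filter over a singly linked queue. This is only an illustration under simplifying assumptions: `job_t` and `remove_by_key` are hypothetical names, the sketch ignores jobs that a worker is already running, and a real implementation would do this while holding the queue lock.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical simplified job type (not the OAI definition). */
typedef struct job_s {
  struct job_s *next;
  uint64_t key;
} job_t;

/* Unlink every waiting job whose key matches.
 * *head and *tail are the front and rear of the queue; both are updated.
 * Returns the number of jobs removed. */
static int remove_by_key(job_t **head, job_t **tail, uint64_t key) {
  assert(head != NULL && tail != NULL);
  int removed = 0;
  job_t *last_kept = NULL;
  job_t **link = head; /* points at the pointer that references the current job */
  while (*link != NULL) {
    job_t *cur = *link;
    if (cur->key == key) {
      *link = cur->next; /* bypass the matching job */
      cur->next = NULL;
      removed++;
    } else {
      last_kept = cur;
      link = &cur->next;
    }
  }
  *tail = last_kept; /* NULL when the queue is now empty */
  return removed;
}
```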
## Performance measurements
Performance measurement is integrated: the pool automatically fills in these timestamps:
* creationTime: time the request is pushed to the pool
* startProcessingTime: time a worker starts to run the job
* endProcessingTime: time the worker finishes the job
* returnTime: time the client reads the result

If you set the environment variable thread-pool-measurements to a valid file name, these measurements are written to this Linux pipe.
A tool that reads the Linux FIFO and displays it in ASCII is provided: see the Makefile in the local directory to build this tool and the thread pool unit tests.
## Struct
* Jobs
```c=
typedef struct notifiedFIFO_elt_s {
  // internal FIFO chain
  struct notifiedFIFO_elt_s *next;
  // integer that identifies a message or a group of messages
  uint64_t key;
  // if the client defines a response FIFO, the message will be posted back after processing
  struct notifiedFIFO_s *reponseFifo;
  // callback function
  void (*processingFunc)(void *);
  // boolean that enables the internal free in these cases: no return FIFO, or abort feature
  bool malloced;
  uint64_t creationTime;
  uint64_t startProcessingTime;
  uint64_t endProcessingTime;
  uint64_t returnTime;
  // the data passed to processingFunc
  void *msgData;
} notifiedFIFO_elt_t;
```
* Queue of Jobs
```c=
typedef struct notifiedFIFO_s {
  // front
  notifiedFIFO_elt_t *outF;
  // rear
  notifiedFIFO_elt_t *inF;
  // mutex lock
  pthread_mutex_t lockF;
  // condition signal
  pthread_cond_t notifF;
} notifiedFIFO_t;
```
* one_thread
```c=
struct one_thread {
  pthread_t threadID;
  int id;
  int coreID;
  char name[256];
  uint64_t runningOnKey;
  bool abortFlag;
  struct thread_pool *pool;
  struct one_thread *next;
};
```
* thread_pool
```c=
typedef struct thread_pool {
  int activated;
  bool measurePerf;
  int traceFd;
  int dummyTraceFd;
  uint64_t cpuCyclesMicroSec;
  uint64_t startProcessingUE;
  int nbThreads;
  bool restrictRNTI;
  notifiedFIFO_t incomingFifo;
  struct one_thread *allthreads;
} tpool_t;
```
## Principle
```c=
/*
 * Initialize the thread pool
 * 1. Allocate the thread pool space
 * 2. Initialize the thread pool parameters
 * 3. Create the threads based on the parameters
 */
initTPool(char *params, tpool_t *pool, bool performanceMeas);
```
After the thread pool is initialized, the blocked threads wait for a broadcast from the queue.
When a task is added to the queue, the queue broadcasts a signal to all the blocked threads.
After receiving the broadcast signal, the threads compete for the job.
The thread that wins the competition runs the job and, once execution has finished, puts the return value into the response queue.
Whether it finished a job or failed to get one, the thread then goes back to the blocked state and waits for the next signal.
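
The wait, compete, run, and respond cycle described above can be sketched as a small self-contained program part. It is a simplified illustration built directly on pthreads, not the OAI code: `job_t`, `queue_t`, `queue_*`, `worker`, `square`, and `run_demo` are hypothetical names, and stopping a worker with a job whose `processingFunc` is NULL is a convention of this sketch only.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct queue_s;

/* Hypothetical simplified job type (not the OAI definition). */
typedef struct job_s {
  struct job_s *next;
  struct queue_s *response;       /* where to post the finished job, or NULL */
  void (*processingFunc)(void *); /* NULL asks the worker to exit (sketch only) */
  void *msgData;
} job_t;

typedef struct queue_s {
  job_t *outF, *inF;
  pthread_mutex_t lockF;
  pthread_cond_t notifF;
} queue_t;

static void queue_init(queue_t *q) {
  q->outF = q->inF = NULL;
  pthread_mutex_init(&q->lockF, NULL);
  pthread_cond_init(&q->notifF, NULL);
}

/* Add a job and broadcast to all blocked threads. */
static void queue_push(queue_t *q, job_t *j) {
  assert(j != NULL);
  pthread_mutex_lock(&q->lockF);
  j->next = NULL;
  if (q->outF == NULL) q->outF = j; else q->inF->next = j;
  q->inF = j;
  pthread_cond_broadcast(&q->notifF);
  pthread_mutex_unlock(&q->lockF);
}

/* Block until a job is available. Threads that lose the race go back to waiting. */
static job_t *queue_pull(queue_t *q) {
  pthread_mutex_lock(&q->lockF);
  while (q->outF == NULL)
    pthread_cond_wait(&q->notifF, &q->lockF);
  job_t *j = q->outF;
  q->outF = j->next;
  if (q->outF == NULL) q->inF = NULL;
  pthread_mutex_unlock(&q->lockF);
  return j;
}

/* Worker loop: pull a job, run it, post it back if a response queue is set. */
static void *worker(void *arg) {
  queue_t *incoming = arg;
  for (;;) {
    job_t *j = queue_pull(incoming);
    if (j->processingFunc == NULL)
      break; /* stop request */
    j->processingFunc(j->msgData);
    if (j->response != NULL)
      queue_push(j->response, j);
  }
  return NULL;
}

/* Example processing function: squares an int in place. */
static void square(void *data) {
  int *v = data;
  *v = *v * *v;
}

/* Runs four jobs on two workers and returns the sum of the results. */
static int run_demo(void) {
  enum { NB_WORKERS = 2, NB_JOBS = 4 };
  queue_t incoming, responses;
  queue_init(&incoming);
  queue_init(&responses);
  pthread_t th[NB_WORKERS];
  for (int i = 0; i < NB_WORKERS; i++)
    pthread_create(&th[i], NULL, worker, &incoming);
  int values[NB_JOBS] = {1, 2, 3, 4};
  job_t jobs[NB_JOBS];
  for (int i = 0; i < NB_JOBS; i++) {
    jobs[i] = (job_t){.response = &responses, .processingFunc = square, .msgData = &values[i]};
    queue_push(&incoming, &jobs[i]);
  }
  int sum = 0;
  for (int i = 0; i < NB_JOBS; i++)
    sum += *(int *)queue_pull(&responses)->msgData;
  job_t stop[NB_WORKERS];
  for (int i = 0; i < NB_WORKERS; i++) {
    stop[i] = (job_t){.processingFunc = NULL};
    queue_push(&incoming, &stop[i]);
  }
  for (int i = 0; i < NB_WORKERS; i++)
    pthread_join(th[i], NULL);
  return sum;
}
```

Responses can arrive in any order because the workers run in parallel, which is why the demo only sums the results instead of relying on their order.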

## Usages
```c=
// Initialize the task queue
notifiedFIFO_t *task_fifo = (notifiedFIFO_t *)malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(task_fifo);
// Create a job and add it to the queue
// resp_fifo: response queue, or NULL if no return is needed
// callback_func: processing function, or NULL
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(struct test_data), 0, resp_fifo, callback_func);
pushNotifiedFIFO(task_fifo, msg);
// Get the returned message
notifiedFIFO_elt_t *res;
res = pullTpool(task_fifo, threadPool);
```