# 2022q1 Homework6 (ktcp)
contributed by < `yaohwang99` >
>[homework description](https://hackmd.io/@sysprog/linux2022-ktcp)
---
## User level echo server
1. Use `socket()`, `bind()`, and `listen()` to create the socket and bind it to the server address.
2. `accept()` blocks the process until some client connects.
3. Set the socket to non-blocking (see the sketch below).
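A minimal sketch of steps 1 and 3, with error handling elided; `create_listener()` is an illustrative helper, and `setnonblock()` mirrors the helper used in the code below:
```c
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>

static int setnonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

static int create_listener(unsigned short port)
{
    int listener = socket(AF_INET, SOCK_STREAM, 0);          /* step 1: create */
    struct sockaddr_in addr = {.sin_family = AF_INET,
                               .sin_addr.s_addr = htonl(INADDR_ANY),
                               .sin_port = htons(port)};
    bind(listener, (struct sockaddr *) &addr, sizeof(addr)); /* step 1: bind */
    listen(listener, 128);                                   /* step 1: listen */
    setnonblock(listener);                                   /* step 3 */
    return listener;
}
```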
### epoll()
The epoll API monitors multiple file descriptors to see whether I/O is possible on any of them.
The central concept of the epoll API is the epoll instance, an in-kernel data structure which, from a user-space perspective, can be considered a container for two lists:
* The interest list (or epoll set): the set of file descriptors that the process has registered an interest in monitoring.
* The ready list: the set of file descriptors that are ready for I/O (a subset of the interest list).
`epoll_create()` creates a new epoll instance and returns a fd referring to that instance.
`epoll_ctl()` adds items to the interest list.
`epoll_wait()` waits for I/O events (fetching items from the ready list), blocking the calling thread.
`epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev)` associates `ev` with `listener` and adds it to the interest list.
`ev` marks `listener` as monitored for read availability (`EPOLLIN`) with edge-triggered notification (`EPOLLET`).
```c
int main(void)
{
    ... /* socket, bind, listen are done; the socket is set to non-blocking */
    int epoll_fd;
    if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0)
        server_err("Fail to create epoll", &list);
    /* The associated file is available for read(2) operations, edge-triggered */
    static struct epoll_event ev = {.events = EPOLLIN | EPOLLET};
    ev.data.fd = listener;
    /* Register: add an entry to the interest list of epoll_fd */
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0)
        server_err("Fail to control epoll", &list);
    printf("Listener (fd=%d) was added to epoll.\n", epoll_fd);
```
The above code initializes the epoll instance by adding `listener` to the interest list.
`epoll_wait()` writes all the ready events to `events[]` and returns the number of ready events.
Iterate through the ready events. If the event comes from `listener`, accept a new client. ==Note that `listener` is set to non-blocking, so if `accept()` fails the thread can continue to handle other events.==
Also, the program keeps track of the client list with `push_back_client()`.
```c
    while (1) {
        struct sockaddr_in client_addr;
        int epoll_events_count;
        if ((epoll_events_count = epoll_wait(epoll_fd, events /* events[EPOLL_SIZE] */,
                                             EPOLL_SIZE, EPOLL_RUN_TIMEOUT)) < 0)
            server_err("Fail to wait epoll", &list);
        printf("epoll event count: %d\n", epoll_events_count);
        clock_t start_time = clock();
        for (int i = 0; i < epoll_events_count; i++) {
            /* EPOLLIN event for listener (new client connection) */
            if (events[i].data.fd == listener) {
                int client;
                while ((client = accept(listener, (struct sockaddr *) &client_addr,
                                        &socklen)) > 0) {
                    printf("Connection from %s:%d, socket assigned: %d\n",
                           inet_ntoa(client_addr.sin_addr),
                           ntohs(client_addr.sin_port), client);
                    setnonblock(client);
                    ev.data.fd = client;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &ev) < 0)
                        server_err("Fail to control epoll", &list);
                    push_back_client(&list, client,
                                     inet_ntoa(client_addr.sin_addr));
                    printf("Add new client (fd=%d) and size of client_list is %d\n",
                           client, size_list(list));
                }
                if (errno != EWOULDBLOCK)
                    server_err("Fail to accept", &list);
            } else {
                /* EPOLLIN event for others (new incoming message from client) */
                if (handle_message_from_client(events[i].data.fd, &list) < 0)
                    server_err("Handle message from client", &list);
            }
        }
        printf("Statistics: %d event(s) handled at: %.6f second(s)\n",
               epoll_events_count,
               (double) (clock() - start_time) / CLOCKS_PER_SEC);
    }
    close(listener);
    close(epoll_fd);
    exit(0);
}
```
---
### Edge trigger and level trigger
[Test program by `Manistein`](https://github.com/Manistein/test_epoll_lt_and_et)
From the test program we can see that, with edge trigger, the reader may fail to read all the data in the buffer, yet the fd will not be reported as ready at the next call of `epoll_wait()`. So the remaining data in the buffer will never be read.
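The usual remedy under edge trigger is to keep reading until the kernel buffer is drained. A minimal sketch, assuming a non-blocking fd; `drain_fd()` is an illustrative name:
```c
#include <errno.h>
#include <unistd.h>

/* Keep reading until read() reports EAGAIN; with EPOLLET this is required,
 * since no further notification arrives for bytes left in the buffer.
 * Returns 0 once drained, -1 on EOF or a real error. */
static int drain_fd(int fd, char *buf, size_t len)
{
    for (;;) {
        ssize_t n = read(fd, buf, len);
        if (n > 0)
            continue; /* consume the `n` bytes in buf, then keep reading */
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
            return 0; /* drained: safe to wait for the next edge */
        return -1;    /* n == 0 (peer closed) or an unexpected error */
    }
}
```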
___
## bench
Create `MAX_THREAD` threads.
Each thread executes `bench_worker()`:
```c
static void *bench_worker(__attribute__((unused)) void *arg)
{
    int sock_fd;
    char dummy[MAX_MSG_LEN];
    struct timeval start, end;
    /* wait until all workers are created */
    pthread_mutex_lock(&worker_lock);
    while (!ready)
        if (pthread_cond_wait(&worker_wait, &worker_lock)) {
            puts("pthread_cond_wait failed");
            exit(-1);
        }
    pthread_mutex_unlock(&worker_lock);
```
`pthread_cond_wait(&worker_wait, &worker_lock)` releases `worker_lock` and blocks until `pthread_cond_broadcast(&worker_wait)` is called by `bench()`, re-acquiring the lock before returning. This step makes sure every worker thread is ready before it connects to the server.
```c
    /* ... create socket and establish connection ... */
    pthread_mutex_lock(&res_lock);
    time_res[idx++] += time_diff_us(&start, &end);
    pthread_mutex_unlock(&res_lock);
    pthread_exit(NULL);
}
```
After the connection finishes its reads and writes, the measured time is stored in `time_res[]`, which is protected by the mutex lock.
```c=
static void bench(void)
{
    for (int i = 0; i < BENCH_COUNT; i++) {
        ready = false;
        create_worker(MAX_THREAD);
        pthread_mutex_lock(&worker_lock);
        ready = true;
        /* all workers are ready, let's start bombing kecho */
        pthread_cond_broadcast(&worker_wait);
        pthread_mutex_unlock(&worker_lock);
        /* waiting for all workers to finish the measurement */
        for (int x = 0; x < MAX_THREAD; x++)
            pthread_join(pt[x], NULL);
        idx = 0;
    }
    for (int i = 0; i < MAX_THREAD; i++)
        fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT);
}
```
At line 9, the program calls `pthread_cond_broadcast` to unblock ==every== thread that is blocked on `worker_wait`.
At line 17, the program outputs the average response time of each thread.
---
## kernel level server
### CMWQ
[Concurrency Managed Work Queue](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
>In the original wq implementation, a multi threaded (MT) wq had one worker thread per CPU and a single threaded (ST) wq had one worker thread system-wide. A single MT wq needed to keep around the same number of workers as the number of CPUs. The kernel grew a lot of MT wq users over the years and with the number of CPU cores continuously rising, some systems saturated the default 32k PID space just booting up.
Concurrency Managed Workqueue (cmwq) is a reimplementation of wq with focus on the following goals:
* Maintain compatibility with the original workqueue API.
* Use per-CPU unified worker pools shared by all wq to provide flexible level of concurrency on demand without wasting a lot of resource.
* Automatically regulate worker pool and level of concurrency so that the API users don’t need to worry about such details.
CMWQ design:
1. Two worker-pools per CPU, one for normal work items and one for high-priority ones.
2. Some extra worker-pools to serve work items queued on ==unbound workqueues== - the number of these backing pools is dynamic.
3. A work item of a bound workqueue is queued on the worklist of the worker-pool that is associated with the CPU the issuer is running on ==(better locality)==.
```c=
static int kecho_init_module(void)
{
    int error = open_listen(&listen_sock);
    if (error < 0) {
        printk(KERN_ERR MODULE_NAME ": listen socket open error\n");
        return error;
    }
    param.listen_sock = listen_sock;
    /*
     * Create a dedicated workqueue instead of using system_wq
     * since the task could be a CPU-intensive work item
     * if its lifetime of connection is too long, e.g., using
     * `telnet` to communicate with kecho. Flag WQ_UNBOUND
     * fits this scenario. Note that the trade-off of this
     * flag is cache locality.
     *
     * You can specify module parameter "bench=1" if you won't
     * use telnet-like program to interact with the module.
     * This earns you better cache locality than using default
     * flag, `WQ_UNBOUND`. Note that your machine may going
     * unstable if you use telnet-like program along with
     * module parameter "bench=1" to interact with the module.
     * Since without `WQ_UNBOUND` flag specified, a
     * long-running task may delay other tasks in the kernel.
     */
    kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
    echo_server = kthread_run(echo_server_daemon, &param, MODULE_NAME);
    if (IS_ERR(echo_server)) {
        printk(KERN_ERR MODULE_NAME ": cannot start server daemon\n");
        close_listen(listen_sock);
    }
    return 0;
}
```
At line 26, a new workqueue is allocated; `WQ_UNBOUND` is used unless the module is loaded with `bench=1`.
The last argument `0` is `@max_active`, which determines the maximum number of execution contexts per CPU that can be assigned to the work items of a wq. For example, with `@max_active` of 16, at most 16 work items of the wq can be executing at the same time per CPU.
For an unbound wq, the limit is the higher of `512` and `4 * num_possible_cpus()`. These values are chosen sufficiently high such that they are not the limiting factor while providing protection in runaway cases.
The number of active work items of a wq is usually regulated by the users of the wq, more specifically, by how many work items the users may queue at the same time. Unless there is a specific need for throttling the number of active work items, specifying ‘0’ is recommended.
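A minimal (hypothetical) module sketch illustrating `@max_active`; the `demo_*` names are not from kecho:
```c
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_wq;
static struct work_struct demo_work;

static void demo_work_fn(struct work_struct *w)
{
    pr_info("demo work item executed\n");
}

static int __init demo_init(void)
{
    /* Bound wq: at most 16 work items of demo_wq may run concurrently
     * per CPU; passing 0 selects the default limit instead. */
    demo_wq = alloc_workqueue("demo_wq", 0, 16);
    if (!demo_wq)
        return -ENOMEM;
    INIT_WORK(&demo_work, demo_work_fn);
    queue_work(demo_wq, &demo_work);
    return 0;
}

static void __exit demo_exit(void)
{
    destroy_workqueue(demo_wq); /* waits for queued work to finish */
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
```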
Back in `kecho_init_module()`, line 27 creates a kernel thread to run `echo_server_daemon()`, which creates new work items and inserts them into the workqueue.
```c=
int echo_server_daemon(void *arg)
{
    struct echo_server_param *param = arg;
    struct socket *sock;
    struct work_struct *work;
    allow_signal(SIGKILL);
    allow_signal(SIGTERM);
    INIT_LIST_HEAD(&daemon.worker);
    while (!kthread_should_stop()) {
        /* using blocking I/O */
        int error = kernel_accept(param->listen_sock, &sock, 0);
        if (error < 0) {
            if (signal_pending(current))
                break;
            printk(KERN_ERR MODULE_NAME ": socket accept error = %d\n", error);
            continue;
        }
        if (unlikely(!(work = create_work(sock)))) {
            printk(KERN_ERR MODULE_NAME
                   ": create work error, connection closed\n");
            kernel_sock_shutdown(sock, SHUT_RDWR);
            sock_release(sock);
            continue;
        }
        /* start server worker */
        queue_work(kecho_wq, work);
    }
    printk(MODULE_NAME ": daemon shutdown in progress...\n");
    daemon.is_stopped = true;
    free_work();
    return 0;
}
```
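The daemon delegates to `create_work()` and `free_work()`, which are not shown above. A sketch of `create_work()` along the lines of kecho's design, wrapping the accepted socket in a per-connection structure that embeds a `work_struct` (details may differ from the actual source):
```c
/* Per-connection context: the socket, a node on daemon.worker (so that
 * free_work() can release every connection at shutdown), and the work
 * item handed to the workqueue. */
struct kecho {
    struct socket *sock;
    struct list_head list;
    struct work_struct kecho_work;
};

static struct work_struct *create_work(struct socket *sk)
{
    struct kecho *work = kmalloc(sizeof(struct kecho), GFP_KERNEL);
    if (!work)
        return NULL;
    work->sock = sk;
    /* echo_server_worker() is the function run by the CMWQ worker */
    INIT_WORK(&work->kecho_work, echo_server_worker);
    list_add(&work->list, &daemon.worker);
    return &work->kecho_work;
}
```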
A socket is stored as a structure in the kernel:
```c
struct socket {
    socket_state state;
    short type;
    unsigned long flags;
    struct file *file;
    struct sock *sk;
    const struct proto_ops *ops;
    struct socket_wq wq;
};
```
The in-kernel server is created by the following steps (a sketch of step 1 follows the list):
1. `sock_create()` creates a socket.
`kernel_bind()` binds the socket to the server address.
`kernel_listen()` sets the socket to listen mode.
2. Use `kernel_accept()` to create a new socket for a client, then ==assign the socket to a work item and insert it into the workqueue==.
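A sketch of step 1 in kernel space; the port number is arbitrary here and error handling is trimmed compared to the real kecho/khttpd code:
```c
#include <linux/in.h>
#include <linux/net.h>
#include <net/sock.h>

static int open_listen(struct socket **result)
{
    struct socket *sock;
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_addr.s_addr = htonl(INADDR_ANY),
        .sin_port = htons(12345), /* assumed port */
    };
    int err = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
    if (err < 0)
        return err;
    err = kernel_bind(sock, (struct sockaddr *) &addr, sizeof(addr));
    if (err < 0)
        goto bail;
    err = kernel_listen(sock, 128);
    if (err < 0)
        goto bail;
    *result = sock;
    return 0;
bail:
    sock_release(sock);
    return err;
}
```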
From [The Kernel Module Programming Guide](https://sysprog21.github.io/lkmpg/):
> **4.5 Passing Command Line Arguments to a Module**
Modules can take command line arguments, but not with the argc/argv you might be used to.
>To allow arguments to be passed to your module, declare the variables that will take the values of the command line arguments as global and then use the module_param() macro, (defined in include/linux/moduleparam.h) to set the mechanism up. At runtime, insmod will fill the variables with any command line arguments that are given, like insmod mymodule.ko myvariable=5 . The variable declarations and macros should be placed at the beginning of the module for clarity. The example code should clear up my admittedly lousy explanation.
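For example, kecho's `bench` parameter (referenced in the workqueue comment earlier) can be declared roughly as follows; the permission mode here is an assumption:
```c
#include <linux/module.h>
#include <linux/moduleparam.h>

static int bench = 0;
module_param(bench, int, 0444); /* read-only in sysfs */
MODULE_PARM_DESC(bench, "set to 1 when only benchmarking (no telnet-like use)");
```
The parameter is then set at load time, e.g. `sudo insmod kecho.ko bench=1`.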
## Linux Application Performance introduction
[Linux Application Performance introduction](https://unixism.net/2019/04/linux-applications-performance-introduction/)
process-based vs. thread-based vs. event-based
`select()`:
1. `nfds` is the highest-numbered file descriptor in any of the three sets, plus 1 (see the sketch after these lists).
2. $O(n)$ search time.
3. Up to 1024 fds, because it uses a `32 * 32` bit map to record whether each event is set.
`poll()`:
1. The caller specifies the number of items in the `fds` array in `nfds`.
2. $O(n)$ search time.
3. No limit on the number of fds.
`epoll()`:
1. Event driven, with edge or level trigger.
2. $O(1)$ search time.
3. No limit on the number of fds.
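For instance, the `nfds` convention and the $O(n)$ scan of `select()` can be seen in a minimal sketch; the helper `wait_two()` is hypothetical:
```c
#include <sys/select.h>

/* Block until fd_a or fd_b becomes readable. select() scans every slot
 * from 0 to nfds - 1 on each call, which is the O(n) cost noted above. */
static int wait_two(int fd_a, int fd_b)
{
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(fd_a, &rfds);
    FD_SET(fd_b, &rfds);
    int nfds = (fd_a > fd_b ? fd_a : fd_b) + 1; /* highest fd, plus 1 */
    return select(nfds, &rfds, NULL, NULL, NULL);
}
```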
:::info
:bulb: `select` and `poll` need to traverse the fd list to check whether any of them is set; `epoll` only needs to check whether the "ready list" is non-empty.
`select` or `poll` may perform better when only a few fds are in the set.
:::
:::warning
The reason that `pselect()` or `ppoll()` is needed is that if one wants to wait for either a signal or for a file descriptor to become ready, an atomic test is needed to prevent race conditions.
(Suppose the signal handler sets a global flag and returns. Then a test of this global flag followed by a call of select() could hang indefinitely if the signal arrived just after the test but just before the call. By contrast, pselect() allows one to first block signals, handle the signals that have come in, then call pselect() with the desired sigmask, avoiding the race.)
:::
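A sketch of the race-free pattern described above, assuming a `SIGINT` handler that sets a global flag; `wait_readable()` is an illustrative name:
```c
#include <signal.h>
#include <sys/select.h>

/* Wait for `fd` to become readable while SIGINT stays blocked except
 * inside pselect() itself, closing the test-then-wait window. */
static int wait_readable(int fd)
{
    sigset_t blocked, orig;
    sigemptyset(&blocked);
    sigaddset(&blocked, SIGINT);
    sigprocmask(SIG_BLOCK, &blocked, &orig);

    /* A flag set by the SIGINT handler can be tested here without
     * racing: the signal cannot be delivered yet. */

    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);
    /* pselect() atomically installs `orig` for the duration of the wait */
    int ready = pselect(fd + 1, &rfds, NULL, NULL, NULL, &orig);

    sigprocmask(SIG_SETMASK, &orig, NULL);
    return ready;
}
```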
## khttpd
Similar to ktcp, the module creates a kernel thread to run `http_server_daemon()` in the background.
```c
static int __init khttpd_init(void)
{
    int err = open_listen_socket(port, backlog, &listen_socket);
    if (err < 0) {
        pr_err("can't open listen socket\n");
        return err;
    }
    param.listen_socket = listen_socket;
    http_server = kthread_run(http_server_daemon, &param, KBUILD_MODNAME);
    if (IS_ERR(http_server)) {
        pr_err("can't start http server daemon\n");
        close_listen_socket(listen_socket);
        return PTR_ERR(http_server);
    }
    return 0;
}
```
In `open_listen_socket()`, several options are set on the socket:
`SO_REUSEADDR`: allow reuse of local addresses.
`SO_SNDBUF`: send buffer size.
`SO_RCVBUF`: receive buffer size.
`TCP_NODELAY`: applications that require lower latency on every packet sent should run on sockets with `TCP_NODELAY` enabled.
`TCP_CORK` (cleared): when the logical packet has been built in the kernel by the various components in the application, tell TCP to remove the cork. TCP will send the accumulated logical packet right away, without waiting for any further packets from the application.
```c
static int open_listen_socket(ushort port, ushort backlog, struct socket **res)
{
    struct socket *sock;
    struct sockaddr_in s;
    int err = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
    if (err < 0) {
        pr_err("sock_create() failure, err=%d\n", err);
        return err;
    }
    err = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
    if (err < 0)
        goto bail_setsockopt;
    err = setsockopt(sock, SOL_TCP, TCP_NODELAY, 1);
    if (err < 0)
        goto bail_setsockopt;
    err = setsockopt(sock, SOL_TCP, TCP_CORK, 0);
    if (err < 0)
        goto bail_setsockopt;
    err = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, 1024 * 1024);
    if (err < 0)
        goto bail_setsockopt;
    err = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, 1024 * 1024);
    if (err < 0)
        goto bail_setsockopt;
    /* ... kernel_bind(), kernel_listen() ... */
}
```
### htstress
`htstress` uses ==both multi-threading and epoll== to increase the number of requests per second.
The number of `requests/sec` is also affected by:
1. Number of threads.
2. Number of fds per epoll instance.
3. Server response speed.
```c
static void *worker(void *arg)
{
    int ret, nevts;
    struct epoll_event evts[MAX_EVENTS];
    char inbuf[INBUFSIZE];
    struct econn ecs[concurrency], *ec;
    (void) arg;
    int efd = epoll_create(concurrency);
    if (efd == -1) {
        perror("epoll");
        exit(1);
    }
    for (int n = 0; n < concurrency; ++n)
        init_conn(efd, ecs + n);
    //... epoll_wait ...
}
int main(int argc, char *argv[])
{
    ...
    for (int n = 0; n < num_threads - 1; ++n)
        pthread_create(&useless_thread, 0, &worker, 0);
    worker(0);
    ...
}
```
By tweaking the number of threads and the concurrency level, we can see different results with the same server.
```bash
$ ./htstress http://localhost:8081 -n 100000
num_threads:1
requests/sec: 19830.713
$ ./htstress http://localhost:8081 -t 10 -n 200000
num_threads:10
requests/sec: 41583.405
$ ./htstress http://localhost:8081 -c 10 -n 200000
num_threads:1
requests/sec: 31593.216
$ ./htstress http://localhost:8081 -t 3 -c 20 -n 200000
num_threads:3
requests/sec: 52022.194
```
## Implement CMWQ
[commit 0fa7260](https://github.com/yaohwang99/khttpd/commit/0fa72606412d20e0262985fabe53613258a5295f)
Define a new structure for binding the socket to the work.
```diff
+struct http_work {
+    struct socket *socket;
+    struct work_struct work;
+};
```
Allocate a workqueue.
```diff
     param.listen_socket = listen_socket;
+    http_wq = alloc_workqueue("http_wq", WQ_UNBOUND, 0);
     http_server = kthread_run(http_server_daemon, &param, KBUILD_MODNAME);
     if (IS_ERR(http_server)) {
         pr_err("can't start http server daemon\n");
```
Modify the worker function signature to match the prototype required by the workqueue API.
==The key point is to use the macro `container_of()` to access `struct http_work *hwork` and remember to release the memory.==
```diff
-static int http_server_worker(void *arg)
+static void http_server_worker(struct work_struct *work)
 {
     char *buf;
     struct http_parser parser;
 ...
     struct http_request request;
-    struct socket *socket = (struct socket *) arg;
+    struct http_work *hwork = container_of(work, struct http_work, work);
+    struct socket *socket = hwork->socket;
     allow_signal(SIGKILL);
     allow_signal(SIGTERM);
     buf = kmalloc(RECV_BUFFER_SIZE, GFP_KERNEL);
     if (!buf) {
         pr_err("can't allocate memory!\n");
-        return -1;
+        return;
     }
     request.socket = socket;
 ...
     kernel_sock_shutdown(socket, SHUT_RDWR);
     sock_release(socket);
     kfree(buf);
-    return 0;
+    kfree(hwork);
+    return;
 }
```
Result:
```bash
$ ./htstress http://localhost:8081 -n 200000
requests/sec: 26835.509
$ ./htstress http://localhost:8081 -t 3 -c 20 -n 200000
requests/sec: 71456.159
```