# OS Pthread Programming

## Implementation

### TSQueue

* The constructor initializes a mutex, two condition variables, and a buffer for storing items.
* The destructor destroys all of these resources.
* In the enqueue method, if the size equals the maximum buffer size, the thread waits. Otherwise, we put the item at the end of the queue and increase the size by 1.
* In the dequeue method, if the size equals zero, the thread waits. Otherwise, we pop the element from the head of the queue and decrease the size by 1.

```
template <class T>
TSQueue<T>::TSQueue(int buffer_size) : buffer_size(buffer_size) {
    // Initialize the mutex, the circular buffer, and both condition variables.
    pthread_mutex_init(&mutex, NULL);
    pthread_mutex_lock(&mutex);
    buffer = new T[buffer_size];
    size = head = 0;
    tail = -1;
    pthread_cond_init(&cond_enqueue, NULL);
    pthread_cond_init(&cond_dequeue, NULL);
    pthread_mutex_unlock(&mutex);
}

template <class T>
TSQueue<T>::~TSQueue() {
    // Release the buffer and destroy the synchronization primitives.
    pthread_mutex_lock(&mutex);
    delete[] buffer;  // allocated with new[], so it must be freed with delete[]
    pthread_cond_destroy(&cond_enqueue);
    pthread_cond_destroy(&cond_dequeue);
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
}

template <class T>
void TSQueue<T>::enqueue(T item) {
    // Append an element to the end of the queue, blocking while it is full.
    pthread_mutex_lock(&mutex);
    while (size == buffer_size) {
        pthread_cond_wait(&cond_enqueue, &mutex);
    }
    tail = (tail + 1) % buffer_size;
    buffer[tail] = item;
    size++;
    pthread_cond_signal(&cond_dequeue);
    pthread_mutex_unlock(&mutex);
}

template <class T>
T TSQueue<T>::dequeue() {
    // Remove the first element of the queue, blocking while it is empty.
    pthread_mutex_lock(&mutex);
    while (size == 0) {
        pthread_cond_wait(&cond_dequeue, &mutex);
    }
    T val = buffer[head];
    head = (head + 1) % buffer_size;
    size--;
    pthread_cond_signal(&cond_enqueue);
    pthread_mutex_unlock(&mutex);
    return val;
}

template <class T>
int TSQueue<T>::get_size() {
    // Return the current number of elements in the queue.
    return size;
}

template <class T>
int TSQueue<T>::get_max_buffersize() {
    return buffer_size;
}
```

### Producer

* It dequeues items from input_queue, uses the transformer to process them, and enqueues the results into worker_queue.

```
void* Producer::process(void* arg) {
    Producer* producer = (Producer*) arg;
    for (;;) {
        if (producer->input_queue->get_size() > 0) {
            // Take one item, transform it, and pass the result on to the workers.
            Item* transItem = producer->input_queue->dequeue();
            unsigned long long int val =
                producer->transformer->producer_transform(transItem->opcode, transItem->val);
            Item* temp = new Item(transItem->key, val, transItem->opcode);
            producer->worker_queue->enqueue(temp);
            delete transItem;
        }
    }
    return nullptr;
}
```

### Consumer

* In the process function, the consumer continuously checks whether there are items in the worker_queue. If so, it dequeues an item, processes it (using the transformer), and enqueues the result into the output_queue.
* In the cancel() function, it sets is_cancel to true, then uses the pthread library to cancel the thread.

```
int Consumer::cancel() {
    // Mark the consumer as cancelled, then request cancellation of its thread.
    is_cancel = true;
    return pthread_cancel(this->t);
}

void* Consumer::process(void* arg) {
    Consumer* consumer = (Consumer*)arg;
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, nullptr);
    while (!consumer->is_cancel) {
        // Disable cancellation while an item is being processed.
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, nullptr);
        if (consumer->worker_queue->get_size() > 0) {
            Item* TransItem = consumer->worker_queue->dequeue();
            unsigned long long int val =
                consumer->transformer->consumer_transform(TransItem->opcode, TransItem->val);
            Item* temp = new Item(TransItem->key, val, TransItem->opcode);
            consumer->output_queue->enqueue(temp);
            delete TransItem;
        }
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
    }
    delete consumer;
    return nullptr;
}
```

### ConsumerController

* In the process function, the controller continuously monitors the size of the worker_queue.
  If the queue size exceeds **high_threshold** percent of the buffer capacity, it scales up by starting one more consumer thread. Conversely, if the queue size falls below **low_threshold** percent and more than one consumer is running, it scales down by cancelling the most recently added consumer.
* It uses the **usleep** function to check the worker queue status periodically.

```
void* ConsumerController::process(void* arg) {
    ConsumerController* controller = (ConsumerController*) arg;
    for (;;) {
        // Sleep between checks so the controller polls periodically.
        usleep(controller->check_period / 1000);
        if ((double)controller->worker_queue->get_size() / controller->worker_queue->get_max_buffersize()
                > (double)controller->high_threshold / 100) {
            // The worker queue is filling up: start one more consumer.
            Consumer* temp = new Consumer(controller->worker_queue, controller->writer_queue,
                                          controller->transformer);
            temp->start();
            controller->consumers.push_back(temp);
            std::cout << "Scaling up consumers from " << controller->consumers.size() - 1
                      << " to " << controller->consumers.size() << "\n";
        } else if ((double)controller->worker_queue->get_size() / controller->worker_queue->get_max_buffersize()
                       < (double)controller->low_threshold / 100
                   && controller->consumers.size() > 1) {
            // The worker queue is draining: cancel the most recently added consumer.
            Consumer* temp = controller->consumers[controller->consumers.size() - 1];
            temp->cancel();
            controller->consumers.pop_back();
            std::cout << "Scaling down consumers from " << controller->consumers.size() + 1
                      << " to " << controller->consumers.size() << "\n";
        }
    }
    return nullptr;
}
```

### Writer

* The writer thread is created with the Pthread library.
* It dequeues items from output_queue and writes them to a file through the ofs stream.

```
void* Writer::process(void* arg) {
    Writer* writer = (Writer*) arg;
    while (writer->expected_lines--) {
        // dequeue() already returns the item, so no placeholder allocation is needed.
        Item* temp = writer->output_queue->dequeue();
        writer->ofs << *temp;
        delete temp;  // the writer is the last owner of the item
    }
    return nullptr;
}
```

### main.cpp

* Initializing the input queue, worker queue, output queue, transformer, reader, writer, producers, and consumer controller.
* Calling the start function of the reader, the writer, the consumer controller, and the 4 producers.
* Calling the reader's join function and the writer's join function.
* Deleting all pointers.

```
int main(int argc, char** argv) {
    assert(argc == 4);

    int n = atoi(argv[1]);
    std::string input_file_name(argv[2]);
    std::string output_file_name(argv[3]);

    // Queues connecting the pipeline stages.
    TSQueue<Item*> *input_queue = new TSQueue<Item*>(READER_QUEUE_SIZE);
    TSQueue<Item*> *worker_queue = new TSQueue<Item*>(WORKER_QUEUE_SIZE);
    TSQueue<Item*> *output_queue = new TSQueue<Item*>(WRITER_QUEUE_SIZE);

    Transformer *transformer = new Transformer();
    Reader *reader = new Reader(n, input_file_name, input_queue);
    Writer *writer = new Writer(n, output_file_name, output_queue);

    Producer* p1 = new Producer(input_queue, worker_queue, transformer);
    Producer* p2 = new Producer(input_queue, worker_queue, transformer);
    Producer* p3 = new Producer(input_queue, worker_queue, transformer);
    Producer* p4 = new Producer(input_queue, worker_queue, transformer);

    ConsumerController* controller = new ConsumerController(
        worker_queue, output_queue, transformer,
        CONSUMER_CONTROLLER_CHECK_PERIOD,
        CONSUMER_CONTROLLER_LOW_THRESHOLD_PERCENTAGE,
        CONSUMER_CONTROLLER_HIGH_THRESHOLD_PERCENTAGE);

    // 1 reader thread
    reader->start();
    // 1 writer thread
    writer->start();
    // 1 controller thread
    controller->start();
    // 4 producer threads
    p1->start();
    p2->start();
    p3->start();
    p4->start();

    // Wait until all lines have been read and written.
    reader->join();
    writer->join();

    // Clean up.
    delete p1; delete p2; delete p3; delete p4;
    delete controller;
    delete reader;
    delete writer;
    delete transformer;
    delete input_queue; delete worker_queue; delete output_queue;

    return 0;
}
```

# Experiment

* Scaling up: the number of times consumers were scaled up
* Scaling down: the number of times consumers were scaled down

## 1. Different values of CONSUMER_CONTROLLER_CHECK_PERIOD

The lower CONSUMER_CONTROLLER_CHECK_PERIOD is, the more scaling events occur.

| testcase | 00 | 00 | 01 | 01 |
| -------------------------------- | ------- | ------------ | ------- | ------------ |
| | Control | Experimental | Control | Experimental |
| CONSUMER_CONTROLLER_CHECK_PERIOD | 1000000 | 100000 | 1000000 | 100000 |
| Scaling up number | 4 | 5 | 52 | 75 |
| Scaling down number | 3 | 4 | 51 | 74 |

## 2. Different values of CONSUMER_CONTROLLER_LOW_THRESHOLD_PERCENTAGE and CONSUMER_CONTROLLER_HIGH_THRESHOLD_PERCENTAGE

In testcase 00, changing CONSUMER_CONTROLLER_LOW_THRESHOLD_PERCENTAGE gives the same result for the control and experimental groups. In testcase 01 there is a clear difference: raising CONSUMER_CONTROLLER_LOW_THRESHOLD_PERCENTAGE causes more threads to be cancelled, which in turn increases the number of scaling events.

| testcase | 00 | 00 | 01 | 01 |
| --------------------------------------------- | ------- | ------------ | ------- | ------------ |
| | Control | Experimental | Control | Experimental |
| CONSUMER_CONTROLLER_HIGH_THRESHOLD_PERCENTAGE | 80 | 80 | 80 | 80 |
| CONSUMER_CONTROLLER_LOW_THRESHOLD_PERCENTAGE | 20 | 60 | 20 | 60 |
| Scaling up number | 4 | 4 | 65 | 84 |
| Scaling down number | 3 | 3 | 64 | 83 |

## 3. Different values of WORKER_QUEUE_SIZE

The smaller the worker queue, the more easily CONSUMER_CONTROLLER_HIGH_THRESHOLD_PERCENTAGE is reached, so the number of scaling events increases.

| testcase | 00 | 00 | 01 | 01 |
| ------------------- | ------- | ------------ | ------- | ------------ |
| | Control | Experimental | Control | Experimental |
| WORKER_QUEUE_SIZE | 200 | 100 | 200 | 100 |
| Scaling up number | 4 | 3 | 29 | 32 |
| Scaling down number | 3 | 2 | 28 | 25 |

## 4. What happens if WRITER_QUEUE_SIZE is very small?

When the writer queue size becomes smaller, the number of consumer scaling events increases.

| testcase | 00 | 00 | 01 | 01 |
| ------------------- | ------- | ------------ | ------- | ------------ |
| | Control | Experimental | Control | Experimental |
| WRITER_QUEUE_SIZE | 4000 | 50 | 4000 | 50 |
| Scaling up number | 5 | 2 | 9 | 35 |
| Scaling down number | 4 | 1 | 8 | 34 |

## 5. What happens if READER_QUEUE_SIZE is very small?

The difference between the control and experimental groups is small, so READER_QUEUE_SIZE has little effect on the consumers.

| testcase | 00 | 00 | 01 | 01 |
| ------------------- | ------- | ------------ | ------- | ------------ |
| | Control | Experimental | Control | Experimental |
| READER_QUEUE_SIZE | 200 | 10 | 200 | 10 |
| Scaling up number | 4 | 3 | 10 | 13 |
| Scaling down number | 3 | 2 | 9 | 12 |

# What difficulties did you encounter when implementing this assignment?

When implementing the periodic check of the consumer controller's state, the implementation would have become cumbersome without the usleep function.

# Any feedback you would like to let us know.

None.
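
As a closing note on the periodic check mentioned under the difficulties above: each wake-up of the controller comes down to a single decision. The following is a minimal sketch of that decision, assuming the same comparison as the ConsumerController code (queue fill ratio against the two percentage thresholds). `ScalingAction` and `decide_scaling` are hypothetical names introduced here for illustration; they are not part of the assignment code.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper type: the three outcomes of one controller check.
enum class ScalingAction { ScaleUp, ScaleDown, Hold };

// Hypothetical helper: decide what to do for one periodic check.
// queue_size / max_size is the fill ratio; thresholds are percentages (0-100).
inline ScalingAction decide_scaling(int queue_size, int max_size,
                                    int low_pct, int high_pct,
                                    std::size_t n_consumers) {
    assert(max_size > 0);
    // Compare in integer arithmetic: size/max > pct/100 is rewritten as
    // size*100 > pct*max, which avoids floating point division.
    const long long fill = static_cast<long long>(queue_size) * 100;
    const long long high = static_cast<long long>(high_pct) * max_size;
    const long long low  = static_cast<long long>(low_pct) * max_size;

    if (fill > high) {
        return ScalingAction::ScaleUp;      // queue is filling up
    }
    if (fill < low && n_consumers > 1) {
        return ScalingAction::ScaleDown;    // queue is draining, keep at least one
    }
    return ScalingAction::Hold;
}
```

In the controller loop, a function like this could be called after each `usleep` with `worker_queue->get_size()` and `worker_queue->get_max_buffersize()`. Keeping the rule separate from thread management makes it possible to check the threshold behavior from experiments 1 to 3 without starting any threads.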