Qt中QThreadPool線程池實現與原始碼解析 === ## 參考文章 + [QThreadPool Documentation](http://doc.qt.io/qt-5/qthreadpool.html) + [QRunnable Documentation](http://doc.qt.io/qt-5/qrunnable.html) + [QThread Documentation](http://doc.qt.io/qt-5/qthread.html) + [QThreadPool 完整原始碼](https://gist.github.com/floatflower/f0e42d2b26915372f0fff40916624d72) *在這篇文章中只找了幾個比較關鍵的點來解析,並沒有每一個函數都做註釋,強烈建議閱讀本篇文章時可以對著完整的原始碼閱讀* QThreadPool是Qt框架中一個提供線程複用以及能夠依照任務優先度先後調度的一個類別,在這篇文章裡面會對一些關鍵的函數實做來進行解析。 在整個線程池的類中,如果maxThread被設為n,那麼整個線程池將會開啟n+1個線程,ThreadPoolPrivate是一個manager的概念,也就是他不負責運行任務,僅負責管理線程池中的線程。 ## QThreadPool::QThreadPool() 建構子 ```cpp /*! Constructs a thread pool with the given \a parent. */ QThreadPool::QThreadPool(QObject *parent) : QObject(*new QThreadPoolPrivate, parent) { } ``` 當我們創建一個新的QThreadPool物件的時候,實際上是創建了一個QThreadPoolPrivate,而這個QThreadPoolPrivate是一個管理線程的類別,實際上運行任務的類別是QThreadPoolThread顧名思義也就是線程池中的線程。 把這個線程池的架構畫成圖之後大概長成這樣: ![QThreadPool](https://ooo.0o0.ooo/2017/06/26/59510b4604b6a.png) 實際上我們調用一個QThreadPool中的函數,其實這個函數實際上會幫我們處理非常多的動作。 ## QThreadPoolPrivate header ```cpp #include "QtCore/qmutex.h" #include "QtCore/qwaitcondition.h" #include "QtCore/qset.h" #include "QtCore/qqueue.h" #include "private/qobject_p.h" #ifndef QT_NO_THREAD QT_BEGIN_NAMESPACE class QThreadPoolThread; class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate { Q_DECLARE_PUBLIC(QThreadPool) friend class QThreadPoolThread; public: QThreadPoolPrivate(); // 嘗試開始運行作為參數送入的任務物件。 bool tryStart(QRunnable *task); // 將任務物件依照優先度排序在任務佇列中等待運行。 void enqueueTask(QRunnable *task, int priority = 0); // 回傳目前處於運行中的線程數量。 int activeThreadCount() const; // 嘗試開啟更多的線程,這個函數調用之後可能會有兩個結果: // + 不斷的將任務佇列中的任務取出,並且開啟新的線程,直到達到最大線程數。 // + 不斷的將任務佇列中的任務取出,雖未達最大線程數,淡將任務佇列清空。 void tryToStartMoreThreads(); // 檢查是否目前active的線程數超過最大線程數。 bool tooManyThreadsActive() const; // 新開一個線程並且運行作為參數的任務物件。 void startThread(QRunnable *runnable = 0); // 將線程池整個清空,等待目前所有的線程皆退出,並且清空"等待中"以及"已過期"的線程佇列。 void reset(); // 設定等待完成的時間,若在等待時間到了,所有線程皆被註銷,則返回true。 bool waitForDone(int msecs); // 清除任務佇列中,也就是尚未開始運行的佇列。 void clear(); // 找到任務佇列中指定的任務並且將其從任務佇列中刪除。 bool stealRunnable(QRunnable *runnable); // 找到任務佇列中指定的任務並且搶先運行然後將其從任務佇列中刪除。 void stealAndRunRunnable(QRunnable *runnable); // 互斥量 mutable QMutex mutex; // 所有線程的集合。 // Qset Documentation: http://doc.qt.io/qt-5/qset.html QSet<QThreadPoolThread *> allThreads; // 等待任務中的線程佇列 // QQueue Documentation: http://doc.qt.io/qt-5/qqueue.html QQueue<QThreadPoolThread *> waitingThreads; // 等待被註銷的線程佇列 QQueue<QThreadPoolThread *> expiredThreads; // 等待被運行的任務佇列,QRunnable為任務物件,int為優先度。 QVector<QPair<QRunnable *, int> > queue; QWaitCondition noActiveThreads; // 線程池是否退出。 bool isExiting; // 線程池中線程的過期時間。 int expiryTimeout; // 線程池中最大的線程數量。 int maxThreadCount; int reservedThreads; // 線程池中正在運行任務的線程數量。 int activeThreads; }; QT_END_NAMESPACE #endif // QT_NO_THREAD #endif ``` + isExiting變數為true時可能會讓目前沒有工作的線程結束等待新工作的狀態。以下這個程式片段出現在`QThreadPoolThread::run()`函數定義中。 ```cpp if (manager->isExiting) { registerThreadInactive(); break; } ``` ## QThreadPoolPrivate::QThreadPoolPrivate() 建構子 ```cpp /* \internal */ QThreadPoolPrivate:: QThreadPoolPrivate() : isExiting(false), expiryTimeout(30000), maxThreadCount(qAbs(QThread::idealThreadCount())), reservedThreads(0), activeThreads(0) { } ``` + QThread::idealThreadCount()會回傳目前運行的系統最理想的線程數,也就是目前的運行這個程式的電腦最多可以運行幾個線程。[QThread Documentation: QThread::idealThreadCount](http://doc.qt.io/qt-5/qthread.html#idealThreadCount) ## QThreadPoolPrivate::tryStart() ```cpp bool QThreadPoolPrivate::tryStart(QRunnable *task) { if (allThreads.isEmpty()) { // always create at least one thread startThread(task); return true; } // can't do anything if we're over the limit // 這裡使用了>=,如前面所說,可能存在一種狀態為,activeThread的數量比maxThread的值還大, // 這是因為當所有線程池中的線程都正在運行任務時,最大線程數被調降了。 if (activeThreadCount() >= maxThreadCount) return false; // 目前有正在等待任務的線程,若有正在等待任務的線程, // 就代表現在任務佇列中並沒有任何任務等待運行, // 因此直接將任務放入等待佇列中就可以保證這個任務能馬上被運行到。 if (waitingThreads.count() > 0) { // 將這個任務放入等待佇列中,並喚醒一個正在等待的線程運行。 // recycle an available thread enqueueTask(task); waitingThreads.takeFirst()->runnableReady.wakeOne(); return true; } // 如果有已經"過期"的線程過期佇列中等待被註銷, // 那麼就將這個還沒過期的線程拿來複用 // 可以省去一次創建線程的開銷 if (!expiredThreads.isEmpty()) { // restart an expired thread QThreadPoolThread *thread = expiredThreads.dequeue(); Q_ASSERT(thread->runnable == 0); ++activeThreads; if (task->autoDelete()) ++task->ref; thread->runnable = task; thread->start(); return true; } // 若線程池不是空的, // activeThread的數量也小於maxThread // 且沒有在等待新任務的線程 // 更沒有在等待被註銷的線程 // 那麼就直接創建一個新線程 // start a new thread startThread(task); return true; } ``` 在這個線程池中有幾個線程回收及創建的時機: + 複用就線程的時機 + 如果有正在等待的任務的線程,即複用該線程。 + 若有在已過期的線程佇列中等待註銷的線程但尚未被註銷,則複用該線程。 + 總括來說,線程的複用率越高,整個線程池的效率也就越高,因為可以減少較多的開新線程的開銷。 + 創建新線程的時機 + 當線程池裡一個線程都沒有的時候,創建一個新線程。 + 當不符合上述複用的時機的時候,才創建一個新線程。 + 註銷線程的時機 + 當處於active(正在運行任務)狀態的線程大於最大線程數時。 + 當一個線程太久沒有被使用,也就是任務佇列處於空時太久,線程一直等不到新任務,註銷等待太久的線程。 此外,線程如果處於等待任務的狀態,他並不是處於停滯的,他其實是利用for(;;)的無窮迴圈等待當指向任務物件的指針再也不等於0的時候馬上開始運行任務。以下是線程運行任務以及等待任務的過程。 ![QThreadPoolThread](https://ooo.0o0.ooo/2017/06/26/59510fdfa40b7.png) ## QThreadPoolPrivate::enqueueTask() ```cpp void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority) { if (runnable->autoDelete()) ++runnable->ref; // put it on the queue QVector<QPair<QRunnable *, int> >::const_iterator begin = queue.constBegin(); QVector<QPair<QRunnable *, int> >::const_iterator it = queue.constEnd(); // 按照即將插入的任務的優先級別插入相應區塊的最後一個。 if (it != begin && priority > (*(it - 1)).second) it = std::upper_bound(begin, --it, priority); queue.insert(it - begin, qMakePair(runnable, priority)); } ``` 這個函數的用途就是將tryStart()失敗的任務物件放入等待佇列中等待運行,並且任務將會依照各自的優先度被放在同樣優先度等級的最後面等待調度。假設目前任務佇列的優先度為: ![priority](https://ooo.0o0.ooo/2017/06/26/595113abdc2ff.png) 若來了一個優先度為4的任務,那麼這個任務將被插入那三個優先度同樣為4個後面。 因為所有線程在將運行下一個任務前會將任務佇列的第一個任務拿出,因此這種方法能夠確保越高優先度的將會更早被調度,而相同優先度的最早進入佇列中將會更早被調度。 ## QThreadPoolPrivate::tryToStartMoreThread() ```cpp void QThreadPoolPrivate::tryToStartMoreThreads() { // try to push tasks on the queue to any available threads /* 不斷的將任務佇列中的物件抽出開始運行,在tryStart()函數中, + 線程池不是空的, + activeThread的數量也小於maxThread + 且沒有在等待新任務的線程 + 更沒有在等待被註銷的線程 那麼就直接創建一個新線程。 因此這個迴圈終究能將線程池的activeThread數量=maxThread數 */ while (!queue.isEmpty() && tryStart(queue.constFirst().first)) queue.removeFirst(); } ``` 這個函數是當最大線程數被更改之後,這個函數就會被調用,在QThreadPool::setMaxThreadCount()函數中,就這樣使用了tryToStartMoreThreads()。 ``` void QThreadPool::setMaxThreadCount(int maxThreadCount) { Q_D(QThreadPool); QMutexLocker locker(&d->mutex); if (maxThreadCount == d->maxThreadCount) return; d->maxThreadCount = maxThreadCount; // 開始啟動更多的線程,直到: // + 線程數量與最大線程數量相等 // + 開始不斷的拿出任務開始運行直到任務佇列清空,且線程數量未達上限。 // 這個函數在maxThreadCount < maxThreadCount 時不會開啟更多線程或開始運行更多任務。 d->tryToStartMoreThreads(); } ``` tryStartMoreThreads()這個函數會不斷地將任務佇列中的第一個拿出來tryStart()直到任務佇列空了,或者線程數已經達到了最大線程數。 ## QThreadPoolPrivate::startThread() ```cpp /*! \internal */ void QThreadPoolPrivate::startThread(QRunnable *runnable) { // QScopedPointer Documentation: http://doc.qt.io/qt-5/qscopedpointer.html QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this)); thread->setObjectName(QLatin1String("Thread (pooled)")); allThreads.insert(thread.data()); ++activeThreads; if (runnable->autoDelete()) ++runnable->ref; thread->runnable = runnable; thread.take()->start(); } ``` 這個函數的用途就是創建一個QThreadPoolThread物件,並且將其放入allThreads的QSet中儲存起來。 並且將作為參數的任務物件用這個新創建的QThreadPoolThread運行。 ## QThreadPoolPrivate::reset() ```cpp /*! \internal Makes all threads exit, waits for each thread to exit and deletes it. */ void QThreadPoolPrivate::reset() { QMutexLocker locker(&mutex); isExiting = true; while (!allThreads.empty()) { // move the contents of the set out so that we can iterate without the lock QSet<QThreadPoolThread *> allThreadsCopy; allThreadsCopy.swap(allThreads); locker.unlock(); for (QThreadPoolThread *thread : qAsConst(allThreadsCopy)) { thread->runnableReady.wakeAll(); thread->wait(); delete thread; } locker.relock(); // repeat until all newly arrived threads have also completed } waitingThreads.clear(); expiredThreads.clear(); isExiting = false; } ``` 這個reset()的實現首先是創立一個新的allThreadsCopy並且將目前的allThreads複製過來,因為allThreads是一個臨界資源,因此要避免長時間操作allThreads,先將其複製過來並將allThreads中的數據清空,就可以在不鎖上狀況下等待線程結束並釋放線程的記憶體。這個動作將不斷的持續到allThreads完全被清空。 在這個動作開始之前,會先將isExiting設為true,在前面有說這個數值代表線程池是否退出,一旦這個變數被設為0,就會觸發QThreadPoolThread中這段程式碼,也就是所有運行完上一個任務的線程,在要開始運行下一個任務之前發現isExisting為true,那麼就會將自己變為inactive然後結束自己的工作迴圈: ``` if (manager->isExiting) { registerThreadInactive(); break; } ``` ## QThreadPoolPrivate::waitForDone() ```cpp bool QThreadPoolPrivate::waitForDone(int msecs) { QMutexLocker locker(&mutex); if (msecs < 0) { while (!(queue.isEmpty() && activeThreads == 0)) noActiveThreads.wait(locker.mutex()); } else { QElapsedTimer timer; // 創建一個時鐘並且不斷的等待。 timer.start(); int t; while (!(queue.isEmpty() && activeThreads == 0) && ((t = msecs - timer.elapsed()) > 0)) // 時間到後返回狀態 noActiveThreads.wait(locker.mutex(), t); } return queue.isEmpty() && activeThreads == 0; } ``` ## QThreadPoolThread header ```cpp #include "qthreadpool.h" #include "qthreadpool_p.h" #include "qelapsedtimer.h" #include <algorithm> #ifndef QT_NO_THREAD QT_BEGIN_NAMESPACE Q_GLOBAL_STATIC(QThreadPool, theInstance) /* QThread wrapper, provides synchronization against a ThreadPool */ class QThreadPoolThread : public QThread { public: QThreadPoolThread(QThreadPoolPrivate *manager); // 重載線程類的run()函數。 void run() Q_DECL_OVERRIDE; void registerThreadInactive(); // QWaitCondition是一個狀態變數,主要的用途是用來同步線程。 // QWaitCondition documentation: http://doc.qt.io/qt-5/qwaitcondition.html QWaitCondition runnableReady; // 每個線程被實例化後都會放到這個線程池中等待調度。 QThreadPoolPrivate *manager; // 指向該線程被分配到的QRunnable任務物件。 QRunnable *runnable; }; ``` QThreadPoolThread是用來運行送入線程池中的任務物件的線程實體,而這個QThreadPoolThread繼承了QThread,如同我們創建QThread衍生類一樣,這個線程最重要的動作就是重載了run()函數。 ## QThreadPoolThread::run() ```cpp /* \internal */ void QThreadPoolThread::run() { // QMutexLocker是一個用來操作互斥量的類 // QMutexLocker documentation: http://doc.qt.io/qt-5/qmutexlocker.html QMutexLocker locker(&manager->mutex); for(;;) { // 初始化r及runnable兩個指標 QRunnable *r = runnable; runnable = 0; do { if (r) { // 取得當前運行的QRunnable任務是否有開啟autoDelete const bool autoDelete = r->autoDelete(); // 這裡之所以解所互斥量是因為這個互斥量是為了保護"等待運行的任務佇列", // 因此當任務可以開始運行前,也就是已經從任務佇列中取出任務後, // 解鎖互斥量讓其他的線程可以繼續從佇列中取出任務。 // run the task locker.unlock(); #ifndef QT_NO_EXCEPTIONS try { #endif // 執行任務 r->run(); #ifndef QT_NO_EXCEPTIONS } catch (...) { // 不接受任何型態的Exception qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n" "This is not supported, exceptions thrown in worker threads must be\n" "caught before control returns to Qt Concurrent."); registerThreadInactive(); throw; } #endif // 鎖住互斥量, locker.relock(); if (autoDelete && !--r->ref) // 若這個任務物件的autoDelete屬性被設為true,則在任務運行結束之後, // 釋放這個任務物件的記憶體區塊。 delete r; } // 因為這個線程池的線程數是可變動的,也就是說有可能存在一個狀態為: // 現在處於運行狀態(active)的線程數大於最大線程數(maxThreadCount) // 但是這個狀態只會發生在這些線程已經開始運行之後,最大線程數才被下修 // 因此若這個狀態發生,已經做完工作的線程就會檢查是否activeThread數大於maxThread // 若目前已經超過最大線程數,則會將自己註銷並且不再被覆用。 // if too many threads are active, expire this thread if (manager->tooManyThreadsActive()) break; // 檢查是否還有等待執行的任務,如果有,則將任務佇列的第一個拿出來並且運行。 // 否則這個線程中的r指標被設為0,那麼就會跳出這個迴圈,並且這個線程將變為"Inactive"。 r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0; } while (r != 0); // 如果線程池處於退出狀態,那麼就停止運行下一個任務。 // 在線程池物件的reset()函數被調用時,這個數值會被設為true if (manager->isExiting) { registerThreadInactive(); break; } // if too many threads are active, expire this thread bool expired = manager->tooManyThreadsActive(); if (!expired) { // 將線程自己放入Inactive線程佇列中,等待被覆用。 manager->waitingThreads.enqueue(this); registerThreadInactive(); // 開始等待任務運行,直到設定的expiryTimeout,註銷這個線程。 // 不過如果進入了這個區塊,這個for(;;)迴圈將繼續進行等待任務直到太多線程處於active狀態。 // wait for work, exiting after the expiry timeout is reached runnableReady.wait(locker.mutex(), manager->expiryTimeout); ++manager->activeThreads; if (manager->waitingThreads.removeOne(this)) expired = true; } if (expired) { // 將自己放入已過期(expired)的線程佇列中,等待被註銷。 manager->expiredThreads.enqueue(this); registerThreadInactive(); break; } } } ``` ## QThreadPoolThread::registerThreadInactive() ```cpp void QThreadPoolThread::registerThreadInactive() { if (--manager->activeThreads == 0) manager->noActiveThreads.wakeAll(); } ``` 這個函數實際上並不會對調用這個函數的線程產生什麼實質的影響,只會將activeThreads的數量-1。 會調用這個函數的時機: + 任務在被運行任務期間拋出異常,這個任務就會停止工作。 + 線程池退出(即isExiting為true) + 處於active的線程數未達上限,但是任務等待佇列已經空了,這時候線程,等待新的任務物件進入,這時候僅只有activeThread數量被-1,然後這個線程的for(;;)循環將會繼續。 + 處於active的線程數已達上限,這時候這個線程就會被推入已過期的線程佇列中等待被註銷(或有可能被複用)。