1. java線程池中的核心線程是如何被重復利用的
Java線程池中的核心線程是如何被重復利用的?
引言
在Java開發中,經常需要創建線程去執行一些任務,實現起來也非常方便,但如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。此時,我們很自然會想到使用線程池來解決這個問題。
使用線程池的好處:
降低資源消耗。java中所有的池化技術都有一個好處,就是通過復用池中的對象,降低系統資源消耗。設想一下如果我們有n多個子任務需要執行,如果我們為每個子任務都創建一個執行線程,而創建線程的過程是需要一定的系統消耗的,最後肯定會拖慢整個系統的處理速度。而通過線程池我們可以做到復用線程,任務有多個,但執行任務的線程可以通過線程池來復用,這樣減少了創建線程的開銷,系統資源利用率得到了提升。
降低管理線程的難度。多線程環境下對線程的管理是最容易出現問題的,而線程池通過框架為我們降低了管理線程的難度。我們不用再去擔心何時該銷毀線程,如何最大限度的避免多線程的資源競爭。這些事情線程池都幫我們代勞了。
提升任務處理速度。線程池中長期駐留了一定數量的活線程,當任務需要執行時,我們不必先去創建線程,線程池會自己選擇利用現有的活線程來處理任務。
很顯然,線程池一個很顯著的特徵就是「長期駐留了一定數量的活線程」,避免了頻繁創建線程和銷毀線程的開銷,那麼它是如何做到的呢?我們知道一個線程只要執行完了run()方法內的代碼,這個線程的使命就完成了,等待它的就是銷毀。既然這是個「活線程」,自然是不能很快就銷毀的。為了搞清楚這個「活線程」是如何工作的,下面通過追蹤源碼來看看能不能解開這個疑問。
分析方法
在分析源碼之前先來思考一下要怎麼去分析,源碼往往是比較復雜的,如果知識儲備不夠豐厚,很有可能會讀不下去,或者讀岔了。一般來講要時刻緊跟著自己的目標來看代碼,跟目標關系不大的代碼可以不理會它,一些異常的處理也可以暫不理會,先看正常的流程。就我們現在要分析的源碼而言,目標就是看看線程是如何被復用的。那麼對於線程池的狀態的管理以及非正常狀態下的處理代碼就可以不理會,具體來講,在ThreadPollExcutor類中,有一個欄位private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));是對線程池的運行狀態和線程池中有效線程的數量進行控制的, 它包含兩部分信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),還有幾個對ctl進行計算的方法:
- // 獲取運行狀態
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 獲取活動線程數
- private static int workerCountOf(int c) { return c & CAPACITY; }123456
以上兩個方法在源碼中經常用到,結合我們的目標,對運行狀態的一些判斷及處理可以不用去管,而對當前活動線程數要加以關注等等。
下面將遵循這些原則來分析源碼。
解惑
當我們要向線程池添加一個任務時是調用ThreadPollExcutor對象的execute(Runnable command)方法來完成的,所以先來看看ThreadPollExcutor類中的execute(Runnable command)方法的源碼:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
按照我們在分析方法中提到的一些原則,去掉一些相關性不強的代碼,看看核心代碼是怎樣的。
- // 為分析而簡化後的代碼
- public void execute(Runnable command) {
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- // 如果當前活動線程數小於corePoolSize,則新建一個線程放入線程池中,並把任務添加到該線程中
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 如果當前活動線程數大於等於corePoolSize,則嘗試將任務放入緩存隊列
- if (workQueue.offer(command)) {
- int recheck = ctl.get();
- if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }else {
- // 緩存已滿,新建一個線程放入線程池,並把任務添加到該線程中(此時新建的線程相當於非核心線程)
- addWorker(command, false)
- }
- }22
這樣一看,邏輯應該清晰很多了。
如果 當前活動線程數 < 指定的核心線程數,則創建並啟動一個線程來執行新提交的任務(此時新建的線程相當於核心線程);
如果 當前活動線程數 >= 指定的核心線程數,且緩存隊列未滿,則將任務添加到緩存隊列中;
如果 當前活動線程數 >= 指定的核心線程數,且緩存隊列已滿,則創建並啟動一個線程來執行新提交的任務(此時新建的線程相當於非核心線程);
接下來看addWorker(Runnable firstTask, boolean core)方法
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if ((c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed e to workerCount change; retry inner loop
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
同樣,我們也來簡化一下:
- // 為分析而簡化後的代碼
- private boolean addWorker(Runnable firstTask, boolean core) {
- int wc = workerCountOf(c);
- if (wc >= (core ? corePoolSize : maximumPoolSize))
- // 如果當前活動線程數 >= 指定的核心線程數,不創建核心線程
- // 如果當前活動線程數 >= 指定的最大線程數,不創建非核心線程
- return false;
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 新建一個Worker,將要執行的任務作為參數傳進去
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- workers.add(w);
- workerAdded = true;
- if (workerAdded) {
- // 啟動剛剛新建的那個worker持有的線程,等下要看看這個線程做了啥
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }2223242526272829303132
看到這里,我們大概能猜測到,addWorker方法的功能就是新建一個線程並啟動這個線程,要執行的任務應該就是在這個線程中執行。為了證實我們的這種猜測需要再來看看Worker這個類。
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable{
- // ....
- }
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }123456789101112
從上面的Worker類的聲明可以看到,它實現了Runnable介面,以及從它的構造方法中可以知道待執行的任務賦值給了它的變數firstTask,並以它自己為參數新建了一個線程賦值給它的變數thread,那麼運行這個線程的時候其實就是執行Worker的run()方法,來看一下這個方法:
- public void run() {
- runWorker(this);
- }
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }04142434445464748
在run()方法中只調了一下 runWorker(this) 方法,再來簡化一下這個 runWorker() 方法
- // 為分析而簡化後的代碼
- final void runWorker(Worker w) {
- Runnable task = w.firstTask;
- w.firstTask = null;
- while (task != null || (task = getTask()) != null) {
- try {
- task.run();
- } finally {
- task = null;
- }
- }
- }12345678910111213
很明顯,runWorker()方法裡面執行了我們新建Worker對象時傳進去的待執行的任務,到這里為止貌似這個worker的run()方法就執行完了,既然執行完了那麼這個線程也就沒用了,只有等待虛擬機銷毀了。那麼回顧一下我們的目標:Java線程池中的核心線程是如何被重復利用的?好像並沒有重復利用啊,新建一個線程,執行一個任務,然後就結束了,銷毀了。沒什麼特別的啊,難道有什麼地方漏掉了,被忽略了?再仔細看一下runWorker()方法的代碼,有一個while循環,當執行完firstTask後task==null了,那麼就會執行判斷條件(task = getTask()) != null,我們假設這個條件成立的話,那麼這個線程就不止只執行一個任務了,可以執行多個任務了,也就實現了重復利用了。答案呼之欲出了,接著看getTask()方法
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if ((c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
老規矩,簡化一下代碼來看:
- // 為分析而簡化後的代碼
- private Runnable getTask() {
- boolean timedOut = false;
- for (;;) {
- int c = ctl.get();
- int wc = workerCountOf(c);
- // timed變數用於判斷是否需要進行超時控制。
- // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
- // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
- // 對於超過核心線程數量的這些線程,需要進行超時控制
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if (timed && timedOut) {
- // 如果需要進行超時控制,且上次從緩存隊列中獲取任務時發生了超時,那麼嘗試將workerCount減1,即當前活動線程數減1,
- // 如果減1成功,則返回null,這就意味著runWorker()方法中的while循環會被退出,其對應的線程就要銷毀了,也就是線程池中少了一個線程了
- if ((c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- // 注意workQueue中的poll()方法與take()方法的區別
- //poll方式取任務的特點是從緩存隊列中取任務,最長等待keepAliveTime的時長,取不到返回null
- //take方式取任務的特點是從緩存隊列中取任務,若隊列為空,則進入阻塞狀態,直到能取出對象為止
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }39
從以上代碼可以看出,getTask()的作用是
如果當前活動線程數大於核心線程數,當去緩存隊列中取任務的時候,如果緩存隊列中沒任務了,則等待keepAliveTime的時長,此時還沒任務就返回null,這就意味著runWorker()方法中的while循環會被退出,其對應的線程就要銷毀了,也就是線程池中少了一個線程了。因此只要線程池中的線程數大於核心線程數就會這樣一個一個地銷毀這些多餘的線程。
如果當前活動線程數小於等於核心線程數,同樣也是去緩存隊列中取任務,但當緩存隊列中沒任務了,就會進入阻塞狀態,直到能取出任務為止,因此這個線程是處於阻塞狀態的,並不會因為緩存隊列中沒有任務了而被銷毀。這樣就保證了線程池有N個線程是活的,可以隨時處理任務,從而達到重復利用的目的。
小結
通過以上的分析,應該算是比較清楚地解答了「線程池中的核心線程是如何被重復利用的」這個問題,同時也對線程池的實現機制有了更進一步的理解:
當有新任務來的時候,先看看當前的線程數有沒有超過核心線程數,如果沒超過就直接新建一個線程來執行新的任務,如果超過了就看看緩存隊列有沒有滿,沒滿就將新任務放進緩存隊列中,滿了就新建一個線程來執行新的任務,如果線程池中的線程數已經達到了指定的最大線程數了,那就根據相應的策略拒絕任務。
當緩存隊列中的任務都執行完了的時候,線程池中的線程數如果大於核心線程數,就銷毀多出來的線程,直到線程池中的線程數等於核心線程數。此時這些線程就不會被銷毀了,它們一直處於阻塞狀態,等待新的任務到來。
注意:
本文所說的「核心線程」、「非核心線程」是一個虛擬的概念,是為了方便描述而虛擬出來的概念,在代碼中並沒有哪個線程被標記為「核心線程」或「非核心線程」,所有線程都是一樣的,只是當線程池中的線程多於指定的核心線程數量時,會將多出來的線程銷毀掉,池中只保留指定個數的線程。那些被銷毀的線程是隨機的,可能是第一個創建的線程,也可能是最後一個創建的線程,或其它時候創建的線程。一開始我以為會有一些線程被標記為「核心線程」,而其它的則是「非核心線程」,在銷毀多餘線程的時候只銷毀那些「非核心線程」,而「核心線程」不被銷毀。這種理解是錯誤的。
另外還有一個重要的介面 BlockingQueue 值得去了解,它定義了一些入隊出隊同步操作的方法,還可以阻塞,作用很大。
2. 關於java中DecimalFormat的問題。
把newSalary轉為double型,然後再format就好了,看源碼就會知道,String類型是不被允許的
publicfinalStringBufferformat(Objectnumber,
StringBuffertoAppendTo,
FieldPositionpos){
if(numberinstanceofLong||numberinstanceofInteger||
numberinstanceofShort||numberinstanceofByte||
numberinstanceofAtomicInteger||
numberinstanceofAtomicLong||
(numberinstanceofBigInteger&&
((BigInteger)number).bitLength()<64)){
returnformat(((Number)number).longValue(),toAppendTo,pos);
}elseif(numberinstanceofBigDecimal){
returnformat((BigDecimal)number,toAppendTo,pos);
}elseif(numberinstanceofBigInteger){
returnformat((BigInteger)number,toAppendTo,pos);
}elseif(numberinstanceofNumber){
returnformat(((Number)number).doubleValue(),toAppendTo,pos);
}else{
("");
}
}
3. Java線程池newFixedThreadPool源碼分析
本文深入探討了Java線程池的創建與任務提交機制,以`newFixedThreadPool`為例進行源碼分析。首先,`Executors.newFixedThreadPool()`創建線程池時,底層調用`ThreadPoolExecutor`的構造函數,核心與最大線程數默認值傳入,線程存活時間默認為0毫秒。值得注意的是,`workQueue`底層採用`LinkedBlockingQueue`,容量設置為`Integer.MAX_VALUE`,意味著任務隊列容量無限大,極端情況下可能導致內存溢出。
在自定義線程池時,用戶需明確傳入隊列`workQueue`,並賦予其實際容量。`workQueue`內部實現為鏈表結構,初始化時設置`last`和`head`為`null`,在`execute`方法中再次涉及隊列操作。進一步分析,`ThreadPoolExecutor`調用自身內部構造函數,核心參數保持一致,隊列設置為`new LinkedBlockingQueue()`,工廠為默認線程工廠,過期策略默認為`AbortPolicy()`。
`ThreadPoolExecutor`繼承自`AbstractExecutorService`,後者實現了`Executors`介面,提供了創建線程池的基礎框架。創建線程池時,系統會進行一系列參數合法性檢查,確保傳入的參數合理,包括核心與最大線程數、存活時間等,同時驗證隊列、工廠、過期策略是否非空。
接著,我們將視線轉移到`submit`方法,通過`submit()`向線程池提交任務。此操作實際由`AbstractExecutorService`實現,支持`Runnable`與`Callable`的提交,二者皆轉為`RunnableFuture`。`newTaskFor`方法中,`FutureTask`類被構造,實現了`RunnableFuture`介面,繼承了`Runnable`與`Future`。因此,`submit`方法最終生成`FutureTask`對象,通過`run`方法執行,實際由內部`callable`屬性實現。
`execute(FutureTask)`方法由`ThreadPoolExecutor`實現,接收`FutureTask`作為參數。方法首先檢查`FutureTask`是否為`null`,並進行相關注釋。接著,進行一系列判斷,包括線程池狀態、隊列容量與新任務狀態等,確保線程池狀態合法、任務與隊列匹配。
在`addWorker`方法中,`ctl`變數被用於記錄線程池的生命周期狀態與當前工作線程數。`ctl`通過`ctlOf`方法封裝`runState`與`workerCount`,並提供解封裝方法,如`runStateOf`、`workerCountOf`等。`COUNT_BITS`與`CAPACITY`常量用於處理`ctl`變數的位操作,以實現高效狀態管理。
`execute`方法執行邏輯分為兩部分:一部分處理核心線程數與工作線程數的關系,通過`addWorker`方法動態擴展線程池;另一部分處理隊列任務的提交與執行,確保線程池狀態與任務需求相匹配。`addWorker`方法內部,通過CAS操作安全增加工作線程數,並根據線程池狀態與新任務狀態執行相應邏輯。`Worker`類作為線程執行器,繼承`AbstractQueuedSynchronizer`,實現`Runnable`介面,封裝任務執行邏輯。
`runWorker`方法實現線程執行邏輯,包括從隊列獲取任務、處理線程池狀態與異常情況、執行任務與任務完成後的工作清理。`FutureTask`的`run`方法調用任務執行邏輯,並將結果存儲在`outcome`中,供後續獲取返回值。
`execute`方法第三部分處理線程池滿載或核心線程數量時的擴展邏輯,通過隊列`offer`方法添加任務,並根據線程池狀態與隊列容量調整工作線程數。`LinkedBlockingQueue`中的`offer`方法實現隊列元素的添加,利用`AtomicInteger`類的`getAndIncrement`方法安全更新隊列元素計數。`Condition`機制用於線程間的同步與喚醒,確保線程池與任務隊列的高效管理。
總之,`newFixedThreadPool`源碼展示了Java線程池管理的高效與靈活性。通過深入理解線程池的創建、任務提交與執行機制,開發者能夠更好地利用線程池優化應用性能,解決並發編程中的資源管理與任務調度問題。本文僅提供了一個簡要概述,實際源碼細節與更多優化策略值得進一步探索與研究。