中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Java 線程池詳解

2018-09-12    來源:importnew

容器云強(qiáng)勢上線!快速搭建集群,上萬Linux鏡像隨意使用

構(gòu)造一個線程池為什么需要幾個參數(shù)?如果避免線程池出現(xiàn)OOM?RunnableCallable的區(qū)別是什么?本文將對這些問題一一解答,同時還將給出使用線程池的常見場景和代碼片段。

基礎(chǔ)知識

Executors創(chuàng)建線程池

Java中創(chuàng)建線程池很簡單,只需要調(diào)用Executors中相應(yīng)的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不僅隱藏了復(fù)雜性,也為我們埋下了潛在的隱患(OOM,線程耗盡)。

Executors創(chuàng)建線程池便捷方法列表:

方法名 功能
newFixedThreadPool(int nThreads) 創(chuàng)建固定大小的線程池
newSingleThreadExecutor() 創(chuàng)建只有一個線程的線程池
newCachedThreadPool() 創(chuàng)建一個不限線程數(shù)上限的線程池,任何提交的任務(wù)都將立即執(zhí)行

小程序使用這些快捷方法沒什么問題,對于服務(wù)端需要長期運行的程序,創(chuàng)建線程池應(yīng)該直接使用ThreadPoolExecutor的構(gòu)造方法。沒錯,上述Executors方法創(chuàng)建的線程池就是ThreadPoolExecutor

ThreadPoolExecutor構(gòu)造方法

Executors中創(chuàng)建線程池的快捷方法,實際上是調(diào)用了ThreadPoolExecutor的構(gòu)造方法(定時任務(wù)使用的是ScheduledThreadPoolExecutor),該類構(gòu)造方法參數(shù)列表如下:

// Java線程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
  int corePoolSize, // 線程池長期維持的線程數(shù),即使線程處于Idle狀態(tài),也不會回收。
  int maximumPoolSize, // 線程數(shù)的上限
  long keepAliveTime, TimeUnit unit, // 超過corePoolSize的線程的idle時長,
                                     // 超過這個時間,多余的線程會被回收。
  BlockingQueue<Runnable> workQueue, // 任務(wù)的排隊隊列
  ThreadFactory threadFactory, // 新線程的產(chǎn)生方式
  RejectedExecutionHandler handler) // 拒絕策略

竟然有7個參數(shù),很無奈,構(gòu)造一個線程池確實需要這么多參數(shù)。這些參數(shù)中,比較容易引起問題的有corePoolSize,?maximumPoolSize,?workQueue以及handler

  • corePoolSizemaximumPoolSize設(shè)置不當(dāng)會影響效率,甚至耗盡線程;
  • workQueue設(shè)置不當(dāng)容易導(dǎo)致OOM;
  • handler設(shè)置不當(dāng)會導(dǎo)致提交任務(wù)時拋出異常。

正確的參數(shù)設(shè)置方式會在下文給出。

線程池的工作順序

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

corePoolSize -> 任務(wù)隊列 -> maximumPoolSize -> 拒絕策略

Runnable和Callable

可以向線程池提交的任務(wù)有兩種:RunnableCallable,二者的區(qū)別如下:

  1. 方法簽名不同,void Runnable.run(),?V Callable.call() throws Exception
  2. 是否允許有返回值,Callable允許有返回值
  3. 是否允許拋出異常,Callable允許拋出異常。

Callable是JDK1.5時加入的接口,作為Runnable的一種補(bǔ)充,允許有返回值,允許拋出異常。

三種提交任務(wù)的方式:

提交方式 是否關(guān)心返回結(jié)果
Future<T> submit(Callable<T> task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,雖然返回Future,但是其get()方法總是返回null

如何正確使用線程池

避免使用無界隊列

不要使用Executors.newXXXThreadPool()快捷方法創(chuàng)建線程池,因為這種方式會使用無界的任務(wù)隊列,為避免OOM,我們應(yīng)該使用ThreadPoolExecutor的構(gòu)造方法手動指定隊列的最大長度:

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM
                new ThreadPoolExecutor.DiscardPolicy());

明確拒絕任務(wù)時的行為

任務(wù)隊列總有占滿的時候,這是再submit()提交新的任務(wù)會怎么樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

線程池給我們提供了幾種常見的拒絕策略:
undefined

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException
DiscardPolicy 什么也不做,直接忽略
DiscardOldestPolicy 丟棄執(zhí)行隊列中最老的任務(wù),嘗試為當(dāng)前提交的任務(wù)騰出位置
CallerRunsPolicy 直接由提交任務(wù)者執(zhí)行這個任務(wù)

線程池默認(rèn)的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關(guān)心任務(wù)被拒絕的事件,可以將拒絕策略設(shè)置成DiscardPolicy,這樣多余的任務(wù)會悄悄的被忽略。

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), 
                new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略

獲取處理結(jié)果和異常

線程池的處理結(jié)果、以及處理過程中的異常都被包裝到Future中,并在調(diào)用Future.get()方法時獲取,執(zhí)行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結(jié)果和任務(wù)執(zhí)行過程中的異常。獲取執(zhí)行結(jié)果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception in call~");// 該異常會在調(diào)用Future.get()時傳遞給調(diào)用者
        }
    });
    
try {
  Object result = future.get();
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

上述代碼輸出類似如下:
undefined

線程池的常用場景

正確構(gòu)造線程池

int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0, TimeUnit.SECONDS,
            queue,
            policy);

獲取單個結(jié)果

submit()向線程池提交任務(wù)后會返回一個Future,調(diào)用V Future.get()方法能夠阻塞等待執(zhí)行結(jié)果,V get(long timeout, TimeUnit unit)方法可以指定等待的超時時間。

獲取多個結(jié)果

如果向線程池提交了多個任務(wù),要獲取這些任務(wù)的執(zhí)行結(jié)果,可以依次調(diào)用Future.get()獲得。但對于這種場景,我們更應(yīng)該使用ExecutorCompletionService,該類的take()方法總是阻塞等待某一個任務(wù)完成,然后返回該任務(wù)的Future對象。向CompletionService批量提交任務(wù)后,只需調(diào)用相同次數(shù)的CompletionService.take()方法,就能獲取所有任務(wù)的執(zhí)行結(jié)果,獲取順序是任意的,取決于任務(wù)的完成順序:

void solve(Executor executor, Collection<Callable<Result>> solvers)
   throws InterruptedException, ExecutionException {
   
   CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構(gòu)造器
   
   for (Callable<Result> s : solvers)// 提交所有任務(wù)
       ecs.submit(s);
       
   int n = solvers.size();
   for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務(wù)
       Result r = ecs.take().get();
       if (r != null)
           use(r);
   }
}

單個任務(wù)的超時時間

V Future.get(long timeout, TimeUnit unit)方法可以指定等待的超時時間,超時未完成會拋出TimeoutException

多個任務(wù)的超時時間

等待多個任務(wù)完成,并設(shè)置最大等待時間,可以通過CountDownLatch完成:

public void testLatch(ExecutorService executorService, List<Runnable> tasks) 
    throws InterruptedException{
      
    CountDownLatch latch = new CountDownLatch(tasks.size());
      for(Runnable r : tasks){
          executorService.submit(new Runnable() {
              @Override
              public void run() {
                  try{
                      r.run();
                  }finally {
                      latch.countDown();// countDown
                  }
              }
          });
      }
      latch.await(10, TimeUnit.SECONDS); // 指定超時時間
  }

線程池和裝修公司

以運營一家裝修公司做個比喻。公司在辦公地點等待客戶來提交裝修請求;公司有固定數(shù)量的正式工以維持運轉(zhuǎn);旺季業(yè)務(wù)較多時,新來的客戶請求會被排期,比如接單后告訴用戶一個月后才能開始裝修;當(dāng)排期太多時,為避免用戶等太久,公司會通過某些渠道(比如人才市場、熟人介紹等)雇傭一些臨時工(注意,招聘臨時工是在排期排滿之后);如果臨時工也忙不過來,公司將決定不再接收新的客戶,直接拒單。

線程池就是程序中的“裝修公司”,代勞各種臟活累活。上面的過程對應(yīng)到線程池上:

// Java線程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
  int corePoolSize, // 正式工數(shù)量
  int maximumPoolSize, // 工人數(shù)量上限,包括正式工和臨時工
  long keepAliveTime, TimeUnit unit, // 臨時工游手好閑的最長時間,超過這個時間將被解雇
  BlockingQueue<Runnable> workQueue, // 排期隊列
  ThreadFactory threadFactory, // 招人渠道
  RejectedExecutionHandler handler) // 拒單方式

總結(jié)

Executors為我們提供了構(gòu)造線程池的便捷方法,對于服務(wù)器程序我們應(yīng)該杜絕使用這些便捷方法,而是直接使用線程池ThreadPoolExecutor的構(gòu)造方法,避免無界隊列可能導(dǎo)致的OOM以及線程個數(shù)限制不當(dāng)導(dǎo)致的線程數(shù)耗盡等問題。ExecutorCompletionService提供了等待所有任務(wù)執(zhí)行結(jié)束的有效方式,如果要設(shè)置等待的超時時間,則可以通過CountDownLatch完成。

參考

  • ThreadPoolExecutor API Doc

標(biāo)簽: 代碼 服務(wù)器

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:JDK 源碼閱讀 : DirectByteBuffer

下一篇:SpringBoot | 第十八章:web 應(yīng)用開發(fā)之WebJars 使用