`
wankunde
  • 浏览: 158088 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

jdk5 多线程学习

阅读更多

 

多线程复习

线程有时称为 轻量级进程。与进程一样,它们拥有通过程序运行的独立的并发路径,并且每个线程都有自己的程序计数器,称为堆栈和本地变量。然而,线程存在于进程中,它们与同一进程内的其他线程共享内存、文件句柄以及每进程状态。

一个进程中的线程是在同一个地址空间中执行的,所以多个线程可以同时访问相同对象,并且它们从同一堆栈中分配对象。

 

在 JDK 5.0 之前,确保线程安全的主要机制是 synchronized 原语。访问共享变量(那些可以由多个线程访问的变量)的线程必须使用同步来协调对共享变量的读写访问。

 

创建线程的方法:

可以用两种方法创建线程,通过扩展 Thread 和覆盖 run() 方法,或者通过实现 Runnable 接口和使用 Thread(Runnable)构造函数:

Java代码 复制代码
  1. class WorkerThread extends Thread {    
  2.   public void run() { /* do work */ }   
  3. }   
  4. Thread t = new WorkerThread();   
  5. t.start();  

 

或是

Java代码 复制代码
  1. Thread t = new Thread(new Runnable() {    
  2.   public void run() { /* do work */ }   
  3. }   
  4. t.start();  

 

创建线程会使用相当一部分内存,其中包括有两个堆栈(Java 和 C),以及每线程数据结构。如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源。

下面的代码就是一段不好的利用线程的代码:

Java代码 复制代码
  1. class UnreliableWebServer {    
  2.   public static void main(String[] args) {   
  3.     ServerSocket socket = new ServerSocket(80);   
  4.       while (true) {   
  5.       final Socket connection = socket.accept();   
  6.       Runnable r = new Runnable() {   
  7.         public void run() {   
  8.           handleRequest(connection);   
  9.         }   
  10.       };   
  11.       // Don't do this!   
  12.       new Thread(r).start();   
  13.     }   
  14.   }   
  15. }  

 

当服务器被请求吞没时,UnreliableWebServer 类不能很好地处理这种情况。每次有请求时,就会创建新的类。根据操作系统和可用内存,可以创建的线程数是有限的。不幸的是,您通常不知道限制是多少 —— 只有当应用程序因为OutOfMemoryError 而崩溃时才发现。如果足够快地向这台服务器上抛出请求的话,最终其中一个线程创建将失败,生成的 Error 会关闭整个应用程序。

为任务创建新的线程并不一定不好,但是如果创建任务的频率高,而平均任务持续时间低,我们可以看到每项任务创建一个新的线程将产生性能(如果负载不可预知,还有稳定性)问题.

使用线程池解决问题

管理一大组小任务的标准机制是组合工作队列线程池。工作队列就是要处理的任务的队列,线程池是线程的集合,每个线程都提取公用工作队列。当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理。如果有,它会转移到下一个任务,并开始处理。作为一种额外好处,因为请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟。因此,可以立即处理请求,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。

说了半天,上段代码

Java代码 复制代码
  1. class ReliableWebServer {    
  2.   Executor pool =   
  3.     Executors.newFixedThreadPool(7);   
  4.     public static void main(String[] args) {   
  5.     ServerSocket socket = new ServerSocket(80);   
  6.       while (true) {   
  7.       final Socket connection = socket.accept();   
  8.       Runnable r = new Runnable() {   
  9.         public void run() {   
  10.           handleRequest(connection);   
  11.         }   
  12.       };   
  13.       pool.execute(r);   
  14.     }   
  15.   }   
  16. }  

 java.util.concurrent 包中包含灵活的线程池实现,Executor就是这个包中的。(以后会对其进行详细的介绍,但不在本文内)

创建 Executor 时,人们普遍会问的一个问题是“线程池应该有多大?”

用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有 N*(1+WT/ST) 个线程。

 

二、

 

在学习jdk5的新特性之前,先看一个多线程的模式:Future Pattern

       去蛋糕店买蛋糕,不需要等蛋糕做出来(假设现做要很长时间),只需要领个提货单就可以了(去干别的事情),等到蛋糕做好了,再拿提货单取蛋糕就可以了。future模式与这个场景类似。

       假设有一个需要执行一段时间的方法,我们可以不必等待结果出来,而是获取一个替代的“提货单”。因为获取“提货单”不需要花时间,这时这个“提货单”就是future参与者。

      获取future参与者的线程会在事后再去获取执行结果,就好像拿提货单去取蛋糕一样。如果有执行结果了,就可以马上拿到数据。如果没有结果,就等到有结果。

     下面看一段代码:

Java代码 复制代码
  1. public class Main {   
  2.     public static void main(String[] args) {   
  3.         System.out.println("main BEGIN");   
  4.         Host host = new Host();   
  5.         Data data1 = host.request(10'A');   
  6.         Data data2 = host.request(20'B');   
  7.         Data data3 = host.request(30'C');   
  8.   
  9.         System.out.println("main otherJob BEGIN");   
  10.         try {   
  11.             Thread.sleep(200);   
  12.         } catch (InterruptedException e) {   
  13.         }   
  14.         System.out.println("main otherJob END");   
  15.   
  16.         System.out.println("data1 = " + data1.getContent());   
  17.         System.out.println("data2 = " + data2.getContent());   
  18.         System.out.println("data3 = " + data3.getContent());   
  19.         System.out.println("main END");   
  20.     }   
  21. }  

 这里的main类就相当于“顾客”,host就相当于“蛋糕店”,顾客向“蛋糕店”定蛋糕就相当于“发请求request”,返回的数据data就相当于“提货单”而不是真正的“蛋糕”。在过一段时间后(sleep一段时间后),再去凭“提货单”取蛋糕“data1.getContent()”。

下面来看一下,顾客定蛋糕后,蛋糕店做了什么:

Java代码 复制代码
  1. public class Host {   
  2.     public Data request(final int count, final char c) {   
  3.         System.out.println("    request(" + count + ", " + c + ") BEGIN");   
  4.   
  5.         // (1) 建立FutureData的实体   
  6.         final FutureData future = new FutureData();   
  7.   
  8.         // (2) 为了建立RealData的实体,启动新的线程   
  9.         new Thread() {                                         
  10.             public void run() {                                
  11.                 RealData realdata = new RealData(count, c);   
  12.                 future.setRealData(realdata);   
  13.             }                                                  
  14.         }.start();                                             
  15.   
  16.         System.out.println("    request(" + count + ", " + c + ") END");   
  17.   
  18.         // (3) 取回FutureData实体,作为传回值   
  19.         return future;   
  20.     }   
  21. }  

  host("蛋糕店")在接到请求后,先生成了“提货单”FutureData的实例future,然后命令“蛋糕师傅”RealData去做蛋糕,realdata相当于起个线程去做蛋糕了。然后host返回给顾客的仅仅是“提货单”future,而不是蛋糕。当蛋糕做好后,蛋糕师傅才能给对应的“提货单”蛋糕,也就是future.setRealData(realdata)。

   下面来看看蛋糕师傅是怎么做蛋糕的:

 

Java代码 复制代码
  1.  public class RealData implements Data {   
  2.     private final String content;   
  3.     public RealData(int count, char c) {   
  4.         System.out.println("        making RealData(" + count + ", " + c + ") BEGIN");   
  5.         char[] buffer = new char[count];   
  6.         for (int i = 0; i < count; i++) {   
  7.             buffer[i] = c;   
  8.             try {   
  9.                 Thread.sleep(1000);   
  10.             } catch (InterruptedException e) {   
  11.             }   
  12.         }   
  13.         System.out.println("        making RealData(" + count + ", " + c + ") END");   
  14.         this.content = new String(buffer);   
  15.     }   
  16.     public String getContent() {   
  17.         return content;   
  18.     }   
  19. }  

   现在来看看“提货单”future是怎么与蛋糕"content"对应的:

Java代码 复制代码
  1. public class FutureData implements Data {   
  2.     private RealData realdata = null;   
  3.     private boolean ready = false;   
  4.   //将提货单与蛋糕师傅对应也就是与蛋糕对应,一个蛋糕师傅做一个订单   
  5.     public synchronized void setRealData(RealData realdata) {   
  6.         if (ready) {                           
  7.             return;     // balk   
  8.         }   
  9.         this.realdata = realdata;   
  10.         this.ready = true;   
  11.         notifyAll();   
  12.     }   
  13.     public synchronized String getContent() {   
  14.         while (!ready) {   
  15.             try {   
  16.                 wait();   
  17.             } catch (InterruptedException e) {   
  18.             }   
  19.         }   
  20.         return realdata.getContent();   
  21.     }   
  22. }  

 

   顾客做完自己的事情后,会拿着自己的“提货单”来取蛋糕:

 

Java代码 复制代码
  1. System.out.println("data1 = " + data1.getContent());  

 这时候如果蛋糕没做好,就只好等了:

Java代码 复制代码
  1. while (!ready) {   
  2.             try {   
  3.                 wait();   
  4.             } catch (InterruptedException e) {   
  5.             }   
  6. //等做好后才能取到     
  7. return realdata.getContent();  

 

 

   本文只是简单介绍一下future pattern,本人也是初学,如果要深入了解,还需要研究研究,本文代码并不优,只是做个说明性的例子。在以后将继续学习多线程。

 

三、生产者消费者模式

 

在进一步学习jdk5.0的多线程编程以前,先介绍一下生产者--消费者模式(producer-consumer)

生产者是指:生产数据的线程

消费者是指:使用数据的线程

生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个“桥梁参与者”,用于缓冲二者之间处理数据的速度差。

下面用代码来说明:

Java代码 复制代码
  1. //生产者   
  2. public class MakerThread extends Thread {   
  3.     private final Random random;   
  4.     private final Table table;   
  5.     private static int id = 0;    
  6.     public MakerThread(String name, Table table, long seed) {   
  7.         super(name);   
  8.         this.table = table;//table就是桥梁参与者   
  9.         this.random = new Random(seed);   
  10.     }   
  11.     public void run() {   
  12.         try {   
  13.             while (true) {   
  14.                 Thread.sleep(random.nextInt(1000));//生产数据要耗费时间   
  15.                 String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";//生产数据   
  16.                 table.put(cake);//将数据存入桥梁参与者   
  17.             }   
  18.         } catch (InterruptedException e) {   
  19.         }   
  20.     }   
  21.     private static synchronized int nextId() {   
  22.         return id++;   
  23.     }   
  24. }  

 

再来看看消费者:

Java代码 复制代码
  1. //消费者线程   
  2. public class EaterThread extends Thread {   
  3.     private final Random random;   
  4.     private final Table table;   
  5.     public EaterThread(String name, Table table, long seed) {   
  6.         super(name);   
  7.         this.table = table;   
  8.         this.random = new Random(seed);   
  9.     }   
  10.     public void run() {   
  11.         try {   
  12.             while (true) {   
  13.                 String cake = table.take();//从桥梁参与者中取数据   
  14.                 Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间   
  15.             }   
  16.         } catch (InterruptedException e) {   
  17.         }   
  18.     }   
  19. }  

 

看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):

Java代码 复制代码
  1. public class Table {   
  2.     private final String[] buffer;   
  3.     private int tail;  /下一个放put(数据)的地方    
  4.     private int head;  //下一个那曲take(数据)的地方   
  5.     private int count; // buffer内的数据数量   
  6.     public Table(int count) {   
  7.         this.buffer = new String[count];//总量是确定的   
  8.         this.head = 0;   
  9.         this.tail = 0;   
  10.         this.count = 0;   
  11.     }   
  12.     // 放置数据   
  13.     public synchronized void put(String cake) throws InterruptedException {   
  14.         System.out.println(Thread.currentThread().getName() + " puts " + cake);   
  15.         while (count >= buffer.length) {//数据放满了就只能等待   
  16.             wait();   
  17.         }   
  18.         buffer[tail] = cake;   
  19.         tail = (tail + 1) % buffer.length;   
  20.         count++;   
  21.         notifyAll();//有数据了,唤醒线程去取数据   
  22.     }   
  23.     // 取得数据   
  24.     public synchronized String take() throws InterruptedException {   
  25.         while (count <= 0) {//没有数据就只能等待   
  26.             wait();   
  27.         }   
  28.         String cake = buffer[head];   
  29.         head = (head + 1) % buffer.length;   
  30.         count--;   
  31.         notifyAll();//有位置可以放数据了,唤醒线程,不等了   
  32.         System.out.println(Thread.currentThread().getName() + " takes " + cake);   
  33.         return cake;   
  34.     }   
  35. }i  

 

好了我们来实验吧:

Java代码 复制代码
  1. public class Main {   
  2.     public static void main(String[] args) {   
  3.         Table table = new Table(3);     // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。  
  4.         new MakerThread("MakerThread-1", table, 31415).start();//生产数据   
  5.         new MakerThread("MakerThread-2", table, 92653).start();   
  6.         new MakerThread("MakerThread-3", table, 58979).start();   
  7.         new EaterThread("EaterThread-1", table, 32384).start();//消费数据   
  8.         new EaterThread("EaterThread-2", table, 62643).start();   
  9.         new EaterThread("EaterThread-3", table, 38327).start();   
  10.     }   
  11. }  

 

之所以在这里要介绍这个模式,是为了更好的理解jdk5的线程编程。

 

 

进入jdk5.0的线程编程了。

先来看一段代码:

Java代码 复制代码
  1. public class ThreadPoolTest {   
  2.     public static void main(String[] args) {   
  3.         int numWorkers = 10;//工作线程数   
  4.         int threadPoolSize = 2;//线程池大小   
  5.         ExecutorService tpes =   
  6.             Executors.newFixedThreadPool(threadPoolSize);//初始化线程池   
  7.   
  8.         WorkerThread[] workers = new WorkerThread[numWorkers];   
  9.         for (int i = 0; i < numWorkers; i++) {   
  10.             workers[i] = new WorkerThread(i);//初始一个任务   
  11.             tpes.execute(workers[i]);//执行任务   
  12.         }   
  13.         tpes.shutdown();//所有线程执行完毕后才关闭。   
  14. //         tpes.shutdownNow();//立即关闭   
  15.     }   
  16.   
  17. }  

 

看看工作线程:

Java代码 复制代码
  1. public class WorkerThread implements Runnable {   
  2.   
  3.     private int workerNumber;   
  4.   
  5.     WorkerThread(int number) {   
  6.         workerNumber = number;   
  7.     }   
  8.   
  9.     public void run() {   
  10.         for (int i=0;i<=100;i+=20) {   
  11.         //Perform some work...   
  12.             System.out.format("Worker number: %d, percent complete: %d%n",   
  13.                 workerNumber, i);   
  14.             try {   
  15.                 Thread.sleep((int)(Math.random() * 1000));   
  16.             } catch (InterruptedException e) { }   
  17.         }   
  18.     }   
  19.   
  20.   
  21. }  

 

从执行的结果可以看出:有两个线程在执行操作,因为我们的线程池中就只有两个线程。

这里要注意一下:

tpes.execute(workers[i]);

这里不是启动一个新线程,而是在仅仅是调用了run方法,并没有新建线程。这一点可以参看如下代码(节选自jdk5):

Java代码 复制代码
  1. **     
  2.          * Run a single task between before/after methods.     
  3.          */     
  4.         private void runTask(Runnable task) {      
  5.             final ReentrantLock runLock = this.runLock;      
  6.             runLock.lock();      
  7.             try {      
  8.                 // Abort now if immediate cancel.  Otherwise, we have      
  9.                 // committed to run this task.      
  10.                 if (runState == STOP)      
  11.                     return;      
  12.      
  13.                 Thread.interrupted(); // clear interrupt status on entry      
  14.                 boolean ran = false;      
  15.                 beforeExecute(thread, task);      
  16.                 try {      
  17.                     task.run();  //调用的是run()方法 而不是start()      
  18.                     ran = true;      
  19.                     afterExecute(task, null);      
  20.                     ++completedTasks;      
  21.                 } catch(RuntimeException ex) {      
  22.                     if (!ran)      
  23.                         afterExecute(task, ex);      
  24.                     // Else the exception occurred within      
  25.                     // afterExecute itself in which case we don't      
  26.                     // want to call it again.      
  27.                     throw ex;      
  28.                 }      
  29.             } finally {      
  30.                 runLock.unlock();      
  31.             }      
  32.         }    

 

请注意task.run(); 这句, 这儿并没有启动线程 而是简单的调用了一个普通对象的一个方法

从多线程设计的角度来讲,jdk5中的线程池应该是基于worker模式的。下一节将对worker模式进行介绍,以加深对jdk5中多线程编程的理解。

 

五、

我们来学学worker模式,大家也好对jdk5.0的线程池有一个更好的理解。

先来看看代码:

Java代码 复制代码
  1. public class Main {   
  2.     public static void main(String[] args) {   
  3.         Channel channel = new Channel(5);   // 工人线程的數量,即线程池内的线程数目   
  4.         channel.startWorkers();//启动线程池内的线程   
  5.         new ClientThread("Alice", channel).start();//发送请求的线程,相当于向队列加入请求   
  6.         new ClientThread("Bobby", channel).start();   
  7.         new ClientThread("Chris", channel).start();   
  8.     }   
  9. }  

 再来看看发送请求的client代码:

Java代码 复制代码
  1. public class ClientThread extends Thread {   
  2.     private final Channel channel;//相当于线程池   
  3.   
  4.     private static final Random random = new Random();   
  5.   
  6.     public ClientThread(String name, Channel channel) {   
  7.         super(name);   
  8.         this.channel = channel;   
  9.     }   
  10.   
  11.     public void run() {   
  12.         try {   
  13.             int i = 0;   
  14.             Request request = new Request(getName(), i);//生成请求   
  15.             channel.putRequest(request);//向队列中放入请求,也即把请求传给线程池   
  16.             Thread.sleep(random.nextInt(1000));   
  17.         } catch (InterruptedException e) {   
  18.         }   
  19.     }   
  20. }  

 

clientthread建立请求,并把请求传给了channel,下面来看看channel类(相当于线程池类)

Java代码 复制代码
  1. public class Channel {   
  2.     private static final int MAX_REQUEST = 100;   
  3.     private final Request[] requestQueue;//存放请求的队列   
  4.     private int tail;  // 下一个putRequest的地方   
  5.     private int head;  // 下一个takeRequest的地方   
  6.     private int count; // Request的数量   
  7.   
  8.     private final WorkerThread[] threadPool;   
  9.   
  10.     public Channel(int threads) {   
  11.         this.requestQueue = new Request[MAX_REQUEST];   
  12.         this.head = 0;   
  13.         this.tail = 0;   
  14.         this.count = 0;   
  15.   
  16.         threadPool = new WorkerThread[threads];   
  17.         for (int i = 0; i < threadPool.length; i++) {   
  18.             threadPool[i] = new WorkerThread("Worker-" + i, this);//生成线程池中的线程   
  19.         }   
  20.     }   
  21.     public void startWorkers() {   
  22.         for (int i = 0; i < threadPool.length; i++) {   
  23.             threadPool[i].start();//启动线程池中的线程   
  24.         }   
  25.     }   
  26.     public synchronized void putRequest(Request request) {//向队列中存入请求   
  27.         while (count >= requestQueue.length) {   
  28.             try {   
  29.                 wait();   
  30.             } catch (InterruptedException e) {   
  31.             }   
  32.         }   
  33.         requestQueue[tail] = request;   
  34.         tail = (tail + 1) % requestQueue.length;   
  35.         count++;   
  36.         notifyAll();   
  37.     }   
  38.     public synchronized Request takeRequest() {//从队列取出请求   
  39.         while (count <= 0) {   
  40.             try {   
  41.                 wait();   
  42.             } catch (InterruptedException e) {   
  43.             }   
  44.         }   
  45.         Request request = requestQueue[head];   
  46.         head = (head + 1) % requestQueue.length;   
  47.         count--;   
  48.         notifyAll();   
  49.         return request;   
  50.     }   
  51. }  

 

channel类把传给他的请求放入队列中,等待worker去取请求,下面看看worker(即工作线程,线程池中已经初始话好的线程)

Java代码 复制代码
  1. public class WorkerThread extends Thread {   
  2.     private final Channel channel;   
  3.     public WorkerThread(String name, Channel channel) {   
  4.         super(name);   
  5.         this.channel = channel;   
  6.     }   
  7.     public void run() {   
  8.         while (true) {   
  9.             Request request = channel.takeRequest();//取出请求   
  10.             request.execute();//处理请求   
  11.         }   
  12.     }   
  13. }  

 

在工作线程中会从线程池的队列里取出请求,并对请求进行处理。这里的workerthread相当于背景线程,他一直都在运行,当有请求的时候,他就会进行处理,这里处理请求的线程是已经存在在channel(线程池里的线程),他不会因为请求的增加而增加(这是本例中的情况),不会来一个请求就新建立一个线程,节省了资源。

再看看请求的代码:

Java代码 复制代码
  1. public class Request {   
  2.     private final String name; //  委托者   
  3.     private final int number;  // 请求编号   
  4.     private static final Random random = new Random();   
  5.     public Request(String name, int number) {   
  6.         this.name = name;   
  7.         this.number = number;   
  8.     }   
  9.     public void execute() {//执行请求   
  10.         System.out.println(Thread.currentThread().getName() + " executes " + this);   
  11.         try {   
  12.             Thread.sleep(random.nextInt(1000));   
  13.         } catch (InterruptedException e) {   
  14.         }   
  15.     }   
  16.     public String toString() {   
  17.         return "[ Request from " + name + " No." + number + " ]";   
  18.     }   
  19. }  

 参考(多线程四)中所写的 ExecutorService,其就相当于channel,即线程池。至于其实现当然要比channel复杂多了,channel只是举个例子。而WorkerThread可不是工作线程,他相当于发送到channel的请求,也就是request,当执行代码:tpes.execute(workers[i]);时,相当于向线程池加入一个请求,而WorkerThread中的run则相当于request中的execute,这也是当执行tpes.execute(workers[i]);时,并不会产生新的线程的原因。(多线程四)中的写法是让人有些迷糊的。ExecutorService中产生的背景线程(相当于本篇的WorkerThread )我们是看不到的。

 

jdk5中的多线程还有很多需要进一步学习,其实现也反应了多线程的设计模式。本篇的worker模式只是其中的一种。

 

六、

从前面的文章可以看出,jdk1.5为我们提供了很多线程池

这里做一下简要的说明:

类Executors,提供了一些创建线程池的方法

 

newFixedThreadPool(int nThreads)

创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。

如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。

 

newCachedThreadPool()

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。

对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。

调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。

终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

注意,可以使用 ThreadPoolExecutor类 构造方法创建具有类似属性但细节不同,(例如超时参数)的线程池。

其实executors的线程池也是基于ThreadPoolExecutor扩展的。

 

newSingleThreadExecutor()

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,

一个新线程将代替它执行后续的任务)。

可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

(与其他等效的 newFixedThreadPool(1) 不同,

可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。)

 

ThreadPoolExecutor

Java代码 复制代码
  1. public ThreadPoolExecutor(int corePoolSize,   
  2.                           int maximumPoolSize,   
  3.                           long keepAliveTime,   
  4.                           TimeUnit unit,   
  5.                           BlockingQueue<Runnable> workQueue,   
  6.                           ThreadFactory threadFactory,   
  7.                           RejectedExecutionHandler handler)  
 
核心和最大池大小
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。

ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小。

当新任务在方法 execute(java.lang.Runnable)中提交时,

如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。

如果运行的线程多于corePoolSize 而少于maximumPoolSize,则仅当队列满时才创建新线程。

如果设置的corePoolSize 和maximumPoolSize 相同,则创建了固定大小的线程池。

如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),

则允许池适应任意数量的并发任务。

在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)

setMaximumPoolSize(int)进行动态更改。

默认情况下,即使核心线程也只是在新任务需要时才创建和启动的。

 

threadFactory - 执行程序创建新线程时使用的工厂 

使用 ThreadFactory创建新线程。

如果没有另外说明,则在同一个ThreadGroup中一律使用Executors.defaultThreadFactory()创建线程,

并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。

通过提供不同的ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。

如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

 

keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 

(保持活动时间)

unit - keepAliveTime 参数的时间单位。

如果池中当前有多于corePoolSize的线程,则这些多出的线程在空闲时间超过keepAliveTime时将会终止。

这提供了当池处于非活动状态时减少资源消耗的方法。

如果池后来变得更为活动,则可以创建新的线程。

也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit)动态地更改此参数。

使用 Long.MAX_VALUE TimeUnit.NANOSECONDS的值在关闭前有效地从以前的终止状态禁用空闲线程。

 

workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 

 

所有BlockingQueue都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

1.如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。

2.如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

3.如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,

在这种情况下,任务将被拒绝。

 

排队有三种通用策略:

1.直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。

在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。

此策略可以避免在处理可能具有内部依赖性的请求集合时出现锁定。

直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。

当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

 

2.无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue

将导致在所有 corePoolSize 线程都忙的情况下将新任务加入队列。

这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)

当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。

这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

 

3.有界队列。当使用有限的 maximumPoolSizes 时,

有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。

队列大小和最大池大小可能需要相互折衷:

使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,

但是可能导致人工降低吞吐量。

如果任务频繁阻塞(例如,如果它们是 I/O 边界),

则系统可能为超过您许可的更多线程安排时间。

使用小型队列通常要求较大的池大小,CPU 使用率较高,

但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

 

handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 (被拒绝的任务 )

当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,

在方法 execute(java.lang.Runnable)中提交的新任务将被拒绝

在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandler

Java代码 复制代码
  1. RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,   
  2. java.util.concurrent.ThreadPoolExecutor)  

 

 方法。

下面提供了四种预定义的处理程序策略:

 1.在默认的 ThreadPoolExecutor.AbortPolicy中,处理程序遭到拒绝将抛出运行时RejectedExecutionException

2.在 ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的 execute 本身。

此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

3. 在 ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。

4.在 ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序尚未关闭,

则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

 

定义和使用其他种类的 RejectedExecutionHandler类也是可能的,但这样做需要非常小心,

尤其是当策略仅用于特定容量或排队策略时。

这些只是jdk中的介绍,要想更好的使用线程池,就需要对这些参数有所了解。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics