无情 @ Wed Aug 17 14:26:33 CST 2016 阅读(2279)
concurrent Executor



一:线程池的用途


平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题,线程池的作用就是将线程进行复用。


二:java并发包提供的线程池


1:线程池的类(Executors类)


类图

newFixedThreadPool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。

newSingleThreadExecutor 单一线程池,线程池中只有一个线程。执行完成一个再执行另外一个,其实有点把多线程变成了单线程

newCachedThreadPool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。它的线程回自动释放默认时间是60s

newScheduledThreadPool 计划任务调度的线程池,用于执行计划任务,比如每隔5分钟怎么样,

newWorkStealingPool   创建一个 work-stealing 线程池,使用目前机器上可用的处理器作为它的并行级别。  jdk1.8新增的


public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
}
 public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    

    

    

ThreadPoolExecutor 构造函数中参数的含义。

corePoolSize  线程池中核心线程数的数目

maximumPoolSize 线程池中最多能容纳多少个线程

keepAliveTime 当现在线程数目大于corePoolSize时,超过keepAliveTime时间后,多出corePoolSize的那些线程将被终结。

unit keepAliveTime的单位

workQueue 当任务数量很大,线程池中线程无法满足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。


FixedThreadPool,它的线程的核心数目和最大容纳数目都是一样的,以至于在工作期间,并不会创建和销毁线程。当任务数量很大,线程池中的线程无法满足时,任务将被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。这就意味着,任务不断地添加,会使内存消耗越来越大。


CachedThreadPool,它的核心线程数量是0,最大容纳数目是Integer.MAX_VALUE,它的阻塞队列是SynchronousQueue,这是一个特别的队列,它的大小是0。由于核心线程数量是0,所以必然要将任务添加到SynchronousQueue中,这个队列只有一个线程在从中添加数据,同时另一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为什么CachedThreadPool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。


相关演示代码

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorsDome {
    public static void main(String[] args) {
        //ExecutorService executorService = Executors.newCachedThreadPool();//默认是60s 的线程时间
       ExecutorService executorService = Executors.newFixedThreadPool(10);//固定数量的线程池,线程池中的线程数量是固定的,不会改变。
       // ExecutorService executorService = Executors.newSingleThreadExecutor();//线程池中只有一个线程,执行完成一个在执行另外一个,其实有点把多线程变成了单线程
        //ExecutorService executorService =Executors.newScheduledThreadPool(6);
      // ExecutorService executorService=Executors.newWorkStealingPool();
        executorService.execute(new ThreadPools(1));
        executorService.execute(new ThreadPools(2));
        executorService.execute(new ThreadPools(3));
        executorService.execute(new ThreadPools(4));
        executorService.execute(new ThreadPools(5));
        executorService.submit(new ThreadPools(6));
    }
}
class ThreadPools implements Runnable {
    private int i=0;
    
    public ThreadPools(int i){
        this.i=i;
    }
    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            
            e.printStackTrace();
        }
        System.out.println("ThreadPools:"+i);
    }
}

==============================

定时的demo

public class ScheduledExecutorServiceDemo {
    
    public static void main(String[] args) throws Exception, ExecutionException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);  
        ScheduledFuture scheduledFuture = executorService.schedule(new Callable() {  
            public String call() throws Exception {  
                return "call";  
            }  
        }, 10, TimeUnit.SECONDS);  
        System.out.println(scheduledFuture.get());  
        executorService.shutdown(); 
    }
}


结果是10s后返回"call"的结果



2:拒绝(饱和)策略


ThreadPoolExecutor的构造函数里面有一个参数是RejectedExecutionHandler,它是一个接口实现类有:AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,这这个分别代表了4种策略



AbortPolicy:如果不能接受任务了,则抛出异常。默认拒绝策略是它


CallerRunsPolicy:如果不能接受任务了,则让调用的线程去完成。


DiscardOldestPolicy:如果不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。


DiscardPolicy:如果不能接受任务了,则丢弃任务。



当然我们也可以自己实现RejectedExecutionHandler接口来自己定义拒绝策略。



3:ForkJoin


fork/join(jdk1.7新增)框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

他的思想可以用一张图来了解


演示代码

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
//1*1+2*2+3*3....100*100
public class ForkJoinPoolDemo {
    public static void main(String[] args) {
        MyTask mt = new MyTask(1);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future result = forkJoinPool.submit(mt);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        forkJoinPool.shutdown();
        
        A.a();
    }
}
//ForkJoin算法
class MyTask extends RecursiveTask {
    int i;
    public MyTask(int i){
        this.i = i;
    }
    @Override
    protected Integer compute() {
        if (i >= 100) {
            return i * i;
        }
        MyTask newTask2 = new MyTask(i + 1);
        newTask2.fork();
        return i * i + newTask2.join();
    }
}
//普通算法
class A { 
    public static void a() {
        int j=0;
        for (int i = 1; i <= 100; i++) {  
            j=j+(i*i);
            
        }
        System.out.println(j);
    }
}


以上两种算法的结果是一样的都是 338350。更深入的了解请网上超资料。