最近看了线程池的部分源码,所以进行一个简单总结。首先线程池有四种,固定大小的、缓存的、单线程的和有调度的这四种。其中前三种都是返回一个ThreadPoolExecutor对象,这个对象与ExecutorService和Executor的关系如图所示。对于这个类的构造函数,参数含义如下:
int corePoolSize, //核心线程数int maximumPoolSize, //最大线程数long keepAliveTime, //存活时间TimeUnit unit, //时间的单位BlockingQueueworkQueue, //阻塞队列ThreadFactory threadFactory, //制造线程池线程的工厂RejectedExecutionHandler handler //拒绝策略处理者
还有几个重要变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //在运行时动态变化,表示当前线程池的情况(二进制下前3位是线程池状态,后29位是线程池中的线程数)private static final int COUNT_BITS = Integer.SIZE - 3; //表示后29位是线程数private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池大小,2^29 - 1private static final int RUNNING = -1 << COUNT_BITS; //运行private static final int SHUTDOWN = 0 << COUNT_BITS; //调用shutdown()方法进入这个状态,之后进入TIDYING,最后到TERMINATED
private static final int STOP = 1 << COUNT_BITS; //调用shutdownNow()方法进入这个状态,之后进入TIDYING,最后到TERMINATED
private static final int TIDYING = 2 << COUNT_BITS; //在线程池关闭(调用shutdown()或shutdownNow()) 时到这个状态,之后到TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS; //线程池已关闭
00011111111111111111111111111111 536870911 CAPACITY (1 << COUNT_BITS) - 1
11100000000000000000000000000000 -536870912 RUNNING -1 << COUNT_BITS00000000000000000000000000000000 0 SHUTDOWN 0 << COUNT_BITS00100000000000000000000000000000 536870912 STOP 1 << COUNT_BITS01000000000000000000000000000000 1073741824 TIDYING 2 << COUNT_BITS01100000000000000000000000000000 1610612736 TERMINATED 3 << COUNT_BITS之后是execute()方法,对于这个方法,按照以下步骤
首先根据workerCountOf(c)获取线程池中线程数。
1.如果线程数小于核心线程数,则创建并启动一个新线程并把当前任务作为初始任务来执行。2.如果线程数大于等于核心线程数并且阻塞队列未满,则加入阻塞队列。3.如果线程数大于等于核心线程数并且阻塞队列已满并且线程数小于最大线程数,则创建并启动一个新线程并把当前任务作为初始任务来执行。4.如果线程数大于等于最大线程数,执行拒绝策略。如果使用shutdown()或shutdownNow()进入SHUTDOWN/STOP/TIDYING/TERMINATED这四个状态之一,再次执行execute()时直接执行拒绝策略。
对于addWorker()方法,根据第二个参数决定在比较当前线程数是否超过限制时是跟核心线程数还是最大线程数比较,如果第二个参数为true,和核心线程数比较,为false和最大线程数比较。第一个参数是要执行的任务,作为新创建线程的第一个任务。
使用Worker类对线程池线程进行了封装,运行时会从阻塞队列中取任务执行,当取不到任务时停止运行。
测试程序如下:
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2)); for (int i = 0; i < 10; i++) { final int index = i; threadPool.execute(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName() + " start!" + " index=" + index); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end!" + " index=" + index); } }); } }}
结果: