老余博客上线了!!!

线程池的使用

热点新闻 老余 10℃ 0评论

线程池的使用 1、线程池的使用场景 等待返回任务的结果的多步骤的处理场景, 批量并发执行任务,总耗时是单个步骤耗时最长的那个,提供整体的执行效率, 最终一致性,异步执行任务,无需等待,快速返回 2、线程池的关键参数说明 一般情况下我们是通过ThreadPoolEx...

线程池的使用

1、线程池的使用场景

  • 等待返回任务的结果的多步骤的处理场景, 批量并发执行任务,总耗时是单个步骤耗时最长的那个,提供整体的执行效率,

  • 最终一致性,异步执行任务,无需等待,快速返回

2、线程池的关键参数说明

一般情况下我们是通过ThreadPoolExecutor来构造我们的线程池对象的。

* 阿里巴巴的开发规范文档是禁止直接使用Executors静态工厂类来创建线程池的,原因是

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
(1) FixedThreadPool 和 SingleThreadPool :
允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
(2) CachedThreadPool 和 ScheduledThreadPool :
允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
       
  }

参数说明:

  • corePoolSize:核心线程数,线程池最低的线程数
  • maximumPoolSize:允许的最大的线程数
  • keepAliveTime:当前线程数超过corePoolSize的时候,空闲线程保留的时间
  • unit: keepAliveTime线程保留的时间的单位
  • workQueue: 任务缓冲区
  • threadFactory: 线程的构造工厂
  • handler: 线程池饱含时候的处理策略

3、线程池的分类

Java通过Executors提供四种线程池,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
3.1、newCachedThreadPool
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}

它是一个可以无限扩大的线程池;

  • 它比较适合处理执行时间比较小的任务;
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;

  • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;

  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3.2、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它是一种固定大小的线程池;corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
3.3、ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 定时任务的使用
3.4、SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它只会创建一条工作线程处理任务;
  • 采用的阻塞队列为LinkedBlockingQueue;
3.5、总结
线程池 特点 建议使用场景
newCachedThreadPool 1、线程数无上限
2、空闲线程存活60s
3、阻塞队列
1、任务执行时间短
2、任务要求响应时间短
newFixedThreadPool 1、线程数固定
2、无界队列
1、任务比较平缓
2、控制最大的线程数
newScheduledThreadPool 核心线程数量固定、非核心线程数量无限制(闲置时马上回收) 执行定时 / 周期性 任务
newSingleThreadExecutor 只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题) 不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等

4、使用线程池容易出现的问题

现象 原因
整个系统影响缓慢,大部分504 1、为设置最大的线程数,任务积压过多,线程数用尽
oom 1、队列无界或者size设置过大
使用线程池对效率并没有明显的提升 1、线程池的参数设置过小,线程数过小或者队列过小,或者是服务器的cpu核数太低

5、线程池的监控

5.1、为什么要对线程池进行监控

  • 线程池中线程数和队列的类型及长度对线程会造成很大的影响,而且会争夺系统稀有资源,线程数。设置不当,或是没有最大的利用系统资源,提高系统的整体运行效率,或是导致整个系统的故障。典型的场景是线程数被占满,其他的请求无响应。或是任务积压过多,直接oom
  • 方便的排查线程中的故障以及优化线程池的使用

5.2、监控的原理

  • 另起一个定时单线程数的线程池newSingleThreadScheduledExecutor

  • 调用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定时执行监控任务;

  • 定时任务内 通过ThreadPoolExecutor对象获取监控的对象信息,比如t线程池需要执行的任务数、线程池在运行过程中已完成的任务数、曾经创建过的最大线程数、线程池里的线程数量、线程池里活跃的线程数量、当前排队线程数

  • 根据预设的日志或报警策略,进行规则控制

5.3、实现的细节

定义线程池并启动监控

 /**
     * 定义线程池的队列的长度
     */
    private final Integer queueSize = 1000;

    /**
     * 定义一个定长的线程池
     */
    private ExecutorService executorService;

    @PostConstruct
    private void initExecutorService() {
        log.info(
                "executorService init with param: threadcount:{} ,queuesize:{}",
                systemConfig.getThreadCount(),
                systemConfig.getThreadQueueSize());
        executorService =
                new ThreadPoolExecutor(
                        systemConfig.getThreadCount(),
                        systemConfig.getThreadCount(),
                        0,
                        TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue(systemConfig.getThreadQueueSize()),
                        new BasicThreadFactory.Builder()
                                .namingPattern("async-sign-thread-%d")
                                .build(),
                        (r, executor) -> log.error("the async executor pool is full!!"));

        /** 启动线程池的监控 */
        ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring();
        threadPoolMonitoring.init();
    }

线程池的监控

/**
     * 功能说明:线程池监控
     *
     * @params
     * @return <br>
     *     修改历史<br>
     *     [2019年06月14日 10:20:10 10:20] 创建方法by fengqingyang
     */
    public class ThreadPoolMonitoring {
        /** 用于周期性监控线程池的运行状态 */
        private final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new BasicThreadFactory.Builder()
                                .namingPattern("async thread executor monitor")
                                .build());

        /**
         * 功能说明:自动运行监控
         *
         * @return <br>
         *     修改历史<br>
         *     [2019年06月14日 10:26:51 10:26] 创建方法by fengqingyang
         * @params
         */
        public void init() {
            scheduledExecutorService.scheduleAtFixedRate(
                    () -> {
                        try {
                            ThreadPoolExecutor threadPoolExecutor =
                                    (ThreadPoolExecutor) executorService;
                            /** 线程池需要执行的任务数 */
                            long taskCount = threadPoolExecutor.getTaskCount();
                            /** 线程池在运行过程中已完成的任务数 */
                            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                            /** 曾经创建过的最大线程数 */
                            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                            /** 线程池里的线程数量 */
                            long poolSize = threadPoolExecutor.getPoolSize();
                            /** 线程池里活跃的线程数量 */
                            long activeCount = threadPoolExecutor.getActiveCount();
                            /** 当前排队线程数 */
                            int queueSize = threadPoolExecutor.getQueue().size();
                            log.info(
                                    "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                    taskCount,
                                    completedTaskCount,
                                    largestPoolSize,
                                    poolSize,
                                    activeCount,
                                    queueSize);

                            /** 超过阀值的80%报警 */
                            if (activeCount >= systemConfig.getThreadCount() * 0.8) {
                                log.error(
                                        "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                        taskCount,
                                        completedTaskCount,
                                        largestPoolSize,
                                        poolSize,
                                        activeCount,
                                        queueSize);
                                ;
                            }
                        } catch (Exception ex) {
                            log.error("ThreadPoolMonitoring service error,{}", ex.getMessage());
                        }
                    },
                    0,
                    30,
                    TimeUnit.SECONDS);
        }
    }

6、需要注意的事项

  • 线程数要合理设置,一般建议值是核数的2倍。
  • 线程池队列的类型和长度要根据业特性合理设置
  • 不同的业务需要线程池隔离,避免相互影响
  • 未每个线程池增加特有的命名规范以及关键的日志,方便出问题排查和优化

7、后续

更多精彩,敬请关注,

转载请注明:老余博客 » 线程池的使用

读后有收获可以请作者喝咖啡:

喜欢 (0)or分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址