定时任务类ScheduledThreadPoolExecutor和心跳机制 2019-10-02 作者:Hap Tool ScheduledThreadPoolExecutor类并不是什么新出的功能了,在JDK1.5的时候就已经加入进来,开发他的正式并发大神Doug Lea。大概三四年前有整体学习过Java并发的源码。这次在给公司做分布式心跳检测系统的时候,花了点时间重新读了一遍。我很庆幸自己这么做了,虽然并不是完整的读完了,但是真的受益良多。首先看下ScheduledThreadPoolExecutor的类图: 类图并不复杂,Executor接口定义了execute方法执行Runnable的任务。 ExecutorService在他的基础上扩展了任务停止(shutdown, shutdownNow),状态判断(isShutdown, isTerminated, awaitTermination),任务提交(submit)和批量任务提交(invokeAll, invokeAny)。 ScheduledExecutorService接口则定义了我们一般常用的方法任务延迟执行(schedule),定时任务(scheduleAtFixedRate, scheduleWithFixedDelay),我重点要说的也就是schedule和scheduleWithFixedDelay。至于scheduleAtFixedRate,和scheduleWithFixedDelay只是在任务重复执行的方式上些许不同,不细表述,可以很容易的查到。 先看个简单的schedule方法使用的例子: package com.henry.concurrent; import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ScheduledUnit implements Runnable { private ScheduledExecutorService scheduledExecutorService; private void showTime() { scheduledExecutorService = new ScheduledThreadPoolExecutor(1); System.out.println(Thread.currentThread().getName() + ": " + new Date()); scheduledExecutorService.schedule(new ScheduledUnit(), 10, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } @Override public void run() { System.out.println(Thread.currentThread().getName() + ": " + new Date()); } public static void main(String[] args) { new ScheduledUnit().showTime(); } } 那么我们再看下运行结果: 线程池中的线程刚好延迟了10s。 好的,那么我们再看下scheduleWithFixedDelay方法,方法的使用也非常简单,只比schedule多了一个delay参数,表示上一次线程和本次线程运行的时间间隔。稍微修改下schedule代码如下: scheduledExecutorService.scheduleWithFixedDelay(new ScheduledUnit(), 10, 2, TimeUnit.SECONDS); 还有记得将下面代码注释调用,否则线程池的单个线程结束后,主线程就结束了。 scheduledExecutorService.shutdown(); 运行的结果如下: 结果也完全符合我们之前的设想,第一次线程池运行时间延迟了10s,后面的每一次都延迟2s。那么如果线程池运行的时间比较长呢,甚至超过了2s,那么会是什么样的情况。可以在线程池里面增加一个线程延迟代码。 @Override public void run() { try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": " + new Date()); } 我们再看下结果,在之前的结果之上,时间间隔都增加了5s,也就是说我们在scheduleWithFixedDelay方法中定义的2s的间隔是上个线程的结束时间和下个线程的开始时间。 那么我们分析一种情况,如果线程执行很长时间呢,是不是下一个线程始终无法启动。所以在目前开源的心跳代码中一般都不直接使用scheduleWithFixedDelay方法。之前有聊过Eureka的心跳机制,可以看看。 简单写个自己的心跳,代码如如下: package com.henry.concurrent; import java.util.Date; import java.util.concurrent.*; public class HeartBeatUnit implements Runnable { private ScheduledExecutorService scheduler; private ExecutorService executor; private Runnable task; public HeartBeatUnit() { scheduler = new ScheduledThreadPoolExecutor(1); executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); task = new HeartBeatThread(); } @Override public void run() { Future future = null; try { future = executor.submit(task); future.get(2, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } finally { if(future != null) { future.cancel(true); } scheduler.schedule(this, 10, TimeUnit.SECONDS); } } public static void main(String[] args) { new HeartBeatUnit().run(); } } class HeartBeatThread implements Runnable { @Override public void run() { try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " - " + Thread.currentThread().getName() + ": take a beat......."); } } 在HeartBeatThread的run方法中,我加了一个5s的睡眠时间,但是future里面我配置的是2s即超时了。看下运行结果: 这里非常明显的超时异常,每个心跳的打印的时间之间的间隔也为12s,没有按照心跳线程里面设置的5s睡眠时间来超时。这样在心跳代码中,不会因为对方应用长时间的不响应,而需要己方不停的等待。