Loading...

定时任务类ScheduledThreadPoolExecutor和心跳机制

2019-10-02

作者:Hap Tool

ScheduledThreadPoolExecutor类并不是什么新出的功能了,在JDK1.5的时候就已经加入进来,开发他的正式并发大神Doug Lea。大概三四年前有整体学习过Java并发的源码。这次在给公司做分布式心跳检测系统的时候,花了点时间重新读了一遍。我很庆幸自己这么做了,虽然并不是完整的读完了,但是真的受益良多。

首先看下ScheduledThreadPoolExecutor的类图:


类图并不复杂,Executor接口定义了execute方法执行Runnable的任务。

ExecutorService在他的基础上扩展了任务停止(shutdown, shutdownNow),状态判断(isShutdown, isTerminated, awaitTermination),任务提交(submit)和批量任务提交(invokeAll, invokeAny)。

ScheduledExecutorService接口则定义了我们一般常用的方法任务延迟执行(schedule),定时任务(scheduleAtFixedRate, scheduleWithFixedDelay),我重点要说的也就是schedule和scheduleWithFixedDelay。至于scheduleAtFixedRate,和scheduleWithFixedDelay只是在任务重复执行的方式上些许不同,不细表述,可以很容易的查到。

先看个简单的schedule方法使用的例子:

package com.henry.concurrent;

import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledUnit implements Runnable {

    private ScheduledExecutorService scheduledExecutorService;

    private void showTime() {
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        System.out.println(Thread.currentThread().getName() + ": " + new Date());
        scheduledExecutorService.schedule(new ScheduledUnit(), 10, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": " + new Date());
    }

    public static void main(String[] args) {
        new ScheduledUnit().showTime();
    }

}

那么我们再看下运行结果:


线程池中的线程刚好延迟了10s。

好的,那么我们再看下scheduleWithFixedDelay方法,方法的使用也非常简单,只比schedule多了一个delay参数,表示上一次线程和本次线程运行的时间间隔。稍微修改下schedule代码如下:

scheduledExecutorService.scheduleWithFixedDelay(new ScheduledUnit(), 10, 2, TimeUnit.SECONDS);

还有记得将下面代码注释调用,否则线程池的单个线程结束后,主线程就结束了。

scheduledExecutorService.shutdown();

运行的结果如下:



结果也完全符合我们之前的设想,第一次线程池运行时间延迟了10s,后面的每一次都延迟2s。那么如果线程池运行的时间比较长呢,甚至超过了2s,那么会是什么样的情况。可以在线程池里面增加一个线程延迟代码。

    @Override
    public void run() {
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + ": " + new Date());
    }

我们再看下结果,在之前的结果之上,时间间隔都增加了5s,也就是说我们在scheduleWithFixedDelay方法中定义的2s的间隔是上个线程的结束时间和下个线程的开始时间。


那么我们分析一种情况,如果线程执行很长时间呢,是不是下一个线程始终无法启动。所以在目前开源的心跳代码中一般都不直接使用scheduleWithFixedDelay方法。之前有聊过Eureka的心跳机制,可以看看。

简单写个自己的心跳,代码如如下:

package com.henry.concurrent;

import java.util.Date;
import java.util.concurrent.*;

public class HeartBeatUnit implements Runnable {

    private ScheduledExecutorService scheduler;

    private ExecutorService executor;

    private Runnable task;

    public HeartBeatUnit() {
        scheduler = new ScheduledThreadPoolExecutor(1);
        executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        task = new HeartBeatThread();
    }

    @Override
    public void run() {
        Future future = null;
        try {
            future = executor.submit(task);
            future.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(future != null) {
                future.cancel(true);
            }
            scheduler.schedule(this, 10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) {
        new HeartBeatUnit().run();
    }
}

class HeartBeatThread implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(new Date() + " - " + Thread.currentThread().getName() + ": take a beat.......");
    }
}

在HeartBeatThread的run方法中,我加了一个5s的睡眠时间,但是future里面我配置的是2s即超时了。看下运行结果:


这里非常明显的超时异常,每个心跳的打印的时间之间的间隔也为12s,没有按照心跳线程里面设置的5s睡眠时间来超时。这样在心跳代码中,不会因为对方应用长时间的不响应,而需要己方不停的等待。