目 录CONTENT

文章目录

DelayQueue - 延时阻塞队列

FatFish1
2024-10-24 / 0 评论 / 0 点赞 / 65 阅读 / 0 字 / 正在检测是否收录...

概述

常用于定时任务,如:定时关机。常用方法

  • int compareTo(Delayed o):比较大小,自动升序。比较方法建议和getDelay方法配合完成。如果任务是需要按时完成的计划任务,必须配合getDelay方法完成。

  • long getDelay(TimeUnit unit):获取计划时长的方法,根据参数TimeUnit来决定,如何返回结果值。

delayQueueworkQueue

延迟队列DelayQueue是一个无界阻塞队列,它的队列元素只能在该元素的延迟已经结束或者说过期才能被出队,DelayQueue就是基于PriorityQueue实现的,DelayQueue队列实际上就是将队列元素保存到内部的一个PriorityQueue实例中的(所以也不支持插入null值)。

DelayedWorkQueue也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用

delayQueue实现延迟任务

实现DelayQueue入参Delayed接口的getDelay方法和compareTo方法,构建延迟任务。delayQueue基于PriorityBlockingQueue实现。

案例

@Slf4j
public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();
        new Thread(() -> {
            while (true) {
                try {
                    SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
                    log.info("获取到延迟任务:{}", sanYouTask.getTaskContent());
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延迟任务");
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记5s", 5L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记3s", 3L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记8s", 8L));
    }
}

@Getter
public class SanYouTask implements Delayed {
    private final String taskContent;
    private final Long triggerTime;
    public SanYouTask(String taskContent, Long delayTime) {
        this.taskContent = taskContent;
        this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
    }
}

queue.offer方法提交任务时会根据compareTo的实现对任务进行排序,最先需要被执行的任务放到队列头;queue.take方法获取任务的时候会拿队列头部的元素,通过getDelay返回值判断任务是否需要立刻执行,如果需要就返回任务,不需要则等待延迟时间到了返回任务。

源码分析

offer - 入队

q.offer(e);
if (q.peek() == e) {
    leader = null;
    available.signal();
}

上锁后执行内部PriorityQueue的offer方法,构造小顶堆。

入队后激活出队的condition

take/poll - 阻塞出队非阻塞出队

for (;;) {
    E first = q.peek();
    if (first == null)
        available.await();
    else {
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0L)
            return q.poll();

出队走一个无限自旋,如果没有first元素,证明已经空了,这里阻塞,如果成功拿到了first元素,这里要判断延迟,如果延迟到了,才能出队,脱离自旋

first = null; // don't retain ref while waiting
if (leader != null)
    available.await();
else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
        available.awaitNanos(delay);
    } finally {
        if (leader == thisThread)
            leader = null;
    }

如果没到延迟时间,那就说明第一个元素还没到点,这里把引用解锁掉,是为了防止其他线程先拿到,结果本线程又拿一次出问题

然后判断阻塞Thread是否存在,如果不存在,设置为当前线程,然后进行条件等待,同时释放锁,直到到时间,停止阻塞,重新获取锁

这时判断阻塞线程如果是自己,清空,然后重新尝试获取头节点

应用

TimerScheduledThreadPoolExecutor

Timer和ScheduledThreadPoolExecutor是延迟队列的应用,例如:

@Slf4j
public class ScheduledThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
        log.info("提交延迟任务");
        executor.schedule(() -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);
    }
}

ScheduledThreadPoolExecutor构造的时候会传入一个DelayedWorkQueue阻塞队列,所以线程池内部的阻塞队列是DelayedWorkQueue

Timer类似封装了DelayQueue,但是是单线程,且没有对运行时异常进行处理,更推荐用scheduledThreadPoolExecutor,具体分析见ScheduledThreadPoolExecutor部分。

0

评论区