Java自带消息队列Queue的使用教程详细讲解

2023-05-20 队列 自带 讲解

阻塞队列和非阻塞队列

非阻塞队列

  • ConcurrentLinkedQueue

单向链表结构的无界并发队列, 非阻塞队列,由CAS实现线程安全,内部基于节点实现

  • ConcurrentLinkedDeque

双向链表结构的无界并发队列, 非阻塞队列,由CAS实现线程安全

  • PriorityQueue

内部基于数组实现,线程不安全的队列

阻塞队列

  • DelayQueue

一个支持延时获取元素的无界阻塞队列

  • LinkedTransferQueue

一个由链表结构组成的无界阻塞队列。

  • ArrayBlockingQueue

有界队列,阻塞式,初始化时必须指定队列大小,且不可改变;,底层由数组实现;

  • SynchronousQueue

最多只能存储一个元素,每一个put操作必须等待一个take操作,否则不能继续添加元素

  • PriorityBlockingQueue

一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,而且它也是无界的,也就是没有容量上限,虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError 错误;

以ArrayBlockingQueue为例实现阻塞队列,非阻塞队列这里就不说了,方法都是差不多的。

抛出异常

适用场景极少。因为程序就是要稳定的运行,尽量不要抛出异常。

private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
	//抛出异常
    @Test
    public void test(){
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove()); //移除队列当中的元素,如果有则返回移除的元素,没有则抛出异常:java.util.NoSuchElementException。
//        System.out.println(blockingQueue.element()); //获取队列当中的元素,如果有则返回,没有则抛出异常:java.util.NoSuchElementException。
//        System.out.println(blockingQueue.add("d")); //当队列已满时,往队列里添加元素,则会抛出异常:Queue full。
    }

特殊值

新消息添加进队列时,如果当前队列已满,则会被直接抛弃,导致消息丢失。

private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
	//特殊值
    @Test
    public void test2(){
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d")); //往队列里面添加元素,超出队列长度则返回false。
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll()); //移除队列里面的元素,如果有元素,则返回元素,没有则返回null。
    }

阻塞

消息处理过快会一直阻塞,这个没什么问题。但是如果消息处理过慢,则会消息大量积压,导致cpu占用过高程序崩溃。

private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
	//阻塞
    @Test
    public void test3() throws InterruptedException {
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
//        blockingQueue.put("d"); //如果队列已经满了,则一直阻塞,直到队列有位置。
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take()); //如果队列没有元素,则一直阻塞,直到队列有元素。
    }

超时

如果队列已满,则在指定时间之后再次尝试往队列里面设置元素,设置成功返回true,失败返回false。这相比较上面的来说会好很多,相当于有2次机会往队列里面设置元素。移除元素也是同理。

private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
	//超时
    @Test
    public void test4() throws InterruptedException {
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d",3, TimeUnit.SECONDS)); //往队列里面添加元素,如果队列已满,则等待指定时间再次往队列里面设置元素,设置成功返回true,失败返回false。
        System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS)); //移除队列里面的元素,如果队列里面有元素,则返回元素,没有则等待指定时间再次移除队列里面的元素,如果有元素则返回元素,没有则返回null
    }

总结

使用队列时应该考虑队列的作用是一边积压消息,一边处理消息,而且要保证队列不会造成消息大量积压。所以我们可以让队列不停的进行设置元素,然后写个定时任务,按照指定的时间,不停的消费队列里面的全部元素。这个指定时间要根据业务量去设置,如果业务量太少,时间间隔可以设置的长一点。业务量大设置的短一点,如:1s执行一次,一次全部取完队列里的元素。这种情况就不建议使用固定长度的消息队列,而应该使用自动扩容的消息队列,因为你无法直到消息队列的具体长度是多少。

到此这篇关于Java自带消息队列Queue的使用教程详细讲解的文章就介绍到这了,更多相关Java自带消息队列Queue内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关文章