计算机系统应用教程网站

网站首页 > 技术文章 正文

阻塞列队详解!让你轻松理解阻塞列队

btikc 2024-10-25 10:51:41 技术文章 11 ℃ 0 评论

很多线程问题可以使用一个或多个队列以优雅而安全的方式来解决。生产者线程向队列插入元素,消费者线程则获取元素。使用队列,可以安全地从一个线程向另一个线程传递数据。例如,考虑银行转账程序,转账线程可以将转账指令对象插入一个队列,而不是直接访问银行对象。另一个线程从队列中取出指令并完成转账。只有这个线程可以访问银行对象的内部。因此不需要同步。(当然,线程安全的队列类的实现者必须考虑锁和条件,但那是他们的问题,而不是你要考虑的问题。)

当试图向队却添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻寒队列(blocking queue)将导致线程阻塞。在协调多个线程的工作时,阻塞队列是一个有用的工具。工作线程可以周期性地将中间结果存储在阻塞队列中。其他工作线程移除中间结果,并进一步修改。队列会自动地平衡负载。如果第一组线程运行得比第二组慢,第二组在等待结果时会阻塞。如果第一组线程运行得更快,队列会填满,直到第二组赶上来。表12-1给出了阻塞队列的方法。

阻塞队列方法分为以下3类,它们的区别在于当队列满或空时它们完成的动作。如果使用队列作为线程管理工具,要用到put和take方法。试图向满队列添加元素或者想从空队列得到队头元素时,add、remove 和element 操作会抛出异常。当然,在一个多线程程序中,队列可能会在任何时候变空或变满,因此,你可能更想使用offer、poll和peek方法。如果不能完成任务,这些方法只是返回一个错误提示而不会抛出异常

注释:poll和 peek方法返回null来指示失败。因此,向这些队列中插入null值是非

法的。

还有带有超时时间的offer方法和poll方法。例如,下面的调用:

boolean success = g.offer(x, 100, TimeUnit.MILLISECONDS) ;

尝试在100毫秒时间内在队尾插入一个元素。如果成功返回true;否则,如果超时,则返回false。类似地,下面的调用:

Object head = q.poll(100, TimeUnit.MILLISECONDS) ;

尝试在100毫秒时间内移除队头元素;如果成功返回队头元素,否则,如果超时,则返回null。

如果队列满,则 put方法阻塞;如果队列空,则 take方法阻塞。它们与不带超时参数的offer和poll方法等效。

java.util.concurrent包提供了阻塞队列的几个变体。默认情况下,LinkedBlockingQueue 的容量没有上界,但是,也可以选择指定一个最大容量。LinkedBlockingDeque是一个双端队列。ArrayBLockinaQueue在构造时需要指定容量,另外可以有一个可选的参数来指定是否需要公平性。若指定了公平性,那么等待了最长时间的线程会优先得到处理。与以往一样,公平性会降低性能,应当在确实非常需要时才使用公平性参数。

PriorityBlockingQueue是一个优先队列,而不是先进先出队列。元素按照它们的优先级顺序移除。这个队列没有容量上限,但是,如果队列是空的,获取元素的操作会阻塞。

DelayQueue包含实现了Delayed接口的对象

interface Delayed extends Comparable<Delayed>

{

long getDelay(TimeUnit unit);

}

getDelay方法返回对象的剩余延迟。负值表示延迟已经结束。元素只有在延迟结束的情况下才能从DelayQueue移除。还需要实现compareTo方法。DelayQueue使用这个方法对元素排序。Java 7增加了一个 TransferQueue 接口,允许生产者线程等待,直到消费者准备就绪可以接收元素。如果生产者调用

q.transfer(item);

这个调用会阻塞,直到另一个线程将元素删除。LinkedTransferQueue类实现了这个接口。

生产者线程枚举所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有上限的话,很快就会包含文件系统中的所有文件。

我们同时启动了大量搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印包含指定关键字的所有行,然后取出下一个文件。我们使用了一个小技巧,从而在没有更多工作时终止这个应用。为了发出完成信号,枚举线程会在队列中放置一个虚拟对象(这就像在行李传送带上放一个标着“last bag”的虚拟行李箱)。当搜索线程取到这个虚拟对象时,将其放回并终止。

注意,这里不需要显式的线程同步。在这个应用中,我们使用了队列数据结构作为一种同步机制。

java.util.concurrent.ArrayBlockingQueue<E> 5

ArrayBlockingQueue(int capacity)

? ArrayBlockingQueue(int capacity, boolean fair)

用指定的容量和公平性设置构造一个阻塞队列。队列实现为一个循环数组。r java.util.concurrent.LinkedBlockingQueue<E> 5

java.util.concurrent.LinkedBlockingDeque<E> 6

LinkedBlockingQueue()

LinkedBlockingDeque()

构造一个无上限的阻塞队列或双向队列,实现为一个链表。

LinkedBlockingQueue(int capacity)

LinkedBlockingDeque(int capacity)

根据指定容量构建一个有上限的阻塞队列或双向队列,实现为一个链表。

Am java.util.concurrent.DelayQueue<E extends Delayed> 5

DelayQueue()

构造一个包含Delayed元素的无上限阻塞队列。只有那些延迟结束的元素可以从队列中移除。

Ajava.util,concurrent.Delayed 5

long getDelay(TimeUnit unit)

得到该对象的延迟,用给定的时间单位度量。

Ajava.util.concurrent.PriorityBlockingQueue<E> 5

PriorityBlockingQueue()

? PriorityBlockingQueue(int initialCapacity)

? PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

构造一个无上限阻塞优先队列,实现为一个堆。优先队列的默认初始容量为11。如果

没有指定比较器,则元素必须实现Comparable接口。

Ajava.util.concurrent.BlockingQueue<E> 5

void put(E element)

添加元素,在必要时阻塞。

E take()

移除并返回队头元素,必要时阻塞。

boolean offer(E element, long time, TimeUnit unit)

添加给定的元素,如果成功返回true,必要时阻塞,直至元素已经添加或者时间已到。

E poll(long time, TimeUnit unit)

移除并返回队头元素,必要时阻塞,直至元素可用或时间已到。失败时返回null。

Ajava.util.concurrent.BlockingDeque<E> 6

? void putFirst(E element)

? void putLast(E element)

添加元素,必要时阻塞。

E takeFirst()

E takeLast()

移除并返回队头或队尾元素,必要时阻塞。

? boolean offerFirst(E element, long time, TimeUnit unit)

? boolean offerLast(E element, long time, TimeUnit unit)

添加给定的元素,成功时返回true,必要时阻塞,直至元素已经添加或时间已到。? E pollFirst(long time, TimeUnit unit)

?E pollLast(long time, TimeUnit unit)

移除并返回队头或队尾元素,必要时阻塞,直至元素可用或时间已到。失败时返回null。

APIjava.util.concurrent.TransferQueue<E> 7

void transfer(E element)

? boolean tryTransfer(E element, long time, TimeUnit unit)

传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一

个线程将元素删除。第二个方法会在调用成功时返回true。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表