BoundedPriorityBlockingQueue – 线程安全,阻塞和有界?
Java中有一个优先级队列,就像LinkedBlockingQueue一样吗?
PriorityBlockingQueue没有被阻塞,因为它是无界的。
您可以尝试谷歌Guava的 MinMaxPriorityQueue ,并将最大尺寸设置为下一个:
Queue users = Queues.synchronizedQueue( MinMaxPriorityQueue.orderedBy(userComparator) .maximumSize(1000) .create() );
注意:因为MinMaxPriorityQueue不是线程安全的,所以你需要使用装饰器Queues.synchronizedQueue(Queue)来允许线程安全。
因为你需要一个BlockingQueue
你将不得不自己实现这个装饰器,这是不难实现的。
这是它应该是这样的:
public class SynchronizedBlockingQueue implements BlockingQueue { private final BlockingQueue queue; public SynchronizedBlockingQueue(BlockingQueue queue) { this.queue = queue; } @Override public synchronized boolean add(final Object o) { return this.queue.add(o); } @Override public synchronized boolean offer(final Object o) { return this.offer(o); } ... }
那么创建BlockingQueue
的代码将是:
BlockingQueue users = new SynchronizedBlockingQueue( MinMaxPriorityQueue.orderedBy(userComparator) .maximumSize(1000) .create() );
如果你不需要一个完整的BlockingQueue
接口实现,那么你可以使用Semaphore
和类似的东西(在Kotlin中):
interface BlockingBag { @Throws(InterruptedException::class) fun put(element: E) @Throws(InterruptedException::class) fun take(): E } class BlockingPriorityBag (val capacity: Int) : BlockingBag { init { require(capacity >= 1) { "$capacity must be 1 or greater" } } private val queue = PriorityBlockingQueue () private val semaphore = Semaphore(capacity) override fun take(): E { val item = queue.take() semaphore.release() return item } override fun put(element: E) { semaphore.acquire() queue.put(element) } }