BoundedPriorityBlockingQueue – 线程安全,阻塞和有界?

Java中有一个优先级队列,就像LinkedBlockingQueue一样吗?

PriorityBlockingQueue没有被阻塞,因为它是无界的。

您可以尝试谷歌Guava的 MinMaxPriorityQueue ,并将最大尺寸设置为下一个:

 Queue users = Queues.synchronizedQueue( MinMaxPriorityQueue.orderedBy(userComparator) .maximumSize(1000) .create() ); 

注意:因为MinMaxPriorityQueue不是线程安全的,所以你需要使用装饰器Queues.synchronizedQueue(Queue)来允许线程安全。

因为你需要一个BlockingQueue你将不得不自己实现这个装饰器,这是不难实现的。

这是它应该是这样的:

 public class SynchronizedBlockingQueue implements BlockingQueue { private final BlockingQueue queue; public SynchronizedBlockingQueue(BlockingQueue queue) { this.queue = queue; } @Override public synchronized boolean add(final Object o) { return this.queue.add(o); } @Override public synchronized boolean offer(final Object o) { return this.offer(o); } ... } 

那么创建BlockingQueue的代码将是:

 BlockingQueue users = new SynchronizedBlockingQueue( MinMaxPriorityQueue.orderedBy(userComparator) .maximumSize(1000) .create() ); 

如果你不需要一个完整的BlockingQueue接口实现,那么你可以使用Semaphore和类似的东西(在Kotlin中):

 interface BlockingBag { @Throws(InterruptedException::class) fun put(element: E) @Throws(InterruptedException::class) fun take(): E } class BlockingPriorityBag(val capacity: Int) : BlockingBag { init { require(capacity >= 1) { "$capacity must be 1 or greater" } } private val queue = PriorityBlockingQueue() private val semaphore = Semaphore(capacity) override fun take(): E { val item = queue.take() semaphore.release() return item } override fun put(element: E) { semaphore.acquire() queue.put(element) } } 
Interesting Posts