创建一个SingleBlockingQueue同步器

我试图创建一个SingleBlockingQueue<T>同步器,允许一个线程offer()一个元素给它,另一个线程将take()它。 在SingleBlockingQueue<T>中一次只保留一个T元素,如果前一个元素正在等待take线程take() ,则推送线程在offer()上被阻塞。 推送线程将继续推送项目,直到它调用setComplete() ,并且当isComplete()为false时,take线程将继续调用take() 。 如果正在等待某个元素,则线程将被阻塞。

这是我迄今为止的同步器。

 import java.util.concurrent.atomic.AtomicBoolean; public final class SingleBlockingQueue<T> { private volatile T value; private final AtomicBoolean isComplete = new AtomicBoolean(false); private final AtomicBoolean isPresent = new AtomicBoolean(false); public void offer(T value) throws InterruptedException { while (isPresent.get()) { this.wait(); } this.value = value; synchronized(this) { this.notifyAll(); } } public boolean isComplete() { return !isPresent.get() && isComplete.get(); } public void setComplete() { isComplete.set(true); } public T take() throws InterruptedException { while (!isPresent.get()) { this.wait(); } T returnValue = value; isPresent.set(false); synchronized(this) { this.notifyAll(); } return returnValue; } } 

这里是Kotlin的一个使用示例

  val queue = SingleBlockingQueue<Int>() thread { for (i in 1..1000) { queue.offer(i) } queue.setComplete() } thread { while (!queue.isComplete) { println(queue.take()) } } Thread.sleep(100000) 

但是,我正在收到一个错误,在这一点上我有点头痛。 由于RxJava,我很久没有做同步器了。 我究竟做错了什么?

 Exception in thread "Thread-1" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29) at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33) at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8) at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18) 

正如其他人指出的那样,您可以使用SynchronousQueue的现有实现。

如果你想实现你自己,你是非常接近,你只需要确保wait()的调用是在synchronized块内。

不幸的是,我相信原始代码中的isComplete() / setComplete()机制会受到竞争条件的影响,因为可能会在isComplete()返回false之前,甚至在读取线程正在执行时调用setComplete() take() 。 这可能会挂起阅读线程。

  public final class SingleBlockingQueue<T> { private final Object lock = new Object(); private T value; private boolean present = false; public void offer(T value) throws InterruptedException { synchronized (lock) { while (present) lock.wait(); this.value = value; present = true; lock.notifyAll(); } } public T take() throws InterruptedException { synchronized (lock) { while (!present) lock.wait(); T returnValue = value; value = null; // Should release reference present = false; lock.notifyAll(); return returnValue; } } } 

为了比较,基于SemaphoreCondition对象实现这种队列可能更自然。 这是一个实现使用一对信号来表示空/满的条件。

  public final class SingleBlockingQueue<T> { private volatile T value; private final Semaphore full = new Semaphore(0); private final Semaphore empty = new Semaphore(1); public void offer(T value) throws InterruptedException { empty.acquire(); this.value = value; full.release(); } public T take() throws InterruptedException { full.acquire(); T returnValue = value; value = null; // Should release reference empty.release(); return returnValue; } } 

你不需要自己实现,你可以使用SynchronousQueue

参考文献:

SynchronousQueue javadoc

http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html

SynchronousQueue类实现BlockingQueue接口。 阅读BlockingQueue文本以获取有关该接口的更多信息。

SynchronousQueue是一个内部只能包含一个元素的队列。 一个将元素插入队列的线程被阻塞,直到另一个线程从队列中取得该元素为止。 同样,如果一个线程试图获取一个元素并且当前没有元素存在,那么该线程将被阻塞,直到一个线程插入一个元素到队列中。

只需要注意,由于RxJava-JDBC框架中的next()调用的时机, ResultSet向前跳转出现了一些问题。 我用这个实现去修改以前给出的答案。

 public final class SingleBlockingQueue<T> { private volatile T value; private final Semaphore nextGate = new Semaphore(0); private final Semaphore waitGate = new Semaphore(0); private volatile boolean hasValue = true; private volatile boolean isFirst = true; public void offer(T value) throws InterruptedException { if (isFirst) { nextGate.acquire(); isFirst = false; } this.value = value; waitGate.release(); nextGate.acquire(); } public T take() throws InterruptedException { T returnValue = value; value = null; // Should release reference return returnValue; } public boolean next() throws InterruptedException { nextGate.release(); waitGate.acquire(); return hasValue; } public void setDone() { hasValue = false; waitGate.release(); } } 

这就是我使用它的原因:在Kotlin中把一个RxJava Observable<T>变成一个Sequence<T>

 import com.github.davidmoten.rx.jdbc.QuerySelect import rx.Observable import rx.Scheduler import rx.lang.kotlin.subscribeWith import java.io.Closeable class ObservableIterator<T>( observable: Observable<T> ) : Iterator<T>, Closeable { private val queue = SingleBlockingQueue<T>() private val subscription = observable .subscribeWith { onNext { queue.offer(it) } onCompleted { queue.setDone() } onError { queue.setDone() } } override fun hasNext(): Boolean { return queue.next() } override fun next(): T { return queue.take() } override fun close() { subscription.unsubscribe() queue.setDone() } } fun <T> Observable<T>.asSequence() = ObservableIterator(this).asSequence() fun QuerySelect.Builder.asSequence(scheduler: Scheduler) = get { it } .subscribeOn(scheduler) .asSequence()