协程的行为是否正确?

我开始kotlin,不知道我已经正确实施了kotlin协程的生产者和消费者模式

@RestrictsSuspension interface Producer<in T> { suspend fun yield(value: T); } fun <T> produce(context: CoroutineContext = EmptyCoroutineContext, building: suspend Producer<T>.() -> Unit): Supplier<T> { val (NOT_READY, READY, DONE) = arrayOf(-1, 2, 3); val producer = object : Producer<T>, Continuation<Unit>, Supplier<T> { var value: T? = null; var step: Continuation<Unit>? = null; var state: Int = NOT_READY; override fun get(): T { when (state) { READY -> return pop(); DONE -> throw NoSuchElementException(); } val step = step!!; this.step = null; step.resume(Unit); return get(); } private fun pop(): T { state = NOT_READY; val it = value as T; value = null; return it; } override fun resume(value: Unit) { state = DONE; } override suspend fun yield(value: T) { this.state = READY; this.value = value; return suspendCoroutine<Unit> { step = it; }; } override fun resumeWithException(exception: Throwable) { throw exception; } override val context = EmptyCoroutineContext; }; producer.step = building.createCoroutine(producer, producer).run { return@run context[Async]?.start(this) ?: this; }; return producer; } 

那么我可以使用生产者和消费者模式如下:

 var steps = ArrayBlockingQueue<Int>(1); fun <T> ArrayBlockingQueue<T>.await() = this.poll(100, MILLISECONDS); val it = produce { steps.add(1); yield("foo"); steps.add(2); }; steps.await();//return null; it.get();//return "foo" steps.await();//return 1; it.get();//throws NoSuchElementException steps.await();//return 2; 

那么我只写一个Async上下文来resume Continuation 。 我把其余的resume发给kotlin系统。

  interface Async : CoroutineContext.Element { companion object Key : CoroutineContext.Key<Async>; fun <T> initialize(completion: Continuation<T>): Continuation<T>; fun start(completion: Continuation<Unit>): Continuation<Unit> { return initialize(completion).apply { resume(Unit); }; } } open class ThreadPoolCoroutineContext : AbstractCoroutineContextElement(Async), Async { override fun <T> initialize(completion: Continuation<T>): Continuation<T> { val pool by lazy(ForkJoinPool::commonPool); return object : Continuation<T> { override val context: CoroutineContext = EmptyCoroutineContext; var task: Future<*>? = null; override fun resume(value: T) { dispatcher(completion::resume).invoke(value); } override fun resumeWithException(exception: Throwable) { dispatcher(completion::resumeWithException).invoke(exception); } private fun <T> dispatcher(resume: (T) -> Unit): (T) -> Unit { when (task) { null -> return { value -> task = pool.submit { resume(value) }; } else -> return { task!!.get(); } } } }; } } object CommonPool : ThreadPoolCoroutineContext(); 

那么我可以在第一次使用CommonPool启动一个协程:

 val it = produce(CommonPool) { steps.add(1); yield("foo"); steps.add(2); }; steps.await();//return 1; it.get();//return "foo" steps.await();//return null; it.get();//throws NoSuchElementException steps.await();//return 2; 

如果我的假设是错误的,谁能告诉我一些建议吗? 这是一个实现自定义协程的好方法吗? 我知道buildSequence生产者和消费者模式的另一种用法,但是我有一点点不同。

Interesting Posts