Kotlin使用RxJava2来观察观察者
Android Studio 3.0 Beta2
我创建了两个方法,一个创建观察者,另一个创建订阅者。
不过,我有一个问题试图让订阅者订阅观察。 在Java中这将工作,我正在努力让它在Kotlin工作。
在我的onCreate(..)方法,我试图设置这个。 这是正确的方法来做到这一点?
class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */ createStringObservable().subscribe(createStringSubscriber()) } fun createStringObservable(): Observable<String> { val myObservable: Observable<String> = Observable.create { subscriber -> subscriber.onNext("Hello, World!") subscriber.onComplete() } return myObservable } fun createStringSubscriber(): Subscriber<String> { val mySubscriber = object: Subscriber<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Subscription?) { println("onSubscribe") } } return mySubscriber } }
非常感谢您的任何建议,
密切关注类型。
Observable.subscribe()
有三个基本的变体:
- 一个不接受参数
- 几个接受
io.reactivex.functions.Consumer
- 一个接受一个
io.reactivex.Observer
你在你的例子中试图订阅的类型是org.reactivestreams.Subscriber
(定义为Reactive Streams Specification的一部分)。 你可以参考文档来得到这个类型的更全面的计算,但是可以说它与任何重载的Observable.subscribe()
方法都不兼容。
这里是你的createStringSubscriber()
方法的一个修改的例子,它将允许你的代码编译:
fun createStringSubscriber(): Observer<String> { val mySubscriber = object: Observer<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Disposable) { println("onSubscribe") } } return mySubscriber }
事情变了:
- 这将返回一个
Observer
类型(而不是Subscriber
) -
onSubscribe()
传递Disposable
(而不是Subscription
)
..正如“Vincent Mimoun-Prat”所提到的,lambda语法可以真正缩短你的代码。
override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // Here's an example using pure RxJava 2 (ie not using RxKotlin) Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribe( { s -> println(s) }, { e -> println(e) }, { println("onComplete") } ) // ...and here's an example using RxKotlin. The named arguments help // to give your code a little more clarity Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribeBy( onNext = { s -> println(s) }, onError = { e -> println(e) }, onComplete = { println("onComplete") } ) }
我希望有帮助!
看看RxKotlin ,这将简化很多事情,并使代码更加简洁。
val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon") list.toObservable() // extension function for Iterables .filter { it.length >= 5 } .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } )