Kotlin使用RxJava2来观察观察者

Android Studio 3.0 Beta2 

我创建了两个方法,一个创建观察者,另一个创建订阅者。

不过,我有一个问题试图让订阅者订阅观察。 在Java中这将工作,我正在努力让它在Kotlin工作。

在我的onCreate(..)方法,我试图设置这个。 这是正确的方法来做到这一点?

 class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */ createStringObservable().subscribe(createStringSubscriber()) } fun createStringObservable(): Observable<String> { val myObservable: Observable<String> = Observable.create { subscriber -> subscriber.onNext("Hello, World!") subscriber.onComplete() } return myObservable } fun createStringSubscriber(): Subscriber<String> { val mySubscriber = object: Subscriber<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Subscription?) { println("onSubscribe") } } return mySubscriber } } 

非常感谢您的任何建议,

密切关注类型。

Observable.subscribe()有三个基本的变体:

  • 一个不接受参数
  • 几个接受io.reactivex.functions.Consumer
  • 一个接受一个io.reactivex.Observer

你在你的例子中试图订阅的类型是org.reactivestreams.Subscriber (定义为Reactive Streams Specification的一部分)。 你可以参考文档来得到这个类型的更全面的计算,但是可以说它与任何重载的Observable.subscribe()方法都不兼容。

这里是你的createStringSubscriber()方法的一个修改的例子,它将允许你的代码编译:

 fun createStringSubscriber(): Observer<String> { val mySubscriber = object: Observer<String> { override fun onNext(s: String) { println(s) } override fun onComplete() { println("onComplete") } override fun onError(e: Throwable) { println("onError") } override fun onSubscribe(s: Disposable) { println("onSubscribe") } } return mySubscriber } 

事情变了:

  1. 这将返回一个Observer类型(而不是Subscriber
  2. onSubscribe()传递Disposable (而不是Subscription

..正如“Vincent Mimoun-Prat”所提到的,lambda语法可以真正缩短你的代码。

  override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // Here's an example using pure RxJava 2 (ie not using RxKotlin) Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribe( { s -> println(s) }, { e -> println(e) }, { println("onComplete") } ) // ...and here's an example using RxKotlin. The named arguments help // to give your code a little more clarity Observable.create<String> { emitter -> emitter.onNext("Hello, World!") emitter.onComplete() } .subscribeBy( onNext = { s -> println(s) }, onError = { e -> println(e) }, onComplete = { println("onComplete") } ) } 

我希望有帮助!

看看RxKotlin ,这将简化很多事情,并使代码更加简洁。

 val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon") list.toObservable() // extension function for Iterables .filter { it.length >= 5 } .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } )