将RxJava 1.0代码更新到RxJava 2.0

我正在更新一些旧的Android应用程序代码,并不知道如何将在RxJava / RxAndroid 1.0工作的代码转换为RxJava / RxAndroid 2.0。 我试图转换的代码负责获取信号强度指标,并每秒用它的值更新图表。 我设法在RxJava 2.0中破解的功能根本不起作用,几秒钟后,SIGABORT崩溃。 我无法弄清楚我错过了什么?

我的RxJava 1代码:

rx.Observer myObserver = new rx.Observer<Float>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Float aFloat) { addEntry(aFloat); } }; subscription = rx.Observable.interval(1, TimeUnit.SECONDS) .flatMap(new Func1<Long, rx.Observable<Float>>() { @Override public rx.Observable<Float> call(Long aLong) { return getSignalStrength(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(myObserver); private rx.Observable<Float> getSignalStrength() { return rx.Observable.create(subscriber -> { WifiManager wifiManager = (WifiManager) getContext().getSystemService(Context.WIFI_SERVICE); Integer value = wifiManager.getConnectionInfo().getRssi(); subscriber.onNext(value.floatValue()); subscriber.onCompleted(); }); } 

而我想要做的RxJava 2.0:

  DisposableObserver<Float> disposableObserver = new DisposableObserver<Float>() { @Override public void onNext(Float value) { addEntry(value); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; Disposable disposable = Observable.interval(1, TimeUnit.SECONDS) .flatMap(new Function<Long, ObservableSource<Float>>() { @Override public ObservableSource<Float> apply(Long aLong) throws Exception { return signalStrengthInfo.getSignalStrength(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(disposableObserver); private Observable<Float> getSignalStrength() { return Observable.create(subscriber -> { wifiManager = (WifiManager) context.getSystemService(Context.WIFI_SERVICE); Integer signalStrength = wifiManager.getConnectionInfo().getRssi(); subscriber.onNext(Float.valueOf(signalStrength)); subscriber.onComplete(); }); } 

Rx2中的代码确实卷入其中。

只是看到简单的解决方案(抱歉kotlin),但如果你有一些问题,使用java只是ping我评论。

 val disposable = Observable.interval(1, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .flatMap { getSignalStrength() } .doOnNext { addEntry(it) } .subscribe() private fun getSignalStrength(): Observable <Float> { val wifiManager = this.getSystemService(Context.WIFI_SERVICE) as WifiManager val signalStrength = wifiManager.connectionInfo.rssi return Observable.just(signalStrength.toFloat()) }