Tag: rx kotlin

如何使用kotlin Android从Url读取JSON?

我正在使用kotlin开发应用程序。现在我想从服务器获取JSON数据。 在Java中实现了Asyntask以及Rxjava从Url读取JSON。 我也搜索谷歌,但我无法得到我的要求适当的细节。 如何使用kotlin从Url读取JSON?

RxJava-将Observable转变为Iterator,Stream或Sequence

我知道这打破了很多Rx规则,但我真的很喜欢RxJava-JDBC ,所以我的队友也是这样。 关系数据库对于我们的工作非常重要,Rx也是如此。 但是有些情况下,我们不希望作为Observable<ResultSet>发出,而只是有一个基于拉的Java 8 Stream<ResultSet>或Kotlin Sequence<ResultSet> 。 但是我们非常习惯于只返回一个Observable<ResultSet>的RxJava-JDBC库。 因此,我想知道是否有一种方法可以使用扩展函数将Observable<ResultSet>转换为Sequence<ResultSet> ,而不执行任何中间集合或toBlocking()调用。 下面是我迄今所有的,但我的头正在旋转,试图连接推拉系统,我不能缓冲,因为每个onNext()调用ResultSet是有状态的。 这是不可能的任务吗? import rx.Observable import rx.Subscriber import java.sql.ResultSet fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() { private var isComplete = false override fun onCompleted() { isComplete = true } override fun onError(e: Throwable?) { throw UnsupportedOperationException() } override fun onNext(rs: ResultSet?) { throw UnsupportedOperationException() } […]

如何在RxJava2中默默跳过异常?

我有这样的数据流: Observable .fromFuture( CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> listOf(1, 2, 3, 57005, 5) }, Schedulers.computation() ) .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one .map { CompletableFuture.supplyAsync { // This remote call may fail if it does not like […]

相当于RxJava 2 isUnsubscribed

我一直在通过RxJava的“ Reactive Programming ”一书中的示例进行研究,该示例针对版本1而不是2.对无限流的介绍有以下示例(并注意有更好的方法来处理并发): Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { Runnabler = () -> { BigInteger i = ZERO; while (!subscriber.isUnsubscribed()) { subscriber.onNext(i); i = i.add(ONE); } }; new Thread(r).start(); }); … Subscription subscription = naturalNumbers.subscribe(x -> log(x)); /* after some time… */ subscription.unsubscribe(); 但是,在RxJava 2中,传递给create()方法的lambda表达式的类型为ObservableEmitter ,并且没有isUnsubscribed()方法。 我在2.0中看到了什么是不同的,并且执行了对存储库的搜索,但是找不到任何这样的方法。 在2.0中如何实现这个相同的功能? 编辑包括下面给出的解决方案(使用kotlin的nb): val naturalNumbers = Observable.create<BigInteger> { […]

不能用RxKotlin“观察”主线程

我试图通过使用主线程观察可观察的: // Kotlin Code Observable .observeOn(AndroidSchedulers.mainThread()) 但我得到以下错误: Type Mismatch: Required: rx.Scheduler! Found: io.reactivex.Scheduler! 我订阅的Observable来自一个用Java编写的库,因此使用RxJava。 我是愚蠢的,错过了什么? 我puzzeled:$ 提前致谢 :)

重复发送对象到Rx上的子客户端

是否可以将相同的对象反复发送给Rx中的订阅者? 例如,这个代码(在Kotlin上): val exmp = listOf("А") var observable = exmp.toObservable() observable.subscribeBy( onNext = { it + "1" println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } ) 我尝试将字符串值“A”重复发送到方法onNext()并获得“A111111”。 Rx库的方法重放()我明白开始发送日期再次为新的潜艇。 在从可观察的日期开始不变的循环for ,只是方法被调用几次

无法使用RxKotlin更改ActionMenuItemView的文本

我试图用Kotlin编写一个Android应用程序。 现在,我想在ActionBar中显示一个计数器。 我为此添加了一个名为show_timer的项目。 每秒钟,它应该数一个: override fun onWindowFocusChanged(hasFocus: Boolean) { val item = findViewById(R.id.show_timer) as ActionMenuItemView PublishSubject.interval(1, java.util.concurrent.TimeUnit.SECONDS, Schedulers.newThread()) .subscribeBy(onNext = {item.text = it.toString()}) super.onWindowFocusChanged(hasFocus) } 但不知何故,这是行不通的。 它将默认文本更新为0,但在此之后它什么都不做。 有人知道为什么这不起作用吗? 先谢谢你, 尼克拉斯

如何有条件地在RxJava流中添加一个异步操作?

这里是我想要做的简化版本(使用Kotlin和RxJava) makeServerCall() .doOnNext { doStuff(it) } //TODO: if it == 0, call asyncOperation() and wait for its callback to fire //before running the rest of the stream. Otherwise immediately run the rest //of the stream .flatMap { observable1(it) observable2(it) Observable.merge( getSpotSearchObservable(observable1), getSpotSearchObservable(observable2) } .subscribeBy(onNext = { allDone() view? }) 如何挤压调用asyncOperation()并使流的其余部分等待其回调触发,但是只有在满足某些条件时才会触发。 这看起来似乎可能是Rx的一个微不足道的操作,但是没有什么明显的解决方案。

注入构造函数和伴随对象

我对Kotlin是新鲜的,我正在尝试注入一个值(在这个例子中,它只是一个Int,但在真正的代码中它是一个Provider类)我在这里做错了什么? 为什么x是一个未解决的参考? class Test @Inject constructor(private val x: Int) { companion object { var y: Int = 0 @BeforeClass @JvmStatic fun beforeClass() { y = x * 2 } } }

如何在RxJava 2和Kotlin中将null传递给可空类型的Observable

我像这样初始化我的变量: val user: BehaviorSubject<User?> user = BehaviorSubject.create() 但我不能这样做。 IDE会引发错误: – user.onNext(null) 这样做,IDE说你永远不会是空的: – user.filter( u -> u!=null)