Tag: rx kotlin

与RxKotlin获取NetworkOnMainThreadException

我试图使用RxKotlin做网络请求,但不断得到一个NetworkOnMainThreadException我在主线程订阅,所以我不知道为什么它没有把它从UI线程。 这里是我订阅Observable的地方 weatherInteractor.getWeather(lat, lng) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( {response -> try { val jsonData = response.body().string() val currentWeather = getCurrentWeatherData(jsonData) view!!.displayCurrentWeather(currentWeather) } catch (e: JSONException) { Log.d("Present JSON Exception", e.message) } catch (e: IOException) { Log.d("Present IO Exception", e.message) } }, { error -> error.printStackTrace() } ) } 这里是我创建我的Observable的地方 fun getWeather(lat: Double, lng: Double): Observable<Response> { […]

CountdownTimer完成时如何通知Observable

我有一个自定义的Android TextView,通过CountDownTimer显示游戏中剩余的时间 class CountdownTextView(context: Context, attrs: AttributeSet) : TextView(context, attrs) { private lateinit var countDownTimer: CountDownTimer private lateinit var onFinishObservable: Observable<Unit> fun setTime(initTime: Int) { this.text = "$initTime:00" countDownTimer = object : CountDownTimer((initTime *1000).toLong(), 1000) { override fun onTick(millisUntilFinished: Long) { val minutes = millisUntilFinished / 60000 val seconds = (millisUntilFinished % 60000) / 1000 […]

异步调用集合中的每个项目

我有一个问题,迄今为止我无法解决,我是RxKotlin的新手,所以可能很简单。 请看代码: override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 流哪里是我们自制的收藏。 地图是一种方法,允许您迭代该集合中的每个项目。 这里的问题是 client.itemForId(itemId) 是一个http调用,返回一个不理想的Single。 我想创建一个异步调用内部映射,将返回项目而不是单一,然后将其传递给ClientInfo。 到目前为止我尝试过的事情是使用在地图内部的订阅和使用blockingGet()方法,但是这阻止了主线程,即使我观察和订阅不同的线程 所以它涉及到对集合中的每一件事进行异步调用。 感谢帮助

RxJava2如果没有元素,可能返回空Observable

有没有比flatMap和try/catch使用JavaRx 2中Maybe类型的更好/更习惯的方法? 以下示例使用Maybe<User>并尝试为它们预订一个航班的随机票。 如果用户不存在,则返回一个空的Observable 。 fun bookRandomTicketFor(userId: UUID): Observable<Ticket> { val agencies = travelAgents() // Observable<TravelAgency> val user = findById(userId) // Maybe<User> val location = locate() // Observable<GeoLocation> return Observable .just(user.toObservable()) .flatMap { usr -> try { usr.zipWith(location, { aUser, location -> agencies .flatMap { agency -> agency .search(aUser, location) // Observable<Flight>. .toList() // Convert […]

RxAndroid – 重试观察点击

我在Android应用程序中使用rxAndroid和rxKotlin来异步处理网络请求。 现在我想重试一个失败的网络请求,只有点击Snackbar按钮后。 我的代码现在: val citiesService = ApiFactory.citiesService citiesService.cities() .subscribeOn(Schedulers.newThread()) // fetch List<String> .flatMap { Observable.from(it) } // convert to sequence of String .flatMap { city -> citiesService.coordinates(city) // fetch DoubleArray .map { City(city, it) } // convert to City(String, DoubleArray) } .toList() .observeOn(AndroidSchedulers.mainThread()) .doOnNext { listView.setOnItemClickListener { adapterView, view, position, id -> onItemClick(it[position]) } } […]

Kotlin lambda语法混淆

我被Kotlin lambda语法弄糊涂了。 起初,我有 .subscribe( { println(it) } , { println(it.message) } , { println("completed") } ) 这工作正常 。 然后,我将onNext移动到另一个名为GroupRecyclerViewAdapter的类,该类实现了Action1<ArrayList<Group>> 。 .subscribe( view.adapter as GroupRecyclerViewAdapter , { println(it.message) } , { println("completed") } ) 但是,我得到了错误: Error:(42, 17) Type mismatch: inferred type is () -> ??? but rx.functions.Action1<kotlin.Throwable!>! was expected Error:(42, 27) Unresolved reference: it Error:(43, 17) […]

当订阅TestSubscriber时,RxKotlin – Single.just()不会发出

我以为这是如何工作,似乎我失去了一些东西.. @Test fun singleCompletes() { val testSubscriber = TestSubscriber<Boolean>() Single.just(true) .subscribeOn(Schedulers.immediate()) .subscribe { testSubscriber } testSubscriber.assertNoErrors() testSubscriber.assertValue(true) } java.lang.AssertionError:项目数量不匹配。 提供:1当前:0。

RxKotlin(RxJava2)timeout()不会抛出TimeoutException

我试图得到一个使用两个不同超时值的示例工作。 第一个排放量的初始值较大,随后的所有排放量则为较短的值。 这个例子是从RxJava v1x的Java转换为Kotlin,虽然我试图这是v2x(不知道这是否有任何区别)。 问题是第一个事件的超时不会引发TimeoutException 。 设置的值低于500毫秒,我期待一个堆栈跟踪打印,但我得到的输出,如果没有超时已经发生(后续排放超时设置为40毫秒导致堆栈跟踪如预期)。 以下示例阻止成功初始超时有什么问题? fun nextSolarEclipse(after: LocalDate): Observable<LocalDate> { return Observable .just( LocalDate.of(2016, Month.MARCH, 9), LocalDate.of(2016, Month.SEPTEMBER, 1), LocalDate.of(2017, Month.FEBRUARY, 26), LocalDate.of(2017, Month.AUGUST, 21), LocalDate.of(2018, Month.FEBRUARY, 15), LocalDate.of(2018, Month.JULY, 13), LocalDate.of(2018, Month.AUGUST, 11), LocalDate.of(2019, Month.JANUARY, 6), LocalDate.of(2019, Month.JULY, 2), LocalDate.of(2019, Month.DECEMBER, 26) ) .skipWhile { date -> !date.isAfter(after) } .zipWith( Observable.interval(500, […]

Kotlin高阶函数和单方法接口的行为?

我之前在使用RxJava和Kotlin时遇到了一些问题。 我做了一些有趣的发现,我仍然感到困惑。 RxJava中有简单的Func1接口 public interface Func1<T, R> extends Function { R call(T t); } 我试图添加一个扩展方法到一个Observable ,也是一个RxJava类。 这将收集排放到谷歌Guava ImmutableListMulitmap使用Func1来映射每个项目的关键。 fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: Func1<T, K>): Observable<ImmutableListMultimap<K,T>> { return this.collect({ ImmutableListMultimap.builder<K,T>()},{ b, t -> b.put(keyMapper.call(t), t)}).map { it.build() } } 当我试图调用这个扩展方法时,我无法编译它,它根本不理解lambda表达式。 ScheduledItem.all.flatMap { it.rebuildSoftTransactions } .toImmutableListMultimap { it.id /*compile error */ } .cache() 然而,当我修改扩展方法使用函数类型时,发生了最奇怪的事情。 fun <K,T> Observable<T>.toImmutableListMultimap(keyMapper: (T) […]

用RxJava重复对状态的操作

这是在Kotlin,但我认为任何编写Java的人都能理解。 我正在尝试用Rx制作一个秒表,而且我在实际停止和启动时遇到了一些麻烦。 最大的问题是,我不知道如何保持现在的时间,而修改它作为不同的行动(开始和停止)进来。这是我现在所拥有的。 fullTime.switchMap { startTime -> controlCommands.switchMap { command -> when (command) { ControlState.PLAY -> Observable.interval(1L, TimeUnit.SECONDS).map { ControlState.PLAY } ControlState.PAUSE -> Observable.just(ControlState.PAUSE) else -> Observable.just(ControlState.STOP) } } } fullTime和controlCommands是Observable ,它们分别发出有关当前开始时间的事件,从中倒数,然后说下一步该做什么。 我想链接controlCommands并能够保持状态在startTime开始,当PLAY事件出现时将倒计时, PAUSE出现时PAUSE ,并在出现STOP时在startTime复位。 scan几乎工作,但我不知道如何停止计时器命中0和PLAY仍然每秒钟发送,因为它会发送重复的信息。 也不允许状态和观察值之间的分离。 所以我scan积累的值必须是Observable内部的值(如果有意义的话)。 任何想法我应该做什么?