Tag: RX java

Kotlin:具有可空值的BiFunction无法编译

这(大大简化)代码无法为我编译。 不知道为什么。 返回types是Entry? null对我来说似乎是一个有效的价值。 val foo = BiFunction<Int, List, Entry?> { foo:Int, bar:List -> null } 错误消息是Null can not be a value of a non-null type Entry 谁能告诉我我错过了什么? 我在用: ext.kotlin_version = ‘1.2.10’ compile “io.reactivex.rxjava2:rxjava:2.1.8” compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’ 我欢迎任何建议。 新年快乐!

RxJava zip使用Android Studio 3.0在Kotlin中遇到IDE错误

我想用Interval Observable创建一个Observable,它包含Observable,Observable包含Observable对象的列表,这样,来自第一个Observable的Observable将会有一定的时间延迟。 这是我的实现: val just1 = ArrayList() 填充一些项目 fun populateJust1() { just1.add(SomeClass1(“23”, 23)) just1.add(SomeClass1(“24”, 24)) just1.add(SomeClass1(“25”, 25)) } populateJust1() 并以间隔Observable压缩 Observable.fromIterable(just1) .zipWith(Observable.interval(2, TimeUnit.SECONDS)) { item: SomeClass1, interval: Long -> item } .subscribe(Consumer { someClass1 -> Log.v(“someClass1”, someClass1.toString()) }) 然而,IDE,Android Studio 3.0以zipWith运算符为红色,并表示: 所提供的参数都不能调用以下函数。 (((观察者:观察者)→单位)!,((t1:SomeClass1,t2:长)→R)!)其中R不能被推断; U = Long for fun zipWith(other:((observers:Observer)→Unit)!,zipper:((t1:SomeClass1,t2:U)→R)!):Observable! 在io.reactivex.Observable zipWith(ObservableSource!,BiFunction!)中定义,其中R无法推断; U =长! 为了好玩zipWith(其他:ObservableSource!,拉链:BiFunction!):可观察! 定义在io.reactivex.Observable […]

Android Rx-java + Kotlin的Retrofit2错误

我正在尝试构建一个MVVM模式的Android应用程序。 除了rx-java部分,一切都很好。 在订阅中使用Observer,我有这样的错误。 Error:(28, 18) None of the following functions can be called with the arguments supplied: public final fun subscribe(p0: ((Flyer!) -> Unit)!): Subscription! defined in rx.Observable public final fun subscribe(p0: Observer!): Subscription! defined in rx.Observable public final fun subscribe(p0: Subscriber!): Subscription! defined in rx.Observable public final fun subscribe(p0: Action1!): Subscription! defined in rx.Observable […]

Kotlin高阶函数和单方法接口的行为?

我之前在使用RxJava和Kotlin时遇到了一些问题。 我做了一些有趣的发现,我仍然感到困惑。 RxJava中有简单的Func1接口 public interface Func1 extends Function { R call(T t); } 我试图添加一个扩展方法到一个Observable ,也是一个RxJava类。 这将收集排放到谷歌Guava ImmutableListMulitmap使用Func1来映射每个项目的关键。 fun Observable.toImmutableListMultimap(keyMapper: Func1): Observable<ImmutableListMultimap> { return this.collect({ ImmutableListMultimap.builder()},{ b, t -> b.put(keyMapper.call(t), t)}).map { it.build() } } 当我试图调用这个扩展方法时,我无法编译它,它根本不理解lambdaexpression式。 ScheduledItem.all.flatMap { it.rebuildSoftTransactions } .toImmutableListMultimap { it.id /*compile error */ } .cache() 然而,当我修改扩展方法使用函数types时,发生了最奇怪的事情。 fun Observable.toImmutableListMultimap(keyMapper: (T) -> K): Observable<ImmutableListMultimap> { […]

Kotlin和RxJavatypes推断失败

我试图执行重试当我的Observable时,会出现一个时间,但我有奇怪的错误下划线在IDE Android Studio 3.0 这里是代码: rxRssiRepository.onRssiUpdate() //returns Observable .timeout(10, TimeUnit.MILLISECONDS) .retryWhen { t: Observable -> t.flatMap { error: Throwable -> if (error is TimeoutException) { stopLocationUpdates() Log.v(“TIMEOUT”, “TIMEOUT RSSI EVENTS”) Observable.just(Observable.empty()) } else { Observable.error(error) } } } .concatMap { t: RssiEvent -> appendRssiEvent(t) } .publish() IDE强调了.flatMap运算符的红色,并说: types推断失败:没有足够的信息来推断参数R in fun flatMap(mapper:((t:Throwable)→ObservableSource)!):Observable! 请明确指定。 “if”运算符也是如此: 控制流expression式的types推断失败。 请明确指定其types […]

RxJava 2.0和Kotlin Single.zip()以及单曲列表

我有我无法解决的问题。 我试图将.zip(列表,)多个单打合并为一个使用Kotlin和我提供的函数作为第二个参数适合。 fun getUserFriendsLocationsInBuckets(token: String) { roomDatabase.userFriendsDao().getUserFriendsDtosForToken(token).subscribe( { userFriends: List -> Single.zip(getLocationSingleForEveryUser(userFriends), Function<Array<List>, List> { t: Array<List> -> listOf() }) }, { error: Throwable -> } ) } private fun getLocationSingleForEveryUser(userFriends: List): List<Single<List>> = userFriends.map { serverRepository.locationEndpoint.getBucketedUserLocationsInLast24H(it.userFriendId) }

Rx和Kotlintypes推断中的generics函数引用失败

我在Kotlin写了一个方法: fun fetchDepositSession(): Completable = Observable.fromIterable(session.accounts) .map(DepositSession::DepositAccount) .toList() .doOnSuccess(depositSession::depositAccounts::set) .flatMapObservable(Observable::fromIterable) .map(DepositSession.DepositAccount::account::get) .toCompletable() 该行.flatMapObservable(Observable::fromIterable)正在导致一个错误:

使用flatMap的Rx Java Retrofit只运行一次

我试图从一个按钮点击流,然后平面地图与另一个可观察的改造,以请求一个端点,但它只能运行一次观察! 当我再次点击时,不要通过平面地图方法。 这是奇怪的原因,当平面地图返回其他observable它运行正常,但与改造之一,它只能运行一次。 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) var retrofit:Retrofit = Retrofit.Builder() .baseUrl("<SERVER_IP>") .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())) .build() var testService:TestService = retrofit.create(TestService::class.java) var buttonObservable:Observable<Void> = RxView.clicks(btnRequest) buttonObservable .observeOn(Schedulers.newThread()) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap { Log.i("debug", "run flatmap") var request:Request = Request() request.accessToken = "<acess_token>" testService.test(request) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ user -> Log.i("debug", "user ") }, { err -> […]

RxJava Observable.create包装可观察订阅

我使用了Observable.create,所以当某些数据可用时,我可以通知用户。 我有点不确定订阅在我的创建方法内的observables。 这些嵌套的订阅会给我什么问题吗? 我不完全熟悉使用Observable.create创建observables,所以我想确保我没有做任何不寻常的事情或滥用它。 先谢谢你! abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) { abstract fun fetchFromApi(): Single<ApiType> abstract fun fetchFromDb(): Observable<Optional<DbType>> abstract fun saveToDb(apiType: ApiType?) abstract fun shouldFetchFromApi(cache: DbType?): Boolean fun fetch(): Observable<Optional<DbType>> { return Observable.create<Optional<DbType>> { val subscriber = it fetchFromDb() .subscribe({ subscriber.onNext(it) if(shouldFetchFromApi(it.get())) { fetchFromApi() .observeOn(schedulerProvider.io()) .map { saveToDb(it) it } .observeOn(schedulerProvider.ui()) […]

弹簧5反应堆 – 每1秒发射一次物品

我试图每秒发出一次价值 Flux.just(User("A"), User("B"), User("C")).delayElements(Duration.ofSeconds(1)) 但是它以1秒的启动延迟立刻发出一切。 我怎样才能引入每个元素发射的延迟?