Tag: RX java

RxJava(Kotlin),Observable.amb和PublishSubject没有开火

我们试图观察15s的时间间隔,或者在我们的主题refreshEventsSubject上触发refreshEventsSubject ,但是没有成功。 主题是这样启动的 private val refreshEventsSubject = PublishSubject<Long>() 然后我们尝试像这样观察它 Observable.merge(Observable.interval(0, 15, TimeUnit.SECONDS), refreshEventsSubject) .subscribe { … } 我们每隔15s就会收到一次这个事件,但是跑步之后这个话题还没有开始 refreshEventsSubject.onNext(0) 任何想法赞赏。 (一切都是用Kotlin写的)

Kotlin:如何从RxJava订阅者继承

我希望我的NewsListSubscriber继承自使用泛型类型的RxJava订阅服务器,但是当我调用UseCase execute方法时,出现“类型不匹配”错误。 我从Kotlin文档中多次阅读泛型页面,但找不到解决方案。 这是我的UseCase: abstract class UseCase(private val threadExecutor: IThreadExecutor, private val postExecutionThread: IPostExecutionThread) { private var subscription = Subscriptions.empty() fun execute(UseCaseSubscriber: rx.Subscriber<Any>) { subscription = buildUseCaseObservable() .subscribeOn(Schedulers.from(threadExecutor)) .observeOn(postExecutionThread.getScheduler()) .subscribe(UseCaseSubscriber) } protected abstract fun buildUseCaseObservable(): Observable<out Any> fun unsubscribe() { if (!subscription.isUnsubscribed) { subscription.unsubscribe() } } } 这就是我所说的: override fun loadNewsList() { getNewsListInteractor.execute(NewsListSubscriber()) } private […]

Kotlin类型不匹配编译错误:需要成功<T>,找到MyError

我遇到了以下代码不能在kotlin中编译的问题。 // StateModel.kt sealed class StateModel class Loading : StateModel() data class Success<T: Any>(val data: T) : StateModel() data class MyError(val message: String) : StateModel() // StateModelTransformer.kt class StateModelTransformer<T: Any> : FlowableTransformer<T, StateModel> { override fun apply(upstream: Flowable<T>): Publisher<StateModel> { return upstream .map { data -> Success(data) } .onErrorReturn { error -> MyError(error.message) // compile […]

在Kotlin函数类型中表示“超级”泛型?

我试图移植一个RxJava库,并利用Kotlin的扩展功能。 fun <T,R: MutableCollection<T>> Observable<T>.collectWhile(factory: (() -> R), condition: (R,T) -> Boolean) = compose(Transformers.collectWhile(factory,condition)) Transformers.collectWhile()是用Java编写的,具有以下签名: public static <T, R extends Collection<T>> Transformer<T, R> collectWhile(final Func0<R> factory, final Action2<? super R, ? super T> collect) 但是,我正在collect关于collect参数的映射问题,而且我不擅长泛型。 我如何用功能类型来表达super ? UPDATE 我愚蠢的错误。 我不应该在深夜发帖。 我其实是针对这个 public static <T, R extends Iterable<?>> Transformer<T, R> collectWhile(final Func0<R> factory, final Action2<? super […]

Kotlin – 如何创建RxJava flatmap()的别名函数?

我试图创建一个别名函数Flowable.flatmap()如下,但编译错误。 fun <T, R> Flowable<T>.then(mapper: Function<T, Publisher<R>>): Flowable<R> { return flatMap(mapper) } 错误是: 在kotlin中定义的接口Function<out R>有一个类型参数 有什么想法? 谢谢!

kotlin grpc.StreamObserver去rx.PublishSubject

每当我们声明流API时使用GRPC rpc heartBeat(Empty) returns (stream ServiceStatus){} 我们已经为观察者模式StreamObserver简单的接口(这是protobuf会为我们生成的) public interface StreamObserver<V> { void onNext(V var1); void onError(Throwable var1); void onCompleted(); } 现在你想要做的就是将其转换为一个实际的Observable并且只有在它被传递之后才能继续使用。 override fun heartBeat(arg: Empty): Observable<ServiceStatus> { // we create rx java subject val subject = PublishSubject.create<ServiceStatus>() // we create grpc observer and delegate all calls to rx java val observer = object : StreamObserver<ServiceStatus> […]

用Kotlin在RxJava中处理可空类型

在Java中使用RxJava2时,我有map()过滤自动发出空值的好处。 然而,Kotlin中的可空类型最终会做这样的事情: val loadConferencesSingle = service.getConferences() .map { it.conferences ?: listOf<Conference>() } 在这种情况下调用service.getConferences()是一个单一的发射ConferecesResponse看起来像 data class ConferencesResponse(val conferences: List<Conference?>? = null) 因此,如果完整的会议发布清单是空的,我最终使用猫王操作员发出一个空列表。 在map发射null是不可能的。 有谁有线索,如何用Kotlin在RxJava中更好地处理可空类型? 我已经喜欢这个解决方案和它的简洁,但是我觉得有更好的方法来处理这个问题。

MissingMethodInvocationException在Kotlin中测试已打开的类

我正在尝试使用Kotlin + RxJava和MockWebServer进行一些集成测试。 我在测试方面是一个新手,我是Kotlin学徒。 我知道Mockito和final类的局限性,但我不应该嘲笑我正在测试的类,所以我不知道真正的问题在哪里: 错误如下: Apr 16, 2016 9:59:49 PM okhttp3.mockwebserver.MockWebServer$3 execute INFO: MockWebServer[54260] starting to accept connections Apr 16, 2016 9:59:50 PM okhttp3.mockwebserver.MockWebServer$3 acceptConnections INFO: MockWebServer[54260] done accepting connections: Socket closed org.mockito.exceptions.misusing.MissingMethodInvocationException: when() requires an argument which has to be 'a method call on a mock'. For example: when(mock.getArticles()).thenReturn(articles); Also, this error might […]

Kotlin四倍,五倍等解构

我正在寻找一种干净的方式来创建内联可破坏的对象。 kotlin.Pair和kotlin.Triple覆盖了很多用例,但是有时需要传递更多的对象。 一个示例用例是RX的zip函数,其中几个I / O调用的结果需要映射到另一个对象: Single .zip(repositoryA.loadData(someId), repositoryB.loadData(someId), repositoryC.loadAll(), repositoryD.loadAll()), { objectA, objectB, objectsC, objectsD -> /*some Kotlin magic*/ } ) .map { (objectA, objectB, objectsC, objectsD) -> /*do the mapping*/ } 我想弄清楚“Kotlin的一些魔力”部分会发生什么。 如果只有三个版本库,那将是 Triple(objectA, objectB, objectsC) 我是否需要为此创建一个新的数据类,对于任何n元组情况,还是有另一种方法?

显式类型参数Kotlin和RxJava

这是在Kotlin中使用RxJava与ThreadTransformer compose的唯一可能的方式吗? 我只是不喜欢compose功能中的<MyType>部分。 有没有可能省略它? override fun call(): Observable<MyType> { return Observable.just(getData()) .compose(threadTransformer.applySchedulers<MyType>()) }