Tag: RX java

在Rx中设置调度程序的顺序

我对Rx很新,只是想知道subscribeOn的顺序如何影响Observable //This will not print anything Observable.just("whatever") .flatMap { s -> Observable.just(s.length) } .subscribeOn(Schedulers.newThread()) .subscribe(::println) //This prints the length Observable.just("whatever") .subscribeOn(Schedulers.newThread()) .flatMap { s -> Observable.just(s.length) } .subscribe(::println) 发生了什么事情,为什么?

用observable替换每个double

我有两个列表: List<Foo>和List<Bar> 。 对于List<Foo>每个项目,我必须检查List<Bar>所有项目,并且如果匹配,则设置一个值。 今天我使用这个代码: fooList.forEach { foo -> barList.forEach { bar -> if(foo.id == bar.id) { bar.name = foo.name } } } return barList 我想改变它,因为现在我有一个Observable<Foo>和一个Observable<Bar> 。 我怎样才能取代我的代码来使用反应式编程?

用rxJava和Retrofit重复请求登录表单

我想在rxJava上进行登录表单并进行改造。 但是,如果我在请求服务器时出错,取消订阅从rx的底部调用 我有使用rxAndroidBinding lib实现的ui方法 fun validEmail(): Observable<CharSequence> //last well formated login fun validPassword(): Observable<CharSequence> //last password of length fun clicks(): Observable<Unit> //clicks on login buttons 我已经通过Retrofit fun authorize(email:String, pass:String): Observable<Unit>实现了注册方法fun authorize(email:String, pass:String): Observable<Unit> 如果两个输入都有效,并且点击登录按钮,我想提出请求 “` val validPair = rx.Observable.combineLatest(iView.validEmail(), iView.validPassword(), ::ValidLoginPair) .doOnNext { iView.setLoginButtonEnabled(true) } subscription = rx.Observable.combineLatest(validPair, iView.clicks(), { pair, unit -> pair }) […]

“Subject.asObservable()”和主题本身“主题”之间的任何区别?

如果一个Subject从Observable继承,基于任何Subject的下一个选项有什么区别,比如: private val locationSubject: ReplaySubject<Location> = ReplaySubject.create<Location>() 1.将subject本身作为Observable返回 fun getLocations(): Observable<Location> = locationSubject 2.返回subject.asObservable() 。 fun getLocations(): Observable<Location> = locationSubject.asObservable()

我怎样才能暂停一个事件流经一个可观察的?

我正在尝试创建一个可以暂停的Observable,这样一来,物品就会停止通过可观察物体,直到它变为未被暂停为止。 在这一点上,我希望它恢复处理所有未处理的项目。 我的数据源来自课堂之外,所以我现在看起来像这样: class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever […]

JavaRX:如何立即返回缓存值并行执行网络请求

在等待接收网络请求时,如何立即产生先前的缓存值? val suggestionsCache = mutableMapOf<String, List<Suggestion>>() override fun requestSuggestions(queryString: String, queryLocation: Location): Observable<List<Suggestion>> { return Observable.merge( getCached(queryString), placesService .requestSuggestions(queryString, queryLocation) .doOnNext { cache(it, queryString) }) } private fun getCached(queryString: String): Observable<List<Suggestion>?>? { return if (suggestionsCache.containsKey(queryString)) Observable.just(suggestionsCache[queryString]) else Observable.never<List<Suggestion>>() } private fun cache(suggestions: List<Suggestion>, queryString: String) { suggestionsCache[queryString] = suggestions } 这是关于requestSuggestions方法。 如果可能的话,它应该从缓存中立即得到结果,并且仍然对placeService运行一个请求,这样用户将立即看到建议,但是在请求终止之后也会得到一个更新的列表。 上面的代码在订阅时不会产生任何元素。 我错过了什么?

当有变化时,RxJava2可观察的不在onNext上进行处理

我是RxJava的初学者。 我想追踪Int变量的变化,并在变化时对它作出反应。 这是我的例子: class MainActivity : AppCompatActivity() { lateinit var button: Button var counter: Int = 0 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) val observable :Observable<Int> = Observable.just(counter) button = findViewById(R.id.button) button.setOnClickListener(View.OnClickListener { counter++ }) observable.subscribe( Consumer { t -> Log.d("fromObservable", counter.toString()) } ) } } } 但是,日志从不调用时,我点击按钮。 我怎样才能使用RxJava2正确?

RxJava BehaviorSubject不发射最后一项?

我有一个简单的RxJava,使用ReplaySubject,我可以得到结果,所有3个数字打印。 val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = ReplaySubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } 当我改变行为,我期望第三个数字,即3打印,因为我一直认为行为是重播最后发射的项目。 val observable : Observable<Int> = Observable.just(1, 2, 3) val subject = BehaviorSubject.create<Int>() observable.subscribe(subject) subject.subscribe{ result -> System.out.println("Start $result in Subscription Result") } 但它不打印任何东西。 为什么? 我错过了这里重要的事情吗 如果是的话,让我知道如何得到假设发出的最后一个项目(即3)打印。

Android – 使用泛型有一个RxJava翻新调用使用相同的接口返回各种类型

我正在玩泛型,第一次清理一些重复的代码,我有改进的电话。 我能够制作一个通用的改装适配器,所以我的供应商不必每个都有一个独特的创造者,这真是太棒了,令人兴奋。 private fun <T> create(service: Class<T>, baseUrl: String): T { val retrofit = Retrofit.Builder() .client(client) .addConverterFactory(MoshiConverterFactory.create()) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .baseUrl(baseUrl) .build() return retrofit.create(service) } 现在我试图做一个通用的网络调用,但我无法弄清楚,并想知道如果这是甚至可能的改造如何工作? 我现在有5个服务,如下所有,不同的API。 每个人都有不同的终结点,但都接受1个参数,并返回他们的对象,每个API调用略有不同。 interface ServiceA { @GET("v2/ticker") fun getCurrentTradingInfo(@Query("book") orderBook: String): Observable<CurrentTradingInfo> } 所有的响应对象都实现了一个接口来规范响应数据,以便以后可以显示 data class CurrentTradingInfo(val mid: String, val bid: String, val ask: String, val last_price: String, val low: String, val […]

如何上传一个构造函数的返回值

我想解决一个类型不匹配的编译器错误,我从下面的代码,其中类型推断没有做我想要的。 我有一个密封的类和一个rxjava流: sealed class Result { data class Success(val logs: List<Log>) : Result() data class Failure(val throwable: Throwable) : Result() object InFlight : Result() } val logs: Observable<List<Log>> = getLogs() logs.map(Result::Success) .onErrorReturn(Result::Failure) 类型不匹配:推断类型是KFunction1 <@ParameterName Throwable,Result.Failure>但函数! 预计 我可以通过显式指定映射器函数返回类型来修复错误,这种类型是从推断的Result.Success类型的Result.Success类中的Result.Success ,如: logs.map({ logs -> Result.Success(logs) as Result }) 但是我想知道这个上传是否可以用一个构造函数引用,例如Result::Success as Result