Tag: 反应式编程

如何在VertX中从工厂创建Verticle?

我正在开发一个Web服务使用vertx网络,我正在使用垂直任何数据库操作。 我有几个verticle,每个对应一个域。 我怎样才能使用VerticleFactory而不是手动实例化Verticle类? 什么是正确的方法? 我研究了一些文档,发现有一些Verticle服务工厂使用配置和构建系统来创建主要的verticile。 我有一个WebVerticle为主要服务。 但是,我的存储库模式垂直访问数据库呢? 我应该如何实例化它们?

RxJava在Flowable和Observable之间使用Window和Groupby的不同输出

我使用的是RxJava2,代码如下: val whitespaceRegex = Regex("\\s+") val queryRegex = Regex("query=([^&]+)", RegexOption.IGNORE_CASE) val dateTimeFormatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME @JvmStatic fun main(args: Array<String>) { val cnt = AtomicLong() val templateStr = "|date| /ignored/ query=|query|" val random = ThreadLocalRandom.current() var curDate = ZonedDateTime.of(LocalDate.of(2016, Month.JANUARY, 1), LocalTime.MIDNIGHT, ZoneId.of("UTC")) val generator = Flowable.generate<String> { emitter -> // normally these are read from a […]

使用Flux而不是for循环,有什么好处?

我试图让我的脑袋围绕反应式编程,所以我想问问在这里使用Flux是否有任何好处: override fun notifyObserversOnMessage(message: Message) { Flux.fromStream(observers.stream()) .map { observer -> Mono.just(observer.reactOnMessage(message)) } .subscribe() } 代替: override fun notifyObserversOnMessage(message: Message) { for (observer in observers) { observer.reactOnMessage(message) } } 这是否取决于每个观察员正在做的工作,如果这是IO还是不?

在Rx中设置调度程序的顺序

我对Rx很新,只是想知道subscribeOn的顺序如何影响Observable //This will not print anything Observable.just("whatever") .flatMap { s -> Observable.just(s.length) } .subscribeOn(Schedulers.newThread()) .subscribe(::println) //This prints the length Observable.just("whatever") .subscribeOn(Schedulers.newThread()) .flatMap { s -> Observable.just(s.length) } .subscribe(::println) 发生了什么事情,为什么?

用observable替换每个double

我有两个列表: List<Foo>和List<Bar> 。 对于List<Foo>每个项目,我必须检查List<Bar>所有项目,并且如果匹配,则设置一个值。 今天我使用这个代码: fooList.forEach { foo -> barList.forEach { bar -> if(foo.id == bar.id) { bar.name = foo.name } } } return barList 我想改变它,因为现在我有一个Observable<Foo>和一个Observable<Bar> 。 我怎样才能取代我的代码来使用反应式编程?

用rxJava和Retrofit重复请求登录表单

我想在rxJava上进行登录表单并进行改造。 但是,如果我在请求服务器时出错,取消订阅从rx的底部调用 我有使用rxAndroidBinding lib实现的ui方法 fun validEmail(): Observable<CharSequence> //last well formated login fun validPassword(): Observable<CharSequence> //last password of length fun clicks(): Observable<Unit> //clicks on login buttons 我已经通过Retrofit fun authorize(email:String, pass:String): Observable<Unit>实现了注册方法fun authorize(email:String, pass:String): Observable<Unit> 如果两个输入都有效,并且点击登录按钮,我想提出请求 “` val validPair = rx.Observable.combineLatest(iView.validEmail(), iView.validPassword(), ::ValidLoginPair) .doOnNext { iView.setLoginButtonEnabled(true) } subscription = rx.Observable.combineLatest(validPair, iView.clicks(), { pair, unit -> pair }) […]

我怎样才能暂停一个事件流经一个可观察的?

我正在尝试创建一个可以暂停的Observable,这样一来,物品就会停止通过可观察物体,直到它变为未被暂停为止。 在这一点上,我希望它恢复处理所有未处理的项目。 我的数据源来自课堂之外,所以我现在看起来像这样: class Agent { val publisher = PublishSubject.create<Event>() val subscription = createSubscription() fun trackEvent(e: Event) { publisher.onNext(e) } fun pause() { // ??? } fun resume() { // ??? } private fun createSubscription(): Subscription { return publisher .map { stringify(it) } .buffer(10L, TimeUnit.SECONDS, 500) // capture 500 events or 10 seconds worth, whichever […]

RxJava – 随时接受更多观测数据的合并Observable?

我遇到需要一个Observable实现,它持有一个或多个Observable并将它们合并。 但是这里有一个启发:我想随时添加更多的可观察对象,我想这也可以支持将它们移除。 为了真正有效,所有订阅者都必须接收来自新订阅者的通知,这些订阅者是在订阅后添加的。 除非所有合并的Observable都很冷,并调用onComplete() ,那么我想可以让订阅取消订阅,即使添加了更多的Observable。 这是更多的合并多个无限的热Observables,并能够在任何时候添加更多。 MergableObservable<MyEvent> allSources = new MergableObservable<>(); //later in application Observable<MyEvent> eventSource1 = … allSources.add(eventSource1); //and later again Observable<MyEvent> eventSource2 = … allSources.add(eventSource2 ); //and so on Observable<MyEvent> eventSource3 = … allSources.add(eventSource3); 我知道有合并操作符,但我需要一个可变的结构。 我是否缺少已经存在的东西? 除非这种情况绝对适合,否则我宁愿不使用主题。

以递归方式将Rx选项组合成Obserbles

比方说,我有一个名为s_0的Single ,它可以从T类型发出一个元素t_0 ,也可以失败(在某些语言中,这可能是Single<T> )。 那是: s_0: — t_0 // Success OR s_0: — X // Failure T类型的实例有一个next()方法,它返回T类型的一个可选的Single ( Kotlin中的Single<T>? )。 这种行为导致一个能够发射一个T实例链的Single实例链,其中每个单独的s_i可以发出一个能够返回下一个单独的s_i+1的元素t_i+1 ,这将发出一个元素t_i+1等等,直到最后一个元素t_n-1没有返回单个或任何单数失败: s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_n-1: — t_n-1 ↓ null OR s_0: — t_0 ↓ s_1: — t_1 ↓ s_2: — t_2 … ↓ s_i: […]

Spring Web Flux(反应式)功能路由与Kotlin无法正常工作

你好有兴趣在Kotlin编写Spring应用程序的人。 我正在玩Spring Boot 2.0.0快照和spring-webflux 。 这段代码: @Component class TestRouter() : RouterFunction<ServerResponse> { override fun route(request: ServerRequest) = route(request) { "/".route { GET("/hello") { ServerResponse.ok().body(BodyInserters.fromObject("World")) } "/{id}".route { GET("/hello") { ServerResponse.ok().body(BodyInserters.fromObject("World ${request.pathVariable("id")}")) } } } } } 不按预期工作(至少如我所料:)) ➜ ~ curl -i http://localhost:8080/hello HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: text/plain;charset=UTF-8 World 但: ➜ ~ curl -i […]