RxJava2发布

有什么区别

ObservableTransformer { Observable.merge( it.ofType(x).compose(transformerherex), it.ofType(y).compose(transformerherey) ) } 

 ObservableTransformer { it.publish{ shared -> Observable.merge( shared.ofType(x).compose(transformerherex), shared.ofType(y).compose(transformerherey) ) } } 

当我使用这两个运行我的代码时,我得到了相同的结果。 这里发布什么?

不同之处在于,顶层变压器将从下游订购两次上游,重复上游的任何通常不需要的副作用:

 Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3) .doOnSubscribe(s -> System.out.println("Subscribed!")); mixedSource.compose(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

将打印

 Subscribed! 2 3 4 Subscribed! A B C 

这里表示的副作用是打印输出Subscribed! 根据实际来源的实际工作情况,这可能意味着发送一封电子邮件两次,检索表格的两行。 通过这个特殊的例子,你可以看到,即使源值是以它们的类型交错的,输出也会分别包含它们。

相比之下, publish(Function)将为每个最终用户建立一个订阅源,因此源端的任何副作用只发生一次。

 mixedSource.publish(f -> Observable.merge( f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)), f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase())) ) ) .subscribe(System.out::println); 

打印

 Subscribed! A 2 B 3 C 4 

因为源被订阅了一次,每个项目被多播到.ofType().compose()的两个“武器”。

publish操作符将您的Observable转换为Connectable Observable

让我们来看看Connectable Observable是什么意思:假设你想多次订阅一个observable,并且想要为每个订阅者提供相同的条目。 您需要使用Connectable Observable

例:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Connect(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); 

输出:

 first subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

在这种情况下,我们很快就可以在第一个项目发布之前订阅,但只能在第一个订阅上订阅。 第二次订阅订阅太晚,错过了第一次发布。

我们可以移动Connect()方法的调用,直到完成所有订阅。 这样,即使对Thread.Sleep的调用,我们也不会真正订阅基础,直到两个订阅都完成为止。 这将如下完成:

 var period = TimeSpan.FromSeconds(1); var observable = Observable.Interval(period).Publish(); observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)); Thread.Sleep(period); observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)); observable.Connect(); 

输出:

 first subscription : 0 second subscription : 0 first subscription : 1 second subscription : 1 first subscription : 2 second subscription : 2 

所以使用Completable Observable,我们有办法控制什么时候让Observable放射物品。

取自以下网页的示例: http : //www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

编辑根据180个幻灯片在这个链接 :

发布的另一种性质是,如果任何观察者在10秒钟后开始观察开始发射物体,观察者将只获得10秒后(在订购时)发射的物体而不是全部物体。 所以在双方,我可以理解,发布是用于UI事件。 而且任何观察者都应该只接收那些在订阅了之前没有发生过的所有事件之后所执行的事件。

希望它有帮助。