使用Kotlin的RxJava – 如何同步2个异步方法,从Java重构

我有2个集合,哪个缓冲区位置更新事件:

private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>(); 

在我的代码中也有:

  private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); private boolean mSaveDataScheduled; private final Object mEventsMonitor = new Object(); private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor(); 

我添加这样的事件:

  public void appendGeoEvent(LocationGeoEvent event) { synchronized (mEventsMonitor) { mUpdateGeoEvents.add(event); scheduleSaveEvents(); } } 

RSSI事件也是如此

现在,scheduleSaveEvents方法如下所示:

  private void scheduleSaveEvents() { synchronized (mSaveDataExecutor) { if (!mSaveDataScheduled) { mSaveDataScheduled = true; mSaveDataExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mSaveDataExecutor) { saveEvents(false); mSaveDataScheduled = false; } } }, 30, TimeUnit.SECONDS); } } } 

问题是,我需要同步停止更新的其他方法。 它是这样触发的:

  private void scheduleStopLocationUpdates() { synchronized (mStopLocationUpdatesExecutor) { if (mScheduledStopLocationUpdatesFuture != null) mScheduledStopLocationUpdatesFuture.cancel(true); mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mStopLocationUpdatesExecutor) { stopLocationUpdates(); saveEvents(true); cleanAllReadingsData(); } } }, 45, TimeUnit.SECONDS); } } 

在saveEvents方法我做:

  private void saveEvents(boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) { //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop mUpdateGeoEvents.clear(); mUpdateRSSIEvents.clear(); } } } 

有没有办法使用Kotlin重构这个简单的RxJava?

UPDATE

这里是我的appendRSSIevents方法:

  private fun appendRSSIEvent(event: LocationRSSIEvent) { synchronized(mEventsMonitor) { if (!shouldSkipRSSIData(event.nexoIdentifier)) { mUpdateRSSIEvents.add(event) acknowledgeDevice(event.nexoIdentifier) scheduleSaveEvents() startLocationUpdates() } else removeExpiredData() } } 

您可以缓冲两个数据流,然后将它们组合起来进行保存。 而且,您也可以使用缓冲区触发器来停止更新。

 PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create(); PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create(); public void appendGeoEvent(LocationGeoEvent event) { mUpdateGeoEventsSubject.onNext( event ); triggerSave.onNext( Boolean.TRUE ); } 

和RSS源相同。

现在我们需要触发器来驱动保存步骤。

 PublishSubject<Boolean> triggerSave = PublishSubject.create(); PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create(); Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS ); Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave ); 

当正常的保存过程触发或者我们正在停止保存时, trigger可观察的触发。

 private void saveEvents( List<LocationGeoEvent> geo, List<LocationRSSIEvent> rss, boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (geo.size() > 0 || rss.size() > 0) { //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop } } } private void scheduleStopLocationUpdates() { stopLocationUpdates(); triggerStopAndSave.onNext( Boolean.FALSE ); cleanAllReadingsData(); } Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ), mUpdateRSSIEventsSubject.buffer( trigger ), trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) ) .subscribe(); 

您仍然需要针对多线程和安全性进行一些调整。 第一步是将各个主题转换为SerializedSubject以便多个线程可以发出事件。

如果要使saveEvents在特定的调度程序上运行,则需要添加中间数据结构(三元组),以通过observeOn()运算符传递参数,或将observeOn()运算符应用于每个zip()参数。