使用Kotlin的RxJava – 如何同步2个异步方法,从Java重构
我有2个集合,哪个缓冲区位置更新事件:
private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>();
在我的代码中也有:
private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); private boolean mSaveDataScheduled; private final Object mEventsMonitor = new Object(); private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor();
我添加这样的事件:
public void appendGeoEvent(LocationGeoEvent event) { synchronized (mEventsMonitor) { mUpdateGeoEvents.add(event); scheduleSaveEvents(); } }
RSSI事件也是如此
现在,scheduleSaveEvents方法如下所示:
private void scheduleSaveEvents() { synchronized (mSaveDataExecutor) { if (!mSaveDataScheduled) { mSaveDataScheduled = true; mSaveDataExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mSaveDataExecutor) { saveEvents(false); mSaveDataScheduled = false; } } }, 30, TimeUnit.SECONDS); } } }
问题是,我需要同步停止更新的其他方法。 它是这样触发的:
private void scheduleStopLocationUpdates() { synchronized (mStopLocationUpdatesExecutor) { if (mScheduledStopLocationUpdatesFuture != null) mScheduledStopLocationUpdatesFuture.cancel(true); mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule( new Runnable() { @Override public void run() { synchronized (mStopLocationUpdatesExecutor) { stopLocationUpdates(); saveEvents(true); cleanAllReadingsData(); } } }, 45, TimeUnit.SECONDS); } }
在saveEvents方法我做:
private void saveEvents(boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) { //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop mUpdateGeoEvents.clear(); mUpdateRSSIEvents.clear(); } } }
有没有办法使用Kotlin重构这个简单的RxJava?
UPDATE
这里是我的appendRSSIevents方法:
private fun appendRSSIEvent(event: LocationRSSIEvent) { synchronized(mEventsMonitor) { if (!shouldSkipRSSIData(event.nexoIdentifier)) { mUpdateRSSIEvents.add(event) acknowledgeDevice(event.nexoIdentifier) scheduleSaveEvents() startLocationUpdates() } else removeExpiredData() } }
您可以缓冲两个数据流,然后将它们组合起来进行保存。 而且,您也可以使用缓冲区触发器来停止更新。
PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create(); PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create(); public void appendGeoEvent(LocationGeoEvent event) { mUpdateGeoEventsSubject.onNext( event ); triggerSave.onNext( Boolean.TRUE ); }
和RSS源相同。
现在我们需要触发器来驱动保存步骤。
PublishSubject<Boolean> triggerSave = PublishSubject.create(); PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create(); Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS ); Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave );
当正常的保存过程触发或者我们正在停止保存时, trigger
可观察的触发。
private void saveEvents( List<LocationGeoEvent> geo, List<LocationRSSIEvent> rss, boolean locationUpdatesAboutToStop) { synchronized (mEventsMonitor) { if (geo.size() > 0 || rss.size() > 0) { //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop } } } private void scheduleStopLocationUpdates() { stopLocationUpdates(); triggerStopAndSave.onNext( Boolean.FALSE ); cleanAllReadingsData(); } Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ), mUpdateRSSIEventsSubject.buffer( trigger ), trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) ) .subscribe();
您仍然需要针对多线程和安全性进行一些调整。 第一步是将各个主题转换为SerializedSubject
以便多个线程可以发出事件。
如果要使saveEvents
在特定的调度程序上运行,则需要添加中间数据结构(三元组),以通过observeOn()
运算符传递参数,或将observeOn()
运算符应用于每个zip()
参数。