kotlin grpc.StreamObserver去rx.PublishSubject

每当我们声明流API时使用GRPC

rpc heartBeat(Empty) returns (stream ServiceStatus){} 

我们已经为观察者模式StreamObserver简单的接口(这是protobuf会为我们生成的)

 public interface StreamObserver<V> { void onNext(V var1); void onError(Throwable var1); void onCompleted(); } 

现在你想要做的就是将其转换为一个实际的Observable并且只有在它被传递之后才能继续使用。

 override fun heartBeat(arg: Empty): Observable<ServiceStatus> { // we create rx java subject val subject = PublishSubject.create<ServiceStatus>() // we create grpc observer and delegate all calls to rx java val observer = object : StreamObserver<ServiceStatus> { override fun onNext(value: ServiceStatus) { subject.onNext(value) } override fun onError(error: Throwable) { subject.onError(error) } override fun onCompleted() { subject.onCompleted() } } // we use grpc observer for generated api asyncStub.heartBeat(arg, observer) // but we pass rx observable (subject) to client code return subject } 

现在我是新的Kotlin,但我不能找出现有的委托功能有没有办法使StreamObserver的主题委托? 在Kotlin中写这段代码有没有更富有表现力的方法?

我将创建一个创建StreamObserver的通用方法,将其传递给其lambda参数,并将结果封装在Observable

 inline fun <T> asObservable( crossinline body: (StreamObserver<T>) -> Unit): Observable<T> { return Observable.create { subscription -> val observer = object : StreamObserver<T> { override fun onNext(value: T) { subscription.onNext(value) } override fun onError(error: Throwable) { subscription.onError(error) } override fun onCompleted() { subscription.onCompleted() } } body(observer) } } 

那么你可以用下面的方法实现RPC方法。

 override fun heartBeat(arg: Empty): Observable<ServiceStatus> = asObservable { asyncStub.heartBeat(arg, it) }