Extender RxJava Observable en Kotlin con eliminación adecuada

Intento crear un nuevo operador para RxJava que funcione de manera similar a la scan , pero puede tomar dos types diferentes. Esto es similar a la implementación tradicional de fold o networkinguce donde si se da un valor pnetworkingeterminado, los arguments para la function de plegado pueden ser de diferentes types siempre que el tipo de retorno coincida con el acumulador.

Esto es lo que tengo hasta ahora.

 fun <T, R> Observable<T>.fold(acc: R, f: (R, T) -> R): Observable<R> = Observable.create { o -> var accm = acc subscribe({ accm = f(accm, it) o.onNext(accm) }, { o.onError(it) }, { o.onComplete() }) } 

Me suscribo a this dentro de un bloque Observable.create , y actualizo el observador con nuevos valores puestos a través de la function de acumulador. El problema es que la suscripción no se limpiará cuando se elimine. ¿Que debería hacer?