しおメモ

雑多な技術系ブログです。ニッチな内容が多いです。

RxSwiftのmaterialize/dematerialize

他言語含めても、なかなか使われているサンプルも少ないmaterializeですが、 そんなに悪さをするオペレーターでもないので書いてみました。

こんなオペレーターもあるよ的な感じです。

materialize/dematerializeの機能

materialize/dematerializeRxSwiftに限らず、各言語のRxライブラリに存在するオペレーターです。

reactivex.io

materialize

materializenext,error,completedの各イベントを、enum(Event)に詰め直し、すべてnextとして流すObservableを生成します。

Observable<Int>Observable<Event<Int>>に変換する例を見てみます。

let hoge: Observable<Int> = Observable.create { observer in
    observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()

    return Disposables.create()
}

hoge
    .materialize()
    .debug("hoge")
    .subscribe()

この場合next2つと、completedEventにラップされ、next(Event.next(1)),next(Event.next(2)),next(Event.completed)として流れます。

hoge -> subscribed
hoge -> Event next(next(1))
hoge -> Event next(next(2))
hoge -> Event next(completed)
hoge -> Event completed
hoge -> isDisposed

元のObservableerrorcompletedが流れた時点で、materializeで変換したObservableにもcompletedが流れます。

変換後のストリームにerrorが流れることはありません。

dematerialize

dematerializematerializeの逆変換をするオペレーターで、Observable<Event<Element>>Observable<Element>に戻すことができます。

先程の例に、dematerializeを追加してみます。

hoge
    .materialize()
    .dematerialize()
    .debug("fuga")
    .subscribe()

元のObservableが復元されていることが確認できます。

fuga -> subscribed
fuga -> Event next(1)
fuga -> Event next(2)
fuga -> Event completed
fuga -> isDisposed

ありえそうな使い方

単純なエラーハンドルであれば、catchErrorcatchErrorJustReturnなどを使ったほうがシンプルにかけると思うので、 基本的には、completedが絡む処理をしたい場合に役立つことがありそうだと考えています。

イベントの種類を変えるようなエラーハンドリングをする

catchErrorなどと同様に、イベントの種類を変えるような使い方もできます。

エラーハンドルだけであれば、mapcatchErrorでも書けるのですが、 completedに対しても処理ができるので、その点ではcatchErrorより多少柔軟な変換ができます。

let hoge: Observable<Int> = Observable.create { observer in
    observer.onNext(1)
    observer.onNext(2)
    observer.onError(SomeError.recoverable)

    return Disposables.create()
}

hoge
    .materialize()
    .compactMap { event -> Event<Int>? in
        switch event {
        case .next:
            return event
        case .error(SomeError.recoverable):
            return .next(0)
        case .error(_):
            return nil
        case .completed:
            return nil
        }
    }
    .dematerialize()
    .debug("fuga")
    .subscribe()
fuga -> subscribed
fuga -> Event next(1)
fuga -> Event next(2)
fuga -> Event next(0)
fuga -> Event completed
fuga -> isDisposed

まとめて1箇所に書けるというのをメリットと捉えることもできますが、若干Rxっぽくない書き方ではあります。

各イベントのログを取る

他の言語でもまず想定されているケースですが、まとめてすべてのイベントのログを取りたいときに利用することができます。

dematerializeの後はストリームが元の状態に戻るので、そのまま続けてオペレーターを書くことができます。

observable
    .materialize()
    .do(onNext: { event in
        // ログを取る処理
    })
    .dematerialize()

next, errorだけを集めたストリームを作る

nextを集めるのはRxSwiftの場合、別のやり方もあるのですが、errorはそのままでは集めることができないので、materializeを挟むとシンプルに集めることができます。

let hoges: [Observable<Int>] = (0...5).map { _ in
    Observable.create { observer in
        observer.onNext(1)
        observer.onNext(2)
        observer.onError(SomeError.unrecoverable)

        return Disposables.create()
    }
}

Observable.merge(hoges.map { $0.materialize().compactMap { $0.element } })
    .debug("hoge")
    .subscribe()

Observable.merge(hoges.map { $0.materialize().compactMap { $0.error } })
    .debug("fuga")
    .subscribe()
hoge -> subscribed
hoge -> Event next(1)
hoge -> Event next(2)
  ...
hoge -> Event next(1)
hoge -> Event next(2)
hoge -> Event completed
hoge -> isDisposed
fuga -> subscribed
fuga -> Event next(unrecoverable)
  ...
fuga -> Event next(unrecoverable)
fuga -> Event completed
fuga -> isDisposed

この使い方は、ライブラリなどでも見かけます。

github.com

おわりに

もっとイケてる使い方があったらこっそり教えて下さい。