Upgrade to Pro — share decks privately, control downloads, hide ads and more …

RxBlocking Deep Dive

RxBlocking Deep Dive

Otemachi.swift #02で行ったトークのスライドです。
RxBlockingがスレッドをブロックする仕組みについて解説しています。

関連記事をQiitaにて公開しています。
https://qiita.com/takehilo/items/be30a730599ac0774137

Otemachi.swift #02
https://nikkei.connpass.com/event/98887/

Takehiro Kaneko

October 15, 2018
Tweet

More Decks by Takehiro Kaneko

Other Decks in Programming

Transcript

  1. ඇಉظίʔυͷςετΛָʹॻͨ͘ΊͷϥΠϒϥϦ • toBlocking() • BlockingObservableܕʹม׵͢Δ • toArray() • ΧϨϯτεϨουΛϒϩοΫ͠ɺObservableΛαϒεΫϥΠϒ͠ɺશ ͯͷΠϕϯτΛड৴ͨ͠Βͦͷશͯͷཁૉฦͯ͠ϒϩοΫΛղআ͢Δ

    // ඇಉظʹΠϕϯτ͕ൃߦ͞ΕΔObservable let asyncObservable = Observable.of(10, 20, 30) .observeOn(SerialDispatchQueueScheduler(qos: .background)) let elements = try! asyncObservable.toBlocking().toArray() expect(elements).to(equal([10, 20, 30]))
  2. ඇಉظίʔυͷςετΛָʹॻͨ͘ΊͷϥΠϒϥϦ • toBlocking() • BlockingObservableܕʹม׵͢Δ • toArray() • ΧϨϯτεϨουΛϒϩοΫ͠ɺObservableΛαϒεΫϥΠϒ͠ɺશ ͯͷΠϕϯτΛड৴ͨ͠Βͦͷશͯͷཁૉฦͯ͠ϒϩοΫΛղআ͢Δ

    // ඇಉظʹΠϕϯτ͕ൃߦ͞ΕΔObservable let asyncObservable = Observable.of(10, 20, 30) .observeOn(SerialDispatchQueueScheduler(qos: .background)) let elements = try! asyncObservable.toBlocking().toArray() expect(elements).to(equal([10, 20, 30])) ͜͜Λਂ͘۷ͬͯ ͍͚͹Αͦ͞͏ʂ
  3. extension BlockingObservable { public func toArray() throws -> [E] {

    let results = materializeResult() return try elementsOrThrow(results) } } ͞Βʹਂ͘
  4. extension BlockingObservable { fileprivate func materializeResult(max: Int? = nil, predicate:

    @escaping (E) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<E> { var elements: [E] = Array<E>() var error: Swift.Error? let lock = RunLoopLock(timeout: timeout) let d = SingleAssignmentDisposable() defer { d.dispose() } lock.dispatch { let subscription = self.source.subscribe { event in if d.isDisposed { return } switch event { case .next(let element): do { if try predicate(element) { elements.append(element) } if let max = max, elements.count >= max { d.dispose() lock.stop() } } catch (let err) { error = err d.dispose() lock.stop() } case .error(let err): error = err d.dispose() lock.stop() case .completed: d.dispose() lock.stop() } } d.setDisposable(subscription) } do { try lock.run() } catch (let err) { error = err } if let error = error { return MaterializedSequenceResult.failed(elements: elements, error: error) } return MaterializedSequenceResult.completed(elements: elements) } }
  5. func materializeResult() -> MaterializedSequenceResult<E> { let lock = RunLoopLock(timeout: timeout)

    lock.dispatch { /*શͯͷΠϕϯτΛऔಘͨ͠Β࣮ߦϧʔϓΛऴྃ͢Δ*/ } lock.run() return MaterializedSequenceResult.completed(elements: elements) } ௕͍ͷͰϙΠϯτ ͱͳΔ෦෼͚ͩൈਮ
  6. func materializeResult() -> MaterializedSequenceResult<E> { let lock = RunLoopLock(timeout: timeout)

    lock.dispatch { /*શͯͷΠϕϯτΛऔಘͨ͠Β࣮ߦϧʔϓΛऴྃ͢Δ*/ } lock.run() return MaterializedSequenceResult.completed(elements: elements) } ࣮ߦϧʔϓΛ ੍ޚ͢ΔΫϥε
  7. func materializeResult() -> MaterializedSequenceResult<E> { let lock = RunLoopLock(timeout: timeout)

    lock.dispatch { /*શͯͷΠϕϯτΛऔಘͨ͠Β࣮ߦϧʔϓΛऴྃ͢Δ*/ } lock.run() return MaterializedSequenceResult.completed(elements: elements) } ࣮ߦϧʔϓ಺ͷΩϡʔʹ ΫϩʔδϟΛొ࿥
  8. func materializeResult() -> MaterializedSequenceResult<E> { let lock = RunLoopLock(timeout: timeout)

    lock.dispatch { /*શͯͷΠϕϯτΛऔಘͨ͠Β࣮ߦϧʔϓΛऴྃ͢Δ*/ } lock.run() return MaterializedSequenceResult.completed(elements: elements) } ࣮ߦϧʔϓΛ։࢝ ͜͜ͰεϨου͕ϒϩοΫ͞ΕΔ
  9. func materializeResult() -> MaterializedSequenceResult<E> { let lock = RunLoopLock(timeout: timeout)

    lock.dispatch { /*શͯͷΠϕϯτΛऔಘͨ͠Β࣮ߦϧʔϓΛऴྃ͢Δ*/ } lock.run() return MaterializedSequenceResult.completed(elements: elements) } 0CTFSWBCMF͕ൃߦͨ͠ શΠϕϯτΛฦ͢
  10. ͜ΜͳΠϝʔδ εϨου lock.dispatch {} Ωϡʔ ࣮ߦϧʔϓ lock.run() return start stop

    process શͯͷΠϕϯτΛऔಘͨ͠Β ࣮ߦϧʔϓΛऴྃ͢Δ
  11. func dispatch(_ action: @escaping () -> ()) { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw)

    { action() } } func stop() { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) { CFRunLoopStop(self._currentRunLoop) } } func run() throws { CFRunLoopRun() } ͦΕͧΕϙΠϯτ͚ͩൈਮ
  12. func dispatch(_ action: @escaping () -> ()) { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw)

    { action() } } func stop() { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) { CFRunLoopStop(self._currentRunLoop) } } func run() throws { CFRunLoopRun() } ࣮ߦϧʔϓ಺ͷΩϡʔʹ ΫϩʔδϟΛొ࿥
  13. func dispatch(_ action: @escaping () -> ()) { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw)

    { action() } } func stop() { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) { CFRunLoopStop(self._currentRunLoop) } } func run() throws { CFRunLoopRun() } ࣮ߦϧʔϓΛऴྃ͢Δ
  14. func dispatch(_ action: @escaping () -> ()) { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw)

    { action() } } func stop() { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) { CFRunLoopStop(self._currentRunLoop) } } func run() throws { CFRunLoopRun() } ࣮ߦϧʔϓΛ։࢝͢Δ $'3VO-PPQ4UPQ ͕ݺ͹ΕΔ·Ͱ ࣮ߦ͠ଓ͚Δ
  15. func dispatch(_ action: @escaping () -> ()) { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw)

    { action() } } func stop() { CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) { CFRunLoopStop(self._currentRunLoop) } } func run() throws { CFRunLoopRun() } ͍͕ͭ͜εϨουΛϒϩοΫ͢Δ ࢓૊Έͷਖ਼ମʂʂ