[RxSwift] Mutual Sequence Exclusion

Updated
Jun 15, 2020 8:29 AM
Created
Jun 15, 2020 7:23 AM
Tags
Swift, RxSwift
Keywords
Date

//

import Foundation

import RxSwift

/// A queue that runs `Single<Void>` work items one after another.
///
/// Items are pushed onto an internal subject and subscribed to sequentially
/// through `concat()`. Each item is materialized first, so an item that fails
/// does not terminate the pipeline.
///
/// Equality and hashing are based on object identity.
public final class SequenceQueue: Hashable {

  /// Entry point for new work items; used by `wait(in:)` in this file.
  fileprivate let queueSubject = PublishSubject<Single<Void>>()

  /// Guards `current` while the pipeline is being replaced.
  private let lock = NSLock()

  /// Subscription that drives the sequential pipeline.
  private var current: Disposable?

  /// Creates a queue and starts its processing pipeline.
  public init() {
    purgeCurrentItem()
  }

  public static func == (lhs: SequenceQueue, rhs: SequenceQueue) -> Bool {
    return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
  }

  public func hash(into hasher: inout Hasher) {
    hasher.combine(ObjectIdentifier(self))
  }

  /// Disposes the running pipeline and subscribes a fresh one.
  ///
  /// NOTE(review): because the whole `concat` subscription is disposed, items
  /// that were queued but had not started yet appear to be dropped together
  /// with the running one — confirm this is intended.
  public func purgeCurrentItem() {
    lock.lock()
    defer { lock.unlock() }

    current?.dispose()

    // Materializing each item turns its error into a regular event, keeping
    // the concatenated stream alive for the items that follow.
    let serialized = queueSubject
      .map { item in item.asObservable().materialize() }
      .concat()

    current = serialized.subscribe()
  }
}

extension PrimitiveSequence where Trait == SingleTrait {

  /// Schedules this `Single` on `queue` so that it starts only after the items
  /// enqueued before it have finished, and returns a `Single` that delivers
  /// its result.
  ///
  /// The work is enqueued immediately when this method is called, not when the
  /// returned `Single` is subscribed to. Its events are relayed through a
  /// replaying subject, so a subscriber receives the result whether it
  /// subscribes before, while, or after the work runs.
  ///
  /// When a subscription to the returned `Single` ends (it was disposed, or it
  /// received its event), work that is still running is cancelled, work that
  /// has not started yet is skipped, and the queue is released so the next
  /// item can begin. After such a cancellation, later subscribers to the
  /// returned `Single` receive no event.
  ///
  /// - Parameter queue: The queue that serializes this work with the other
  ///   items enqueued on it.
  /// - Returns: A `Single` emitting the element or the error of the receiver.
  public func wait(in queue: SequenceQueue) -> Single<Element> {

    // Disposed as soon as a subscription to the returned Single ends; an item
    // that has not started yet checks it to skip its work.
    let booleanDisposable = BooleanDisposable()

    // Holds the subscription that runs the receiver. Being a SerialDisposable,
    // it also disposes work that gets assigned after a cancellation.
    let work = SerialDisposable()

    // Keeps the receiver's events so they stay observable after it terminates,
    // regardless of when observers subscribe.
    let outcome = ReplaySubject<Element>.create(bufferSize: 1)

    // Tells the queued item that the subscriber went away, so the queue can
    // move on even though a cancelled source never emits a terminal event.
    let released = ReplaySubject<Void>.create(bufferSize: 1)

    let source = self.asObservable()

    let wrapped = Single<Void>.create { observer in

      guard !booleanDisposable.isDisposed else {
        // Cancelled before its turn came: finish right away without working.
        observer(.success(()))
        return Disposables.create()
      }

      // Observe the outcome before starting the work so that a source which
      // completes synchronously is not missed.
      let completion = outcome.map { _ in }.asSingle().subscribe(observer)
      let cancellation = released.subscribe(onNext: { _ in observer(.success(())) })
      work.disposable = source.subscribe(outcome)

      return Disposables.create([completion, cancellation, work])
    }

    queue.queueSubject.onNext(wrapped)

    return Single<Element>.create { observer in

      return Disposables.create([
        booleanDisposable,
        outcome.asSingle().subscribe(observer),
        Disposables.create {
          work.dispose()
          released.onNext(())
        }
      ])

    }

  }
}