//
import Foundation
import RxSwift
public final class SequenceQueue: Hashable {
public static func == (lhs: SequenceQueue, rhs: SequenceQueue) -> Bool {
lhs === rhs
}
public func hash(into hasher: inout Hasher) {
ObjectIdentifier(self).hash(into: &hasher)
}
fileprivate let queueSubject = PublishSubject<Single<Void>>()
private var current: Disposable?
private let lock = NSLock()
public init() {
purgeCurrentItem()
}
public func purgeCurrentItem() {
lock.lock(); defer { lock.unlock() }
self.current?.dispose()
self.current = queueSubject
.map {
$0.asObservable().materialize()
}
.concat()
.subscribe()
}
}
extension PrimitiveSequence where Trait == SingleTrait {
public func wait(in queue: SequenceQueue) -> Single<Element> {
let booleanDisposable = BooleanDisposable()
var connection: Disposable?
let connectable = self.asObservable().share().replay(1)
let wrapped = Single<Void>.create { observer in
guard !booleanDisposable.isDisposed else {
observer(.success(()))
return Disposables.create {}
}
connection = connectable.connect()
return Disposables.create([
connectable.map { _ in }.asSingle().subscribe(observer),
connection
].compactMap { $0 })
}
queue.queueSubject.onNext(wrapped)
return Single<Element>.create { observer in
return Disposables.create([
booleanDisposable,
connectable.asSingle().subscribe(observer),
Disposables.create {
connection?.dispose()
}
].compactMap { $0 })
}
}
}