• iOS 13 or later
• Essential for building data flows in SwiftUI
• Typed errors, no hot / cold Observable type separation
• Rx operators as generic types
• Supports non-blocking backpressure
    Output> : Publisher where ... {
        let upstream: Upstream
        let transform: (Upstream.Output) -> Output
    }

    // Used in `func append` / `func prepend`.
    struct Concatenate<Prefix, Suffix> : Publisher where ... {
        let prefix: Prefix
        let suffix: Suffix
    }
}
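To make "operators as generic types" concrete, the sketch below spells out the static types that `map` and `prepend` produce. The subject, the closure, and the variable names are illustrative assumptions; only the `Publishers.Map` and `Publishers.Concatenate` types come from Combine itself.

```swift
import Combine

// Illustrative upstream; any publisher would do.
let subject = PassthroughSubject<Int, Never>()

// `map` wraps the upstream in a `Publishers.Map` value.
let mapped: Publishers.Map<PassthroughSubject<Int, Never>, String> =
    subject.map { "#\($0)" }

// `prepend` wraps again: the prefix is a `Publishers.Sequence`,
// the suffix is the `Map` built above.
let chained: Publishers.Concatenate<
    Publishers.Sequence<[String], Never>,
    Publishers.Map<PassthroughSubject<Int, Never>, String>
> = mapped.prepend("start")

var received: [String] = []
let cancellable = chained.sink { received.append($0) }
subject.send(1)
```

The whole pipeline is encoded in the type of `chained`, so the compiler sees every stage without type erasure.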
    /// by not even wrapping with a single `Map` at all.
    /// (This is a `Sequence` to `Sequence` mapping function!)
    func map<T>(_ transform: (Elements.Element) -> T)
        -> Publishers.Sequence<[T], Failure>
    {
        return Publishers.Sequence(
            sequence: sequence.map(transform)
        )
    }
}
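A small check of this optimization, as a sketch: the explicit type annotation below forces the `Publishers.Sequence` overload of `map`, so the result is another `Publishers.Sequence` rather than a `Publishers.Map` wrapper. The input values and variable names are assumptions made for illustration.

```swift
import Combine

// The annotation compiles only because `map` on a `Publishers.Sequence`
// returns a `Publishers.Sequence` over the already-mapped array.
let doubled: Publishers.Sequence<[Int], Never> =
    [1, 2, 3].publisher.map { $0 * 2 }

// The transform has already been applied to the stored sequence.
let stored = doubled.sequence

var received: [Int] = []
let cancellable = doubled.sink { received.append($0) }
```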
back pressure (New!)
• A slow Subscriber can request values from a fast Publisher at its own pace, manually (interactive pull)
• Initiative started in 2013
• Implemented in RxJava 2 Flowable, Akka Streams, etc.
• Interface is supported in the Java 9 Flow API
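The interactive pull described above can be sketched with a custom `Subscriber` that asks for nothing up front and requests values only when told to. `PacedSubscriber` and its `pull(_:)` method are hypothetical names for this illustration, not part of Combine.

```swift
import Combine

// Hypothetical subscriber that pulls values manually.
final class PacedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Keep the subscription so values can be requested later.
        // No demand is signalled here, so nothing is delivered yet.
        self.subscription = subscription
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand beyond what was pulled
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }

    // Ask the publisher for `count` more values.
    func pull(_ count: Int) {
        subscription?.request(.max(count))
    }
}

let subscriber = PacedSubscriber()
(1...).publisher.subscribe(subscriber) // infinite upstream, nothing delivered yet
subscriber.pull(1)
subscriber.pull(2)
```

Even though the upstream is infinite, only the three requested values are delivered.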
Generic interface vs. protocol associatedtype
• Combine has more type-safe interfaces (e.g. Demand)
• Combine does not rely on subclassing (vtable)
• Combine only supports backpressure-able types
• More difficult for 3rd parties to implement new Rx operators with backpressure support
Sequence<Elements, Failure> : Publisher
    where Elements : Sequence, Failure : Error
{
    ...
    func receive<S: Subscriber>(subscriber: S) where ... {
        for value in sequence where !isCancelled {
            subscriber.receive(value) // push inside the loop
        }
        subscriber.receive(completion: .finished)
    }
}
Can work 1 issue per day

    Publishers.Sequence(infiniteIssues)
        .subscribe(me) // Goodbye, cruel world

Immediate infinite tasks will kill me (block the thread).
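For contrast with the push loop in the naive `receive(subscriber:)`, here is a minimal sketch of a demand-aware publisher. `DemandAwareSequence` and `Inner` are hypothetical names, this is not how Combine's own `Publishers.Sequence` is written, and the sketch is not thread-safe.

```swift
import Combine

// Hypothetical publisher that emits only as many values as were requested.
struct DemandAwareSequence<Elements: Sequence>: Publisher {
    typealias Output = Elements.Element
    typealias Failure = Never

    let sequence: Elements

    func receive<S: Subscriber>(subscriber: S)
        where S.Input == Elements.Element, S.Failure == Never
    {
        // Nothing is pushed here; values flow only from `request(_:)`.
        let inner = Inner(iterator: sequence.makeIterator(), downstream: subscriber)
        subscriber.receive(subscription: inner)
    }

    private final class Inner<S: Subscriber>: Subscription
        where S.Input == Elements.Element, S.Failure == Never
    {
        private var iterator: Elements.Iterator
        private var downstream: S?
        private var pending = Subscribers.Demand.none
        private var isDraining = false

        init(iterator: Elements.Iterator, downstream: S) {
            self.iterator = iterator
            self.downstream = downstream
        }

        func request(_ demand: Subscribers.Demand) {
            pending += demand
            // A re-entrant request is picked up by the loop already running.
            guard !isDraining else { return }
            isDraining = true
            defer { isDraining = false }
            while let downstream = downstream, pending > 0 {
                guard let value = iterator.next() else {
                    self.downstream = nil
                    downstream.receive(completion: .finished)
                    return
                }
                pending -= 1
                pending += downstream.receive(value) // subscriber may ask for more
            }
        }

        func cancel() { downstream = nil }
    }
}

// "me": handles one issue per explicit request.
var handled: [Int] = []
var subscription: Subscription?
let me = AnySubscriber<Int, Never>(
    receiveSubscription: { subscription = $0 },
    receiveValue: { handled.append($0); return .none },
    receiveCompletion: { _ in }
)

DemandAwareSequence(sequence: 1...).subscribe(me) // returns immediately
subscription?.request(.max(1)) // day 1
subscription?.request(.max(1)) // day 2
```

Subscribing to the infinite sequence no longer blocks the thread, because the loop runs only while there is outstanding demand.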
    }

    // For `Buffer`.
    enum BufferingStrategy<Failure> where Failure : Error {
        case dropNewest
        case dropOldest
    }

    // For `CollectByTime`.
    enum TimeGroupingStrategy<Context> where Context : Scheduler {
        case byTime(Context, Context.SchedulerTimeType.Stride)
        case byTimeOrCount(Context, Context.SchedulerTimeType.Stride, Int)
    }
        Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) -> P
    ) -> Publishers.FlatMap<P, Self>
        where T == P.Output, P : Publisher, Self.Failure == P.Failure
}

(Almost) the same API as RxJava's flatMap(mapper, maxConcurrency, bufferSize)
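A short usage sketch of the `maxPublishers` parameter, with input values chosen only for illustration. With `.max(1)`, `flatMap` requests the next upstream value only after the current inner publisher completes. The inner publishers here are synchronous, so the order would be the same with the default `.unlimited`; the limit starts to matter once inner publishers emit asynchronously.

```swift
import Combine

var received: [Int] = []

let cancellable = [1, 2, 3].publisher
    // At most one inner publisher is subscribed at a time.
    .flatMap(maxPublishers: .max(1)) { n in
        [n * 10, n * 10 + 1].publisher
    }
    .sink { received.append($0) }
```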
Publisher where ... {
    let upstream: Upstream
    let maxPublishers: Subscribers.Demand
    let transform: (Upstream.Output) -> NewPublisher

    func receive<S: Subscriber>(subscriber: S) where ... {
        let mergeSubscriber = MergeSubscriber(
            upstream: upstream,
            maxPublishers: maxPublishers,
            transform: transform,
            downstream: subscriber
        )
        upstream.subscribe(mergeSubscriber)
    }
}
stream pipeline at compile time with the help of the Swift type system
• Backpressure
• A mechanism for a slow subscriber to talk to a fast publisher
• Conforms to the Reactive Streams specification
• Difficult to implement Queue-Drain model