RxSwift 简介 RxSwift 是一个用于 iOS 平台的响应式编程库。它基于 ReactiveX,提供了一种优雅、简洁的方法来处理异步操作。在这篇文章中,我们将深入探讨 RxSwift 进行响应式编程的实现过程。Rx中文文档
Rx的几个核心部件的官网文档描述:
Observable :它代表一个可观察的序列,可以发出三种类型的事件:Next(携带一个元素)、Error(代表错误发生)和 Completed(代表序列完成)。Observable 是 RxSwift 的核心概念,几乎所有的操作都是基于 Observable 进行的。
创建 Observable :RxSwift 提供了多种创建 Observable 的方法,如 just
、from
、create
、interval
等。这些方法可以帮助你根据需求创建合适的 Observable。
订阅 Observable :当你创建了一个 Observable,你需要订阅它以便接收事件。你可以使用 subscribe
、subscribe(onNext:)
、subscribe(onError:)
、subscribe(onCompleted:)
等方法进行订阅。
DisposeBag :在 RxSwift 中,订阅会创建一个 Disposable 对象,用于管理订阅的生命周期。为了避免内存泄漏,你需要将 Disposable 添加到 DisposeBag 中。当 DisposeBag 被销毁时,它会自动取消所有与之关联的订阅。
操作符 :RxSwift 提供了许多操作符,用于对 Observable 进行变换、过滤、合并等操作。这些操作符是 RxSwift 的核心功能之一,可以帮助你轻松地处理复杂的事件流。一些常用的操作符有:map
、filter
、flatMap
、merge
、zip
、combineLatest
、debounce
、throttle
等。
Schedulers :Schedulers 是 RxSwift 中的调度器,用于指定 Observable 的执行线程。Schedulers 可以帮助你在不同的线程上执行任务,避免阻塞主线程。一些常用的 Schedulers 有:MainScheduler
(主线程)、ConcurrentDispatchQueueScheduler
(并发队列)等。
Subjects :Subjects 是 RxSwift 中的一种特殊类型,它们既是 Observable,又是 Observer。有四种类型的 Subjects:PublishSubject
、BehaviorSubject
、ReplaySubject
和 AsyncSubject
。它们可以作为代理或中介,在数据发送者和订阅者之间建立连接。
RxCocoa :RxCocoa 是基于 RxSwift 的一个 UI 绑定库。它提供了一些扩展方法和特殊的 Observable 类型(如 Driver、Signal),使得在 UI 层面的数据绑定和事件处理变得更加简单。 RxCocoa也需要导入RxRelay,PublishRelay和BehaviorRelay。这些 Relay 类型的主要优点是它们不会发射 onError 或 onComplete 事件,因此可以避免在某些情况下可能导致应用程序崩溃的问题。
错误处理 :RxSwift 提供了一些操作符用于处理错误,如 catch
、retry
等。这些操作符可以帮助你在处理错误时更加优雅地恢复或终止事件流。
写一个简单的发布-订阅模型 发布-订阅模型Demo1 (模仿一对多的通知模型) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 protocol ObserverType { associatedtype Element func onEvent (_ element : Element ) }class Observer <Element >: ObserverType { private let onEvent: (Element ) -> Void init (_ onEvent : @escaping (Element ) -> Void ) { self .onEvent = onEvent } func onEvent (_ element : Element ) { onEvent(element) } }protocol Disposable { func cancel () }class Observable <PreElement > { private var observers: [XXXObserverID : Observer <PreElement >] = [:] func subscribeOn (observer : Observer <PreElement >) -> Disposable { let id = XXXObserverID observers[id] = observer return Subscription { [weak self ] in self ? .observers.removeValue(forKey: id) } } func send (event : PreElement ) { for observer in observers.values { observer.onEvent(event) } } }class Subscription : Disposable { private let onCancel: () -> Void init (_ onCancel : @escaping () -> Void ) { self .onCancel = onCancel } func cancel () { onCancel() } }class DisposeBag { private var disposables: [Disposable ] = [] func add (_ disposable : Disposable ) { disposables.append(disposable) } deinit { for disposable in disposables { disposable.cancel() } } }
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ... static let observable = Observable <String >()... let disposeBag = DisposeBag ()let observer1 = Observer <String > { event in print ("Observer 1: \(event) " ) }let observer2 = Observer <String > { event in print ("Observer 2: \(event) " ) }let subscription1 = observable.subscribeOn(observer: observer1)let subscription2 = observable.subscribeOn(observer: observer2) disposeBag.add(subscription1) disposeBag.add(subscription2) observable.send(event: "Hello, world!" )
tips:Swift 中泛型的使用,如果一个 protocol 有泛型关联值,这个 protocol 是不允许通过 Protocol 声明变量类型的。另外两个泛型可以通过 where 添加泛型约束
本示例展示了一个简单的发布-订阅模型(观察者模式),类似iOS的通知机制。
通过disposeBag释放observer。
模仿Rx的流处理响应者模型 模型Demo2(模仿Rx的1对1流处理模型) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 protocol Disposable { var isDisposed: Bool { get } func dispose () }class AnonymousDisposable : Disposable { private(set) var isDisposed: Bool = false private let _disposeHandler: () -> Void init (_ disposeClosure : @escaping () -> Void ) { _disposeHandler = disposeClosure } func dispose () { isDisposed = true _disposeHandler() } }enum Event <Element > { case next(Element ) case error(Error ) case completed }protocol ObserverType { associatedtype Element func on (event : Event <Element >) }class Observer <Element >: ObserverType { private let _handler: (Event <Element >) -> Void init (handler : @escaping (Event <Element >) -> Void ) { _handler = handler } func on (event : Event <Element >) { _handler(event) } }class Observable <Element > { private let _eventGenerator: (Observer <Element >) -> Void init (_ eventGenerator : @escaping (Observer <Element >) -> Void ) { _eventGenerator = eventGenerator } func subscribe <O : ObserverType >(observer : O ) -> Disposable where O .Element == Element { let disposeObj = AnonymousDisposable () { debugPrint ("AnonymousDisposable ===" ) } _eventGenerator(Observer { (event) in guard ! disposeObj.isDisposed else { return } observer.on(event: event) switch event { case .error(_ ), .completed: disposeObj.dispose() default: break } }) return disposeObj } }
使用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 let observable = Observable <Int > { (observer) in print ("send 0" ) observer.on(event: .next(0 )) print ("send 1" ) observer.on(event: .next(1 )) print ("send 2" ) observer.on(event: .next(2 )) print ("send 3" ) observer.on(event: .next(3 )) DispatchQueue .main.asyncAfter(deadline: .now() + 3 ) { print ("send completed" ) observer.on(event: .completed) } }let observer = Observer <Int > { (event) in switch event { case .next(let value): print ("recive \(value) " ) case .error(let error): print ("recive \(error) " ) case .completed: print ("recive completed" ) } }let disposable = observable.subscribe(observer: observer)DispatchQueue .main.asyncAfter(deadline: .now() + 2 ) { disposable.dispose() }
最需要注意的点就是,中间层observer
的应用,中间层observer
事件内部持有 disposable
对象可以根据事件类型 设置 disposable
的标志位决定事件是否应该发送给原始 observer
。可见,如果想对原始事件做加工,需要重新封装一个 observer
,这个 observer
必须引用原始observer
。
通过Sink对象,把Dispose逻辑进行封装 Demo3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Sink <O : ObserverType >: Disposable { typealias Element = O .Element var observer: O private(set) var isDisposed: Bool = false init (observer : O ) { self .observer = observer } func forward (event : Event <O .Element >) { guard ! self .isDisposed else { return } self .observer.on(event: event) switch event { case .error(_ ), .completed: self .dispose() default : break } } func run (parent : Observable <Element >) { let observer = Observer <Element >(handler: forward) parent.eventGenerator(observer) } func dispose () { isDisposed = true } }class Observable <Element >: ObservableType { let eventGenerator: (Observer <Element >) -> Void init (_ eventGenerator : @escaping (Observer <Element >) -> Void ) { self .eventGenerator = eventGenerator } func subscribe <Observer : ObserverType >(observer : Observer ) -> Disposable where Observer .Element == Element { let sink = Sink (observer: observer) sink.run(parent: self ) return sink } }
tips:forward
函数被用作一个参数,传递给 Observer<O.Element>
的初始化方法。这允许 Observer
对象在接收到事件时调用 forward
函数来进行处理。这是一个在 Swift 中使用函数作为参数的典型示例。
为 Observable 添加操作符 Demo3
给一个 Observable
添加一个操作符,返回的必须还是 Observable
,以实现链式调用。
需要(Element)->RetElement
的 transform block
,加工上级传入的事件,得到的结果最后发送给下一级observer
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 extension Observable { func map <Result , Observer : ObserverType >(originObserver : Observer , _ transform : @escaping (Element ) throws -> Result ) where Observer .Element == Element -> Dispose { let mapObserver = Observer { (event) in switch event { case .next(let element): do { try originObserver.on(event: .next(transform(element))) } catch { originObserver.on(event: .error(error)) } case .error(let error): originObserver.on(event: .error(error)) case .completed: originObserver.on(event: .completed) } } let dispose = self .subscribe(observer: mapObserver) return dispose } }
以上方法,让self
也就是根observable
绑定了新创建的mapObserver
。 新创建的mapObserver
把element
转化之后,发送给原始observer
。 缺陷:不能实现链式调用,通过map
方法订阅不合适
链式调用的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 extension Observable { func map <Result >(_ transform : @escaping (Element ) throws -> Result ) -> Observable <Result > { let mapObservable = Observable <Result > { (originObserver) in let mapObserver = Observer { (event) in switch event { case .next(let element): do { try originObserver.on(event: .next(transform(element))) } catch { originObserver.on(event: .error(error)) } case .error(let error): originObserver.on(event: .error(error)) case .completed: originObserver.on(event: .completed) } } let dispose = self .subscribe(observer: mapObserver) return dispose } return mapObservable } }let observable = Observable <Int > { observer in observer.on(event: .next(0 )) }let observer = Observer <Int > { (event) in switch event { case .next(let value): print ("recive \(value) " ) case .error(let error): print ("recive \(error) " ) case .completed: print ("recive completed" ) } } observable.map { $0 * $0 }.subscribe(observer)
上面的例子把map操作写在了一坨,真实源码map方法会创建新对象MapObservable和MapSink,这么设计主要是解耦角色以及逻辑分层。
Sink 作为 Observer 的基类,实现操作符 Demo5 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class Sink <O : ObserverType >: Disposable { var isDisposed: Bool = false private let _forward: O init (forward : O ) { _forward = forward } func forward (event : Event <O .Element >) { guard ! isDisposed else { return } _forward.on(event: event) switch event { case .completed, .error(_ ): dispose() default : break } } func dispose () { isDisposed = true print ("dispose execute" ) } }
Observer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 enum Event <Element > { case next(Element ) case error(Error ) case completed }protocol ObserverType { associatedtype Element func on (event : Event <Element >) }struct Observer <Element > : ObserverType { public typealias EventHandler = (Event <Element >) -> Void private let handler: EventHandler public init <Observer : ObserverType >(_ observer : Observer ) where Observer .Element == Element { self .handler = observer.on } public init (_ handler : @escaping EventHandler ) { self .handler = handler } public func on (event : Event <Element >) { self .handler(event) } }class AnonymousObserver <O : ObserverType >: Sink <O >, ObserverType { override init (forward : O ) { super .init (forward: forward) } func on (event : Event <O .Element >) { self .forward(event: event) } }class MapObserver <Source , O : ObserverType >: Sink <O >, ObserverType { typealias Element = Source typealias Result = O .Element typealias Transform = (Source ) throws -> Result private let _transform: Transform init (forward : O , transform : @escaping Transform ) { self ._transform = transform super .init (forward: forward) } func on (event : Event <Element >) { switch event { case .next(let element): do { let mappedElement = try _transform(element) self .forward(event: .next(mappedElement)) } catch { self .forward(event: .error(error)) } case .error(let error): self .forward(event: .error(error)) self .dispose() case .completed: self .forward(event: .completed) self .dispose() } } }
MapObserver
必须实现ObserverType协议
和继承Sink
,这里需要注意,MapObserver
本事并不是一个Observer
对象,但是因为实现了ObserverType协议
可以方便的转换成Observer
对象。
Observable 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class Observable <Element > { func subscribe <Observer : ObserverType >(observer : Observer ) -> Disposable where Observer .Element == Element { rxAbstractMethod() } func rxAbstractMethod (file : StaticString = #file , line : UInt = #line ) -> Swift .Never { fatalError ("Abstract Method" , file: file, line: line) } }class AnonymousObservable <Element >: Observable <Element > { let _eventGenerator: (Observer <Element >) -> Disposable init (eventGenerator : @escaping (Observer <Element >) -> Disposable ) { self ._eventGenerator = eventGenerator } override func subscribe <O : ObserverType >(observer : O ) -> Disposable where O .Element == Element { let sink = AnonymousObserver (forward: observer) _ = self ._eventGenerator(Observer (sink)) return sink } }extension Observable { static func create (_ eventGenerator : @escaping (Observer <Element >) -> Disposable ) -> Observable <Element > { return AnonymousObservable (eventGenerator: eventGenerator) } }extension Observable { func map <Result >(_ transform : @escaping (Element ) throws -> Result ) -> Observable <Result > { return MapObservable (source: self , transform: transform) } }class MapObservable <Source , Result >: Observable <Result > { typealias Transform = (Source ) throws -> Result private let _transform: Transform private let _source: Observable <Source > init (source : Observable <Source >, transform : @escaping Transform ) { self ._source = source self ._transform = transform } override func subscribe <O : ObserverType >(observer : O ) -> Disposable where O .Element == Result { let sink = MapObserver (forward: observer, transform: self ._transform) _ = self ._source.subscribe(observer: sink) return sink } }
上面的代码模拟了真实源码,防止操作符代码写在一坨,分层设计了一下。
AnonymousObservable
: 持有eventGenerator
。
AnonymousObserver
:继承自Sink
,持有下一级Observer
,直接转发到下一级Observer
,没有除了dispose之外的加工。
MapObserver
:继承自Sink
,持有下一级Observer
,通过Transform
过滤转发。
MapObservable
:每个对象都必须引用上一级Observable
,持有Transform
用以创建对应的Observer
。
最后一步subscribe
方法,通过self._source.subscribe
层层递归到最上一级的subscribe
方法,找到AnonymousObservable
,然后通过Observer(sink)
,把sink
转换成Observer
对象,最后执行_eventGenerator(Observer(sink))
,因为 Sink
对象保存了下一级的 observertype
,_eventGenerator(Observer(sink))
内部会又从上到下一路处理event
,处理到最后一级,至此完毕。
总结 Observable Observable 是一个惰性的事件源,它持有一个 eventGenerator 闭包(通常称为订阅逻辑)。当调用 subscribe 方法时,eventGenerator 会接收一个符合 ObserverType 协议的对象,并向其发射事件。
subscribe 方法是响应式实现的核心。
Observer Observer 是事件的处理终端,实现 ObserverType 协议并定义 on(event:) 方法。它可以是:
最终的数据消费者(如 UI 更新)
中间操作符(如 map 中的转换逻辑)
Sink Sink 是订阅的中间代理,主要负责:
转发事件:将 Observable 的事件传递给 Observer
生命周期管理:通过 Disposable 模式控制资源释放,在错误或完成时自动取消订阅
线程安全:可能包含锁或队列以保证线程安全的事件传递
Sink 是单一职责原则的体现,其设计让:
observable 只关心事件产生
sink 只关心是否发送和如何发送给 observer 作为决策层
observer 只关心如何应对事件
一个订阅关系返回的就是一个 sink 对象(即 disposable 对象)。
操作符 操作符(如 map)通过返回新的 Observable 实现链式调用。
新 Observable 的 eventGenerator 会:
创建一个中间 Observer(如 MapObserver),用于转换或过滤事件
让原始 Observable 订阅该中间 Observer,形成反向绑定
将处理后的事件转发给最终的 Observer
每个操作符都有上游和下游:
上游是 observable
下游是 observer
自己是 observable
连接三者的方式是通过创建新的 observer 绑定上游,创建新的 observable(ret)绑定下游。 下游 Observer ← 【新 Observable(ret) ← 中间 Observer】 ← 上游 Observable
数据流向示意: 订阅方向:从下游到上游(下游 → ret → 中间 Observer → 上游)。 事件流动方向:从上游到下游(上游 → 中间 Observer → ret → 下游)。
惰性执行:只有下游订阅时,才会触发上游的事件生成。 资源管理:取消订阅时,能从下游逐级释放上游资源。
下篇文章,会看看iOS
中常用的一些rx
实践。