RxSwift 简介

RxSwift 是一个用于 iOS 平台的响应式编程库。它基于 ReactiveX,提供了一种优雅、简洁的方法来处理异步操作。在这篇文章中,我们将深入探讨 RxSwift 进行响应式编程的实现过程。
Rx中文文档

Rx的几个核心部件的官网文档描述:

  1. Observable:它代表一个可观察的序列,可以发出三种类型的事件:Next(携带一个元素)、Error(代表错误发生)和 Completed(代表序列完成)。Observable 是 RxSwift 的核心概念,几乎所有的操作都是基于 Observable 进行的。

  2. 创建 Observable:RxSwift 提供了多种创建 Observable 的方法,如 justfromcreateinterval 等。这些方法可以帮助你根据需求创建合适的 Observable。

  3. 订阅 Observable:当你创建了一个 Observable,你需要订阅它以便接收事件。你可以使用 subscribesubscribe(onNext:)subscribe(onError:)subscribe(onCompleted:) 等方法进行订阅。

  4. DisposeBag:在 RxSwift 中,订阅会创建一个 Disposable 对象,用于管理订阅的生命周期。为了避免内存泄漏,你需要将 Disposable 添加到 DisposeBag 中。当 DisposeBag 被销毁时,它会自动取消所有与之关联的订阅。

  5. 操作符:RxSwift 提供了许多操作符,用于对 Observable 进行变换、过滤、合并等操作。这些操作符是 RxSwift 的核心功能之一,可以帮助你轻松地处理复杂的事件流。一些常用的操作符有:mapfilterflatMapmergezipcombineLatestdebouncethrottle 等。

  6. Schedulers:Schedulers 是 RxSwift 中的调度器,用于指定 Observable 的执行线程。Schedulers 可以帮助你在不同的线程上执行任务,避免阻塞主线程。一些常用的 Schedulers 有:MainScheduler(主线程)、ConcurrentDispatchQueueScheduler(并发队列)等。

  7. Subjects:Subjects 是 RxSwift 中的一种特殊类型,它们既是 Observable,又是 Observer。有四种类型的 Subjects:PublishSubjectBehaviorSubjectReplaySubjectAsyncSubject。它们可以作为代理或中介,在数据发送者和订阅者之间建立连接。

  8. RxCocoa:RxCocoa 是基于 RxSwift 的一个 UI 绑定库。它提供了一些扩展方法和特殊的 Observable 类型(如 Driver、Signal),使得在 UI 层面的数据绑定和事件处理变得更加简单。 RxCocoa也需要导入RxRelay,PublishRelay和BehaviorRelay。这些 Relay 类型的主要优点是它们不会发射 onError 或 onComplete 事件,因此可以避免在某些情况下可能导致应用程序崩溃的问题。

  9. 错误处理:RxSwift 提供了一些操作符用于处理错误,如 catchretry 等。这些操作符可以帮助你在处理错误时更加优雅地恢复或终止事件流。

写一个简单的发布-订阅模型

发布-订阅模型Demo1 (模仿一对多的通知模型)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 观察者协议
protocol ObserverType {
associatedtype Element

func onEvent(_ element: Element)
}

class Observer<Element>: ObserverType {
private let onEvent: (Element) -> Void

init(_ onEvent: @escaping (Element) -> Void) {
self.onEvent = onEvent
}

func onEvent(_ element: Element) {
onEvent(element)
}
}

// 可被取消的订阅协议
protocol Disposable {
func cancel()
}

// 发布者协议
// 遵循 ObservableType 的类或结构体
class Observable<PreElement> {
private var observers: [XXXObserverID: Observer<PreElement>] = [:]

func subscribeOn(observer: Observer<PreElement>) -> Disposable {
let id = XXXObserverID
observers[id] = observer
return Subscription { [weak self] in
self?.observers.removeValue(forKey: id)
}
}

// 发送事件给所有订阅者
func send(event: PreElement) {
for observer in observers.values {
observer.onEvent(event)
}
}
}

// 订阅关系类
class Subscription: Disposable {
private let onCancel: () -> Void

init(_ onCancel: @escaping () -> Void) {
self.onCancel = onCancel
}

func cancel() {
onCancel()
}
}

// DisposeBag 类,用于管理订阅资源
class DisposeBag {
private var disposables: [Disposable] = []

func add(_ disposable: Disposable) {
disposables.append(disposable)
}

deinit {
for disposable in disposables {
disposable.cancel()
}
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
static let observable = Observable<String>()
...

let disposeBag = DisposeBag()
let observer1 = Observer<String> { event in
print("Observer 1: \(event)")
}
let observer2 = Observer<String> { event in
print("Observer 2: \(event)")
}

let subscription1 = observable.subscribeOn(observer: observer1)
let subscription2 = observable.subscribeOn(observer: observer2)
disposeBag.add(subscription1)
disposeBag.add(subscription2)

// 当 DisposeBag 实例被销毁时,所有订阅关系将自动取消
observable.send(event: "Hello, world!")

tips:Swift 中泛型的使用,如果一个 protocol 有泛型关联值,这个 protocol 是不允许通过 Protocol 声明变量类型的。另外两个泛型可以通过 where 添加泛型约束

  1. 本示例展示了一个简单的发布-订阅模型(观察者模式),类似iOS的通知机制。
  2. 通过disposeBag释放observer。

模仿Rx的流处理响应者模型

模型Demo2(模仿Rx的1对1流处理模型)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
protocol Disposable {
var isDisposed: Bool { get }
func dispose()
}

/// 封装了一个标识位以及一个dispose的handler
/// 如果不需要handler,只用一个标志位就行
class AnonymousDisposable: Disposable {
// 判断是否已销毁(取消订阅)的标志位
private(set) var isDisposed: Bool = false

// AnonymousDisposable 封装了 取消订阅 的闭包
private let _disposeHandler: () -> Void

init(_ disposeClosure: @escaping () -> Void) {
_disposeHandler = disposeClosure
}

func dispose() {
isDisposed = true
_disposeHandler()
}
}

// Event
enum Event<Element> {
case next(Element)
case error(Error)
case completed
}

protocol ObserverType {
associatedtype Element
func on(event: Event<Element>)
}

// Observer保存的就是接受 element 的执行代码块
// 操作符和 dispose 中间层都是靠包一个 observer 中间层实现的
class Observer<Element>: ObserverType {
// 处理事件的闭包
private let _handler: (Event<Element>) -> Void
init(handler: @escaping (Event<Element>) -> Void) {
_handler = handler
}
// 实现 监听事件 的协议,内部处理事件
func on(event: Event<Element>) {
// 处理事件
_handler(event)
}
}

// Observable保存的就是数据序列的发生器 block
// 该 block 接收observer 作为参数,把 element 序列发送给 observer
class Observable<Element> {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Void

init(_ eventGenerator: @escaping (Observer<Element>) -> Void) {
_eventGenerator = eventGenerator
}

// 实现 订阅操作 的协议
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let disposeObj = AnonymousDisposable() {
debugPrint("AnonymousDisposable ===")
}
// 通过一个中间 Observer 对原始 Observer 进行封装,用于过滤事件的传递。
_eventGenerator(Observer { (event) in
guard !disposeObj.isDisposed else { return }
// 事件传递给原始 observer
observer.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .error(_), .completed:
disposeObj.dispose()
default:
break
}
})
return disposeObj
}
}

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
let observable = Observable<Int> { (observer) in
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
}

let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

let disposable = observable.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}

最需要注意的点就是,中间层observer 的应用,中间层observer 事件内部持有 disposable 对象可以根据事件类型 设置 disposable 的标志位决定事件是否应该发送给原始 observer。可见,如果想对原始事件做加工,需要重新封装一个 observer,这个 observer 必须引用原始observer

通过Sink对象,把Dispose逻辑进行封装 Demo3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Sink<O: ObserverType>: Disposable {
typealias Element = O.Element
var observer: O
private(set) var isDisposed: Bool = false

init(observer: O) {
self.observer = observer
}

func forward(event: Event<O.Element>) {
guard !self.isDisposed else { return }
self.observer.on(event: event)
switch event {
case .error(_), .completed:
self.dispose()
default:
break
}
}

// 相当于原始Observable订阅了新的Observer
func run(parent: Observable<Element>) {
let observer = Observer<Element>(handler: forward)
parent.eventGenerator(observer)
}

func dispose() {
isDisposed = true
}
}

class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
let eventGenerator: (Observer<Element>) -> Void

init(_ eventGenerator: @escaping (Observer<Element>) -> Void) {
self.eventGenerator = eventGenerator
}

// 实现 订阅操作 的协议,ObserverType 为泛型约束
func subscribe<Observer: ObserverType>(observer: Observer) -> Disposable where Observer.Element == Element {
let sink = Sink(observer: observer)
sink.run(parent: self)
return sink
}
}

tips:forward 函数被用作一个参数,传递给 Observer<O.Element> 的初始化方法。这允许 Observer 对象在接收到事件时调用 forward 函数来进行处理。这是一个在 Swift 中使用函数作为参数的典型示例。

为 Observable 添加操作符 Demo3

  1. 给一个 Observable 添加一个操作符,返回的必须还是 Observable,以实现链式调用。
  2. 需要(Element)->RetElementtransform block,加工上级传入的事件,得到的结果最后发送给下一级observer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
extension Observable {
func map<Result, Observer: ObserverType>(originObserver: Observer, _ transform: @escaping (Element) throws -> Result) where Observer.Element == Element -> Dispose {
let mapObserver = Observer { (event) in
switch event {
case .next(let element):
do {
try originObserver.on(event: .next(transform(element)))
} catch {
originObserver.on(event: .error(error))
}
case .error(let error):
originObserver.on(event: .error(error))
case .completed:
originObserver.on(event: .completed)
}
}
// subscribe:绑定发布者和订阅者
let dispose = self.subscribe(observer: mapObserver)
return dispose
}
}

以上方法,让self也就是根observable绑定了新创建的mapObserver。 新创建的mapObserverelement转化之后,发送给原始observer
缺陷:不能实现链式调用,通过map方法订阅不合适

链式调用的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
extension Observable {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
// 初始化新的observable
let mapObservable = Observable<Result> { (originObserver) in
let mapObserver = Observer { (event) in
switch event {
case .next(let element):
do {
try originObserver.on(event: .next(transform(element)))
} catch {
originObserver.on(event: .error(error))
}
case .error(let error):
originObserver.on(event: .error(error))
case .completed:
originObserver.on(event: .completed)
}
}
// subscribe:绑定原始发布者和mapObserver
let dispose = self.subscribe(observer: mapObserver)
return dispose
}
return mapObservable
}
}

// 使用步骤
let observable = Observable<Int> { observer in
observer.on(event: .next(0))
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

observable.map { $0*$0 }.subscribe(observer)

上面的例子把map操作写在了一坨,真实源码map方法会创建新对象MapObservable和MapSink,这么设计主要是解耦角色以及逻辑分层。

Sink 作为 Observer 的基类,实现操作符 Demo5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Sink<O: ObserverType>: Disposable {
var isDisposed: Bool = false
private let _forward: O

init(forward: O) {
_forward = forward
}

func forward(event: Event<O.Element>) {
guard !isDisposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}

func dispose() {
isDisposed = true
print("dispose execute")
}
}

Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
enum Event<Element> {
case next(Element)
case error(Error)
case completed
}

protocol ObserverType {
associatedtype Element
func on(event: Event<Element>)
}

/// 创建接收一个ObserverType的初始化方法
struct Observer<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let handler: EventHandler
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.handler = observer.on
}
public init(_ handler: @escaping EventHandler) {
self.handler = handler
}
public func on(event: Event<Element>) {
self.handler(event)
}
}

// 作为基类
class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType {
// forward 为原始订阅者
override init(forward: O) {
super.init(forward: forward)
}

func on(event: Event<O.Element>) {
self.forward(event: event)
}
}

class MapObserver<Source, O: ObserverType>: Sink<O>, ObserverType {
typealias Element = Source
typealias Result = O.Element
typealias Transform = (Source) throws -> Result
private let _transform: Transform

init(forward: O, transform: @escaping Transform) { // forward 为原始订阅者
self._transform = transform
super.init(forward: forward)
}

func on(event: Event<Element>) {
// 对原始事件进行 map 转换,对结果进行转发
switch event {
case .next(let element):
do {
let mappedElement = try _transform(element)
self.forward(event: .next(mappedElement))
} catch {
self.forward(event: .error(error))
}
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
}

MapObserver必须实现ObserverType协议和继承Sink,这里需要注意,MapObserver本事并不是一个Observer对象,但是因为实现了ObserverType协议可以方便的转换成Observer对象。

Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Observable<Element> {
func subscribe<Observer: ObserverType>(observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
fatalError("Abstract Method", file: file, line: line)
}
}

class AnonymousObservable<Element>: Observable<Element> {
// 只有匿名Observable持有生成器闭包
let _eventGenerator: (Observer<Element>) -> Disposable

init(eventGenerator: @escaping (Observer<Element>) -> Disposable) {
self._eventGenerator = eventGenerator
}

override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
// 订阅发生时,生成一个中间订阅者 AnonymousObserver 来订阅事件,该事件可以是被层层转发到这里的也可以是原始事件本身
let sink = AnonymousObserver(forward: observer)
_ = self._eventGenerator(Observer(sink))
return sink
}
}

extension Observable {
static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(eventGenerator: eventGenerator)
}
}

extension Observable {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return MapObservable(source: self, transform: transform)
}
}

class MapObservable<Source, Result>: Observable<Result> {
typealias Transform = (Source) throws -> Result
private let _transform: Transform
private let _source: Observable<Source>

init(source: Observable<Source>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}

override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Result {
// 订阅发生时调用
let sink = MapObserver(forward: observer, transform: self._transform)
_ = self._source.subscribe(observer: sink)
return sink
}
}

上面的代码模拟了真实源码,防止操作符代码写在一坨,分层设计了一下。

  1. AnonymousObservable: 持有eventGenerator
  2. AnonymousObserver:继承自Sink,持有下一级Observer,直接转发到下一级Observer,没有除了dispose之外的加工。
  3. MapObserver:继承自Sink,持有下一级Observer,通过Transform过滤转发。
  4. MapObservable:每个对象都必须引用上一级Observable,持有Transform用以创建对应的Observer
  5. 最后一步subscribe方法,通过self._source.subscribe层层递归到最上一级的subscribe方法,找到AnonymousObservable,然后通过Observer(sink),把sink转换成Observer对象,最后执行_eventGenerator(Observer(sink)),因为 Sink 对象保存了下一级的 observertype_eventGenerator(Observer(sink))内部会又从上到下一路处理event,处理到最后一级,至此完毕。

总结

Observable

Observable 是一个惰性的事件源,它持有一个 eventGenerator 闭包(通常称为订阅逻辑)。当调用 subscribe 方法时,eventGenerator 会接收一个符合 ObserverType 协议的对象,并向其发射事件。

subscribe 方法是响应式实现的核心。

Observer

Observer 是事件的处理终端,实现 ObserverType 协议并定义 on(event:) 方法。它可以是:

  • 最终的数据消费者(如 UI 更新)
  • 中间操作符(如 map 中的转换逻辑)

Sink

Sink 是订阅的中间代理,主要负责:

  1. 转发事件:将 Observable 的事件传递给 Observer
  2. 生命周期管理:通过 Disposable 模式控制资源释放,在错误或完成时自动取消订阅
  3. 线程安全:可能包含锁或队列以保证线程安全的事件传递

Sink 是单一职责原则的体现,其设计让:

  • observable 只关心事件产生
  • sink 只关心是否发送和如何发送给 observer 作为决策层
  • observer 只关心如何应对事件

一个订阅关系返回的就是一个 sink 对象(即 disposable 对象)。

操作符

操作符(如 map)通过返回新的 Observable 实现链式调用。

新 Observable 的 eventGenerator 会:

  1. 创建一个中间 Observer(如 MapObserver),用于转换或过滤事件
  2. 让原始 Observable 订阅该中间 Observer,形成反向绑定
  3. 将处理后的事件转发给最终的 Observer

每个操作符都有上游和下游:

  • 上游是 observable
  • 下游是 observer
  • 自己是 observable

连接三者的方式是通过创建新的 observer 绑定上游,创建新的 observable(ret)绑定下游。
下游 Observer ← 【新 Observable(ret) ← 中间 Observer】 ← 上游 Observable

数据流向示意:
订阅方向:从下游到上游(下游 → ret → 中间 Observer → 上游)。
事件流动方向:从上游到下游(上游 → 中间 Observer → ret → 下游)。

惰性执行:只有下游订阅时,才会触发上游的事件生成。
资源管理:取消订阅时,能从下游逐级释放上游资源。

下篇文章,会看看iOS中常用的一些rx实践。