RxSwift(一)上篇文章主要讲了,Rx 的 Observable、Observer以及操作符的实现方式。这篇文章主要看看Rx是如何实现一些iOS常用元素的监听的。

通知的监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 外部调用
NotificationCenter.default.rx.notification(name).subscribe(onNext: block)

// 关键源码 1
extension NSObject: ReactiveCompatible { }
public var rx: Reactive<Self> {
get { Reactive(self) }
// this enables using Reactive to "mutate" base object
// swiftlint:disable:next unused_setter_value
set { }
}

// 关键源码 2
public func notification(_ name: Notification.Name?, object: AnyObject? = nil) -> Observable<Notification> {
return Observable.create { [weak object] observer in
let nsObserver = self.base.addObserver(forName: name, object: object, queue: nil) { notification in
observer.on(.next(notification))
}

return Disposables.create {
self.base.removeObserver(nsObserver)
}
}
}

  1. 通过NotificationCenter.default.rx得到一个baseNotificationCenterReactive对象。
  2. 通过notification方法,创建了一个Observableevent generator 中通过NotificationCenteraddObserver方法监听某个通知,然后把监听到的事件发送给ObserverDisposable中已经处理了移除通知的逻辑。

UITextField的监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 外部调用:作为Observable
phoneNumberTF.rx.text.orEmpty.distinctUntilChanged().subscribe(onNext: { [weak self] text in
self?.textDidChange(text)
}).disposed(by: disposeBag)

// 外部调用:作为observer
let newText = BehaviorSubject<String>(value: "")
updateButton.rx.tap
.map { "Updated at \(Date())" }
.bind(to: newText)
.disposed(by: disposeBag)
newText
.bind(to: textField.rx.text)
.disposed(by: disposeBag)

// 关键源码 1
/// Reactive wrapper for `text` property.
public var value: ControlProperty<String?> {
return base.rx.controlPropertyWithDefaultEvents(
getter: { textField in
textField.text
},
setter: { textField, value in
// This check is important because setting text value always clears control state
// including marked text selection which is imporant for proper input
// when IME input method is used.
if textField.text != value {
textField.text = value
}
}
)
}

// 关键源码 2
public func controlProperty<T>(
editingEvents: UIControl.Event,
getter: @escaping (Base) -> T,
setter: @escaping (Base, T) -> Void
) -> ControlProperty<T> {
let source: Observable<T> = Observable.create { [weak weakControl = base] observer in
guard let control = weakControl else {
observer.on(.completed)
return Disposables.create()
}

observer.on(.next(getter(control)))

let controlTarget = ControlTarget(control: control, controlEvents: editingEvents) { _ in
if let control = weakControl {
observer.on(.next(getter(control)))
}
}

return Disposables.create(with: controlTarget.dispose)
}
.take(until: deallocated)

let bindingObserver = Binder(base, binding: setter)

return ControlProperty<T>(values: source, valueSink: bindingObserver)
}

// 关键源码3 ControlTarget
...
let selector: Selector = #selector(ControlTarget.eventHandler(_:))
weak var control: Control?
var callback: Callback?
init(control: Control, controlEvents: UIControl.Event, callback: @escaping Callback) {
MainScheduler.ensureRunningOnMainThread()

self.control = control
self.controlEvents = controlEvents
self.callback = callback

super.init()

control.addTarget(self, action: selector, for: controlEvents)

let method = self.method(for: selector)
if method == nil {
rxFatalError("Can't find method")
}
}
...

  1. phoneNumberTF.rx.text返回的是ControlProperty对象。
  2. ControlProperty对象,持有tfgettersetter方法以及事件变化类型的数组[.allEditingEvents, .valueChanged],通过这些输入最终转化成了sourcebindingObserver
  3. ControlProperty实现了ObservableTypeObserverType协议,意味着可以作为输入也可以作为输出,当做为输出的时候source起作用,当作为输入的时候bindingObserver起作用。

tips: Binder可以看作一种Observer,接收两个参数一个是其本身

总结:本质上就是通过封装getter方法EventsObservable,封装setter方法作为Observer

PublishSubject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 外部调用
let publishSubject = PublishSubject<String>()
let subscription = publishSubject.subscribe(onNext: { value in
print("接收到的值: \(value)")
})
publishSubject.onNext("Hello, RxSwift!")

// 关键源码 1
public func on(_ event: Event<Element>) {
dispatch(self.synchronized_on(event), event)
}

func synchronized_on(_ event: Event<Element>) -> Observers {
switch event {
case .next:
if self.isDisposed || self.stopped {
return Observers()
}

return self.observers
case .completed, .error:
if self.stoppedEvent == nil {
self.stoppedEvent = event
self.stopped = true
let observers = self.observers
self.observers.removeAll()
return observers
}

return Observers()
}
}

// 关键源码 2
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
let key = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}

  1. PublishSubjectControlProperty类似,也实现了ObservableTypeObserverType协议,意味着可以作为输入也可以作为输出。
  2. 当作为输出时,synchronized_on起作用,dispatch(self.synchronized_on(event), event)self.synchronized_on(event)会返回所有的observers,然后通过dispatch方法遍历传入event
  3. 从源码中可见, PublishSubject对之前的元素不做任何保存,订阅此Subject只有新的event发出才会被执行到。其他的几种Subject大体结构与此类似,只是对event有不同的处理方式。

rx.observe(keypath)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 外部调用
webView.rx.observe(\.estimatedProgress).subscribe(onNext: { [weak self] _ in
self?.observeWebViewProgress()
}).disposed(by: disposeBag)

// 关键源码1 创建方法
Observable<Element>.create { [weak base] observer in
let observation = base?.observe(keyPath, options: options) { obj, _ in
observer.on(.next(obj[keyPath: keyPath]))
}
return Disposables.create { observation?.invalidate() }
}
.take(until: base.rx.deallocated)
  1. 基于swiftKVOAPI,当subscribe的时候会调用系统kvo方法监听base自身的属性变化,在回调中把element发送给observer执行,但也受限于swiftkvo方式,必须强持有。
  2. dispose里面设置kvoinvalidate

rx.observe(Bool.self, “hidden”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// 外部调用,一个rx扩展监听view的隐藏事件
public extension Reactive where Base: UIView {
var isHidden: Observable<Bool> {
return base.layer.rx.observe(Bool.self, "hidden").compactMap { $0 }
}
}

// 关键源码1 创建方法
public func observe<Element>(_ type: Element.Type,
_ keyPath: String,
options: KeyValueObservingOptions = [.new, .initial],
retainSelf: Bool = true) -> Observable<Element?> {
KVOObservable(object: self.base, keyPath: keyPath, options: options, retainTarget: retainSelf).asObservable()
}

// 关键源码2 KVOObservable
private final class KVOObservable<Element>
: ObservableType
, KVOObservableProtocol {
typealias Element = Element?

unowned var target: AnyObject
var strongTarget: AnyObject?

var keyPath: String
var options: KeyValueObservingOptions
var retainTarget: Bool

init(object: AnyObject, keyPath: String, options: KeyValueObservingOptions, retainTarget: Bool) {
self.target = object
self.keyPath = keyPath
self.options = options
self.retainTarget = retainTarget
if retainTarget {
self.strongTarget = object
}
}

func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element? {
let observer = KVOObserver(parent: self) { value in
if value as? NSNull != nil {
observer.on(.next(nil))
return
}
observer.on(.next(value as? Element))
}

return Disposables.create(with: observer.dispose)
}

}
// 关键源码4 KVOObserver
private final class KVOObserver
: _RXKVOObserver
, Disposable {
typealias Callback = (Any?) -> Void

var retainSelf: KVOObserver?

init(parent: KVOObservableProtocol, callback: @escaping Callback) {
super.init(target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback)
self.retainSelf = self
}

override func dispose() {
super.dispose()
self.retainSelf = nil
}
}

- (void)dispose {
[self.target removeObserver:self forKeyPath:self.keyPath context:nil];
self.target = nil;
self.retainedTarget = nil;
}

  1. KVOObservable会持有basekeypath,当subscribe一个observer的时候,会创建KVOObserver对象。
  2. KVOObserver继承自OC文件_RXKVOObserver,对OC的kvo方法做了block回调封装。
  3. retainTarget表示是否retain target,observeWeakly就是 false 的。
  4. 默认retainTargettrue,确保了不会因为target被释放导致的kvo崩溃(被观察者变化的时候,观察者target已经不在了但是被观察者的observe没有移除就会崩溃)。
  5. dispose的时候会移除observevertarget = nil,确保了内部释放。如果retainTargetfalse就会面临kvo崩溃风险。
  6. 默认retainTargettrue的情况和系统kvo api一样(感觉没区别)。

rx.weakobserve(Bool.self, “hidden”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

private func observeWeaklyKeyPathFor(_ target: NSObject, keyPath: String, options: KeyValueObservingOptions) -> Observable<AnyObject?> {
let components = keyPath.components(separatedBy: ".").filter { $0 != "self" }

let observable = observeWeaklyKeyPathFor(target, keyPathSections: components, options: options)
.finishWithNilWhenDealloc(target)

if !options.isDisjoint(with: .initial) {
return observable
}
else {
return observable
.skip(1)
}
}

private func observeWeaklyKeyPathFor(
_ target: NSObject,
keyPathSections: [String],
options: KeyValueObservingOptions
) -> Observable<AnyObject?> {

weak var weakTarget: AnyObject? = target

let propertyName = keyPathSections[0]
let remainingPaths = Array(keyPathSections[1..<keyPathSections.count])

let property = class_getProperty(object_getClass(target), propertyName)
// 原始target和第一级property必须存在,不存在压根就往下走
if property == nil {
return Observable.error(RxCocoaError.invalidPropertyName(object: target, propertyName: propertyName))
}
let propertyAttributes = property_getAttributes(property!)

// should dealloc hook be in place if week property, or just create strong reference because it doesn't matter
let isWeak = isWeakProperty(propertyAttributes.map(String.init) ?? "")

// 这个和上面的例子一致了,只不过注意传的retainTarget是false
let propertyObservable = KVOObservable(object: target, keyPath: propertyName, options: options.union(.initial), retainTarget: false) as KVOObservable<AnyObject>

// KVO recursion for value changes
return propertyObservable
.flatMapLatest { (nextTarget: AnyObject?) -> Observable<AnyObject?> in
if nextTarget == nil {
return Observable.just(nil)
}
let nextObject = nextTarget! as? NSObject

let strongTarget: AnyObject? = weakTarget

if nextObject == nil {
return Observable.error(RxCocoaError.invalidObjectOnKeyPath(object: nextTarget!, sourceObject: strongTarget ?? NSNull(), propertyName: propertyName))
}

// if target is alive, then send change
// if it's deallocated, don't send anything
if strongTarget == nil {
return Observable.empty()
}

let nextElementsObservable = keyPathSections.count == 1
? Observable.just(nextTarget)
: observeWeaklyKeyPathFor(nextObject!, keyPathSections: remainingPaths, options: options)

if isWeak {
return nextElementsObservable
.finishWithNilWhenDealloc(nextObject!)
}
else {
return nextElementsObservable
}
}
}

// 监听了target的deallocating调用 dealloc hook
func finishWithNilWhenDealloc(_ target: NSObject)
-> Observable<AnyObject?> {
let deallocating = target.rx.deallocating

return deallocating
.map { _ in
return Observable.just(nil)
}
.startWith(self.asObservable())
.switchLatest()
}

// KVOObserver retainself的操作
private final class KVOObserver
: _RXKVOObserver
, Disposable {
typealias Callback = (Any?) -> Void

var retainSelf: KVOObserver?

init(parent: KVOObservableProtocol, callback: @escaping Callback) {
super.init(target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback)
self.retainSelf = self
}

override func dispose() {
super.dispose()
self.retainSelf = nil
}

deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}

flat就是降维的意思,不管是 flatmap 还是 flatMapLatest,接受的 event是单个element或者element序列,前者会包装成新Observable后者直接返回,flatMapLatest只会保留最后一个 element 进行Observable化的操作,flatMapLatest只会让最后一次生成的Observable生效。

流程图如下:

  1. 这个例子中,一旦触发了subscribe,会遍历检查keypath链,检查每一级property属性是否weak或者nilweakhook dealloc动作做.completed事件释放,nil发送empty,也对应的是.completed事件
  2. 每次递归都会检查targetkeypath的第一级必须存在,不存在直接抛Observable.error(RxCocoaError.invalidPropertyName(object: target, propertyName: propertyName))
  3. 这套监听操作检查了整个keypath路径的合法性,如果异常就会抛出结束事件。

思考:如何确保递归被调用到?
因为每次遍历targetkeypath的第一级必须存在,当subscribe的时候会生成_RXKVOObserver对象监听,第一级存在所以observeValueForKeyPath回调事件一定会被触发,然后执行keypath递归,另外递归的值本身就是下一级的target确保了递归的可行。

思考:中间级的kvo监听会释放吗,还是只存在最后一级?
不会被释放,源码中发现KVOObserver把自己retain了只有主动dispose才会被释放,测试打印发现KVOObservable(flatMap)都会被释放,也就是流程图中标注虚线的部分。一次递归下来只有KVOObserver和其持有的Sink不会被释放。

疑问:flatMapLatest,不可用flatMap吗?
感觉没关系,之前的序列(Observable.just(xxx)或者KVOObservable.flatmap)打印验证到用完就释放了,并没有再发出来数据,只是用flatMapLatest应该性能高一些。