RxSwift(一) 上篇文章主要讲了,Rx 的 Observable、Observer以及操作符的实现方式。这篇文章主要看看Rx是如何实现一些iOS常用元素的监听的。
通知的监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 NotificationCenter .default.rx.notification(name).subscribe(onNext: block)extension NSObject : ReactiveCompatible { }public var rx: Reactive <Self > { get { Reactive (self ) } set { } }public func notification (_ name : Notification .Name ?, object : AnyObject ? = nil ) -> Observable <Notification > { return Observable .create { [weak object] observer in let nsObserver = self .base.addObserver(forName: name, object: object, queue: nil ) { notification in observer.on(.next(notification)) } return Disposables .create { self .base.removeObserver(nsObserver) } } }
通过NotificationCenter.default.rx
得到一个base
是NotificationCenter
的Reactive
对象。
通过notification
方法,创建了一个Observable
,event generator
中通过NotificationCenter
的addObserver
方法监听某个通知,然后把监听到的事件发送给Observer
,Disposable
中已经处理了移除通知的逻辑。
UITextField的监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 phoneNumberTF.rx.text.orEmpty.distinctUntilChanged().subscribe(onNext: { [weak self ] text in self ? .textDidChange(text) }).disposed(by: disposeBag)let newText = BehaviorSubject <String >(value: "" ) updateButton.rx.tap .map { "Updated at \(Date()) " } .bind(to: newText) .disposed(by: disposeBag) newText .bind(to: textField.rx.text) .disposed(by: disposeBag)public var value: ControlProperty <String ?> { return base.rx.controlPropertyWithDefaultEvents( getter: { textField in textField.text }, setter: { textField, value in if textField.text != value { textField.text = value } } ) }public func controlProperty <T >( editingEvents : UIControl .Event , getter : @escaping (Base ) -> T , setter : @escaping (Base , T ) -> Void ) -> ControlProperty <T > { let source: Observable <T > = Observable .create { [weak weakControl = base] observer in guard let control = weakControl else { observer.on(.completed) return Disposables .create() } observer.on(.next(getter(control))) let controlTarget = ControlTarget (control: control, controlEvents: editingEvents) { _ in if let control = weakControl { observer.on(.next(getter(control))) } } return Disposables .create(with: controlTarget.dispose) } .take(until: deallocated) let bindingObserver = Binder (base, binding: setter) return ControlProperty <T >(values: source, valueSink: bindingObserver) }... let selector: Selector = #selector (ControlTarget .eventHandler(_ :))weak var control: Control ?var callback: Callback ?init (control : Control , controlEvents : UIControl .Event , callback : @escaping Callback ) { MainScheduler .ensureRunningOnMainThread() self .control = control self .controlEvents = controlEvents self .callback = callback super .init () control.addTarget(self , action: selector, for: controlEvents) let method = self .method(for: selector) if method == nil { rxFatalError("Can't find method" ) } }...
phoneNumberTF.rx.text
返回的是ControlProperty
对象。
ControlProperty
对象,持有tf
的getter
、setter
方法以及事件变化类型的数组[.allEditingEvents, .valueChanged]
,通过这些输入最终转化成了source
和bindingObserver
。
ControlProperty
实现了ObservableType
和ObserverType
协议,意味着可以作为输入也可以作为输出,当做为输出的时候source
起作用,当作为输入的时候bindingObserver
起作用。
tips: Binder可以看作一种Observer,接收两个参数一个是其本身
总结:本质上就是通过封装getter方法
和Events
为Observable
,封装setter方法
作为Observer
。
PublishSubject 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 let publishSubject = PublishSubject <String >()let subscription = publishSubject.subscribe(onNext: { value in print ("接收到的值: \(value) " ) }) publishSubject.onNext("Hello, RxSwift!" )public func on (_ event : Event <Element >) { dispatch(self .synchronized_on(event), event) }func synchronized_on (_ event : Event <Element >) -> Observers { switch event { case .next: if self .isDisposed || self .stopped { return Observers () } return self .observers case .completed, .error: if self .stoppedEvent == nil { self .stoppedEvent = event self .stopped = true let observers = self .observers self .observers.removeAll() return observers } return Observers () } }func synchronized_subscribe <Observer : ObserverType >(_ observer : Observer ) -> Disposable where Observer .Element == Element { let key = self .observers.insert(observer.on) return SubscriptionDisposable (owner: self , key: key) }
PublishSubject
与ControlProperty
类似,也实现了ObservableType
和ObserverType
协议,意味着可以作为输入也可以作为输出。
当作为输出时,synchronized_on
起作用,dispatch(self.synchronized_on(event), event)
中self.synchronized_on(event)
会返回所有的observers
,然后通过dispatch方法
遍历传入event
。
从源码中可见, PublishSubject
对之前的元素不做任何保存,订阅此Subject
只有新的event
发出才会被执行到。其他的几种Subject
大体结构与此类似,只是对event
有不同的处理方式。
rx.observe(keypath) 1 2 3 4 5 6 7 8 9 10 11 12 13 webView.rx.observe(\.estimatedProgress).subscribe(onNext: { [weak self ] _ in self ? .observeWebViewProgress() }).disposed(by: disposeBag)Observable <Element >.create { [weak base] observer in let observation = base? .observe(keyPath, options: options) { obj, _ in observer.on(.next(obj[keyPath: keyPath])) } return Disposables .create { observation? .invalidate() } } .take(until: base.rx.deallocated)
基于swift
的KVOAPI
,当subscribe
的时候会调用系统kvo方法
监听base
自身的属性变化,在回调中把element
发送给observer
执行,但也受限于swift
的kvo
方式,必须强持有。
dispose
里面设置kvo
的invalidate
。
rx.observe(Bool.self, “hidden”) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public extension Reactive where Base : UIView { var isHidden: Observable <Bool > { return base.layer.rx.observe(Bool .self , "hidden" ).compactMap { $0 } } }public func observe <Element >(_ type : Element .Type , _ keyPath : String , options : KeyValueObservingOptions = [.new, .initial], retainSelf : Bool = true ) -> Observable <Element ?> { KVOObservable (object: self .base, keyPath: keyPath, options: options, retainTarget: retainSelf).asObservable() }private final class KVOObservable <Element > : ObservableType , KVOObservableProtocol { typealias Element = Element ? unowned var target: AnyObject var strongTarget: AnyObject ? var keyPath: String var options: KeyValueObservingOptions var retainTarget: Bool init (object : AnyObject , keyPath : String , options : KeyValueObservingOptions , retainTarget : Bool ) { self .target = object self .keyPath = keyPath self .options = options self .retainTarget = retainTarget if retainTarget { self .strongTarget = object } } func subscribe <Observer : ObserverType >(_ observer : Observer ) -> Disposable where Observer .Element == Element ? { let observer = KVOObserver (parent: self ) { value in if value as? NSNull != nil { observer.on(.next(nil )) return } observer.on(.next(value as? Element )) } return Disposables .create(with: observer.dispose) } }private final class KVOObserver : _RXKVOObserver , Disposable { typealias Callback = (Any ? ) -> Void var retainSelf: KVOObserver ? init (parent : KVOObservableProtocol , callback : @escaping Callback ) { super .init (target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback) self .retainSelf = self } override func dispose () { super .dispose() self .retainSelf = nil } }- (void)dispose { [self .target removeObserver:self forKeyPath:self .keyPath context:nil ]; self .target = nil ; self .retainedTarget = nil ; }
KVOObservable
会持有base
、keypath
,当subscribe
一个observer
的时候,会创建KVOObserver
对象。
KVOObserver
继承自OC文件_RXKVOObserver
,对OC的kvo
方法做了block
回调封装。
retainTarget
表示是否retain target,observeWeakly
就是 false 的。
默认retainTarget
是true
,确保了不会因为target
被释放导致的kvo崩溃
(被观察者变化的时候,观察者target已经不在了但是被观察者的observe没有移除就会崩溃)。
dispose
的时候会移除observever
和target = nil
,确保了内部释放。如果retainTarget
是false
就会面临kvo崩溃风险。
默认retainTarget
是true
的情况和系统kvo api一样(感觉没区别)。
rx.weakobserve(Bool.self, “hidden”) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 private func observeWeaklyKeyPathFor (_ target : NSObject , keyPath : String , options : KeyValueObservingOptions ) -> Observable <AnyObject ?> { let components = keyPath.components(separatedBy: "." ).filter { $0 != "self" } let observable = observeWeaklyKeyPathFor(target, keyPathSections: components, options: options) .finishWithNilWhenDealloc(target) if ! options.isDisjoint(with: .initial) { return observable } else { return observable .skip(1 ) } }private func observeWeaklyKeyPathFor ( _ target : NSObject , keyPathSections : [String ], options : KeyValueObservingOptions ) -> Observable <AnyObject ?> { weak var weakTarget: AnyObject ? = target let propertyName = keyPathSections[0 ] let remainingPaths = Array (keyPathSections[1 ..< keyPathSections.count]) let property = class_getProperty(object_getClass(target), propertyName) if property == nil { return Observable .error(RxCocoaError .invalidPropertyName(object: target, propertyName: propertyName)) } let propertyAttributes = property_getAttributes(property! ) let isWeak = isWeakProperty(propertyAttributes.map(String .init ) ?? "" ) let propertyObservable = KVOObservable (object: target, keyPath: propertyName, options: options.union(.initial), retainTarget: false ) as KVOObservable <AnyObject > return propertyObservable .flatMapLatest { (nextTarget: AnyObject ?) -> Observable <AnyObject ?> in if nextTarget == nil { return Observable .just(nil ) } let nextObject = nextTarget! as? NSObject let strongTarget: AnyObject ? = weakTarget if nextObject == nil { return Observable .error(RxCocoaError .invalidObjectOnKeyPath(object: nextTarget! , sourceObject: strongTarget ?? NSNull (), propertyName: propertyName)) } if strongTarget == nil { return Observable .empty() } let nextElementsObservable = keyPathSections.count == 1 ? Observable .just(nextTarget) : observeWeaklyKeyPathFor(nextObject! , keyPathSections: remainingPaths, options: options) if isWeak { return nextElementsObservable .finishWithNilWhenDealloc(nextObject! ) } else { return nextElementsObservable } } }func finishWithNilWhenDealloc (_ target : NSObject ) -> Observable <AnyObject ?> { let deallocating = target.rx.deallocating return deallocating .map { _ in return Observable .just(nil ) } .startWith(self .asObservable()) .switchLatest() }private final class KVOObserver : _RXKVOObserver , Disposable { typealias Callback = (Any ? ) -> Void var retainSelf: KVOObserver ? init (parent : KVOObservableProtocol , callback : @escaping Callback ) { super .init (target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback) self .retainSelf = self } override func dispose () { super .dispose() self .retainSelf = nil } deinit { #if TRACE_RESOURCES _ = Resources .decrementTotal() #endif } }
flat
就是降维的意思,不管是 flatmap
还是 flatMapLatest
,接受的 event
是单个element
或者element序列
,前者会包装成新Observable
后者直接返回,flatMapLatest
只会保留最后一个 element
进行Observable化
的操作,flatMapLatest
只会让最后一次生成的Observable
生效。
流程图如下:
这个例子中,一旦触发了subscribe
,会遍历检查keypath链
,检查每一级property
属性是否weak
或者nil
,weak
就hook dealloc
动作做.completed事件
释放,nil
发送empty
,也对应的是.completed事件
。
每次递归都会检查target
和keypath的第一级
必须存在,不存在直接抛Observable.error(RxCocoaError.invalidPropertyName(object: target, propertyName: propertyName))
。
这套监听操作检查了整个keypath
路径的合法性,如果异常就会抛出结束事件。
思考:如何确保递归被调用到? 因为每次遍历target
和keypath的第一级
必须存在,当subscribe
的时候会生成_RXKVOObserver
对象监听,第一级存在所以observeValueForKeyPath
回调事件一定会被触发,然后执行keypath
递归,另外递归的值本身就是下一级的target
确保了递归的可行。
思考:中间级的kvo
监听会释放吗,还是只存在最后一级? 不会被释放,源码中发现KVOObserver
把自己retain
了只有主动dispose
才会被释放,测试打印发现KVOObservable(flatMap)
都会被释放,也就是流程图中标注虚线的部分。一次递归下来只有KVOObserver
和其持有的Sink
不会被释放。
疑问:flatMapLatest
,不可用flatMap
吗? 感觉没关系,之前的序列(Observable.just(xxx)
或者KVOObservable.flatmap
)打印验证到用完就释放了,并没有再发出来数据,只是用flatMapLatest
应该性能高一些。