RxSwift(一) 上篇文章主要讲了,Rx 的 Observable、Observer以及操作符的实现方式。这篇文章主要看看Rx是如何实现一些iOS常用元素的监听的。
通知的监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 NotificationCenter .default.rx.notification(name).subscribe(onNext: block)extension NSObject : ReactiveCompatible { }public var rx: Reactive <Self > { get { Reactive (self ) } set { } }public func notification (_ name : Notification .Name ?, object : AnyObject ? = nil ) -> Observable <Notification > { return Observable .create { [weak object] observer in let nsObserver = self .base.addObserver(forName: name, object: object, queue: nil ) { notification in observer.on(.next(notification)) } return Disposables .create { self .base.removeObserver(nsObserver) } } }
通过NotificationCenter.default.rx得到一个base是NotificationCenter的Reactive对象。
通过notification方法,创建了一个Observable,event generator 中通过NotificationCenter的addObserver方法监听某个通知,然后把监听到的事件发送给Observer,Disposable中已经处理了移除通知的逻辑。
UITextField的监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 phoneNumberTF.rx.text.orEmpty.distinctUntilChanged().subscribe(onNext: { [weak self ] text in self ? .textDidChange(text) }).disposed(by: disposeBag)let newText = BehaviorSubject <String >(value: "" ) updateButton.rx.tap .map { "Updated at \(Date()) " } .bind(to: newText) .disposed(by: disposeBag) newText .bind(to: textField.rx.text) .disposed(by: disposeBag)public var value: ControlProperty <String ?> { return base.rx.controlPropertyWithDefaultEvents( getter: { textField in textField.text }, setter: { textField, value in if textField.text != value { textField.text = value } } ) }public func controlProperty <T >( editingEvents : UIControl .Event , getter : @escaping (Base ) -> T , setter : @escaping (Base , T ) -> Void ) -> ControlProperty <T > { let source: Observable <T > = Observable .create { [weak weakControl = base] observer in guard let control = weakControl else { observer.on(.completed) return Disposables .create() } observer.on(.next(getter(control))) let controlTarget = ControlTarget (control: control, controlEvents: editingEvents) { _ in if let control = weakControl { observer.on(.next(getter(control))) } } return Disposables .create(with: controlTarget.dispose) } .take(until: deallocated) let bindingObserver = Binder (base, binding: setter) return ControlProperty <T >(values: source, valueSink: bindingObserver) }... let selector: Selector = #selector (ControlTarget .eventHandler(_ :))weak var control: Control ?var callback: Callback ?init (control : Control , controlEvents : UIControl .Event , callback : @escaping Callback ) { MainScheduler .ensureRunningOnMainThread() self .control = control self .controlEvents = controlEvents self .callback = callback super .init () control.addTarget(self , action: selector, for: controlEvents) let method = self .method(for: selector) if method == nil { rxFatalError("Can't find method" ) } }...
phoneNumberTF.rx.text返回的是ControlProperty对象。
ControlProperty对象,持有tf的getter、setter方法以及事件变化类型的数组[.allEditingEvents, .valueChanged],通过这些输入最终转化成了source和bindingObserver。
ControlProperty实现了ObservableType和ObserverType协议,意味着可以作为输入也可以作为输出,当做为输出的时候source起作用,当作为输入的时候bindingObserver起作用。
tips: Binder可以看作一种Observer,接收两个参数一个是其本身
总结:本质上就是通过封装getter方法和Events为Observable,封装setter方法作为Observer。
PublishSubject 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 let publishSubject = PublishSubject <String >()let subscription = publishSubject.subscribe(onNext: { value in print ("接收到的值: \(value) " ) }) publishSubject.onNext("Hello, RxSwift!" )public func on (_ event : Event <Element >) { dispatch(self .synchronized_on(event), event) }func synchronized_on (_ event : Event <Element >) -> Observers { switch event { case .next: if self .isDisposed || self .stopped { return Observers () } return self .observers case .completed, .error: if self .stoppedEvent == nil { self .stoppedEvent = event self .stopped = true let observers = self .observers self .observers.removeAll() return observers } return Observers () } }func synchronized_subscribe <Observer : ObserverType >(_ observer : Observer ) -> Disposable where Observer .Element == Element { let key = self .observers.insert(observer.on) return SubscriptionDisposable (owner: self , key: key) }
PublishSubject与ControlProperty类似,也实现了ObservableType和ObserverType协议,意味着可以作为输入也可以作为输出。
当作为输出时,synchronized_on起作用,dispatch(self.synchronized_on(event), event)中self.synchronized_on(event)会返回所有的observers,然后通过dispatch方法遍历传入event。
从源码中可见, PublishSubject对之前的元素不做任何保存,订阅此Subject只有新的event发出才会被执行到。其他的几种Subject大体结构与此类似,只是对event有不同的处理方式。
rx.observe(keypath) 1 2 3 4 5 6 7 8 9 10 11 12 13 webView.rx.observe(\.estimatedProgress).subscribe(onNext: { [weak self ] _ in self ? .observeWebViewProgress() }).disposed(by: disposeBag)Observable <Element >.create { [weak base] observer in let observation = base? .observe(keyPath, options: options) { obj, _ in observer.on(.next(obj[keyPath: keyPath])) } return Disposables .create { observation? .invalidate() } } .take(until: base.rx.deallocated)
基于swift的KVOAPI,当subscribe的时候会调用系统kvo方法监听base自身的属性变化,在回调中把element发送给observer执行,但也受限于swift的kvo方式,必须强持有。
dispose里面设置kvo的invalidate。
rx.observe(Bool.self, “hidden”) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public extension Reactive where Base : UIView { var isHidden: Observable <Bool > { return base.layer.rx.observe(Bool .self , "hidden" ).compactMap { $0 } } }public func observe <Element >(_ type : Element .Type , _ keyPath : String , options : KeyValueObservingOptions = [.new, .initial], retainSelf : Bool = true ) -> Observable <Element ?> { KVOObservable (object: self .base, keyPath: keyPath, options: options, retainTarget: retainSelf).asObservable() }private final class KVOObservable <Element > : ObservableType , KVOObservableProtocol { typealias Element = Element ? unowned var target: AnyObject var strongTarget: AnyObject ? var keyPath: String var options: KeyValueObservingOptions var retainTarget: Bool init (object : AnyObject , keyPath : String , options : KeyValueObservingOptions , retainTarget : Bool ) { self .target = object self .keyPath = keyPath self .options = options self .retainTarget = retainTarget if retainTarget { self .strongTarget = object } } func subscribe <Observer : ObserverType >(_ observer : Observer ) -> Disposable where Observer .Element == Element ? { let observer = KVOObserver (parent: self ) { value in if value as? NSNull != nil { observer.on(.next(nil )) return } observer.on(.next(value as? Element )) } return Disposables .create(with: observer.dispose) } }private final class KVOObserver : _RXKVOObserver , Disposable { typealias Callback = (Any ? ) -> Void var retainSelf: KVOObserver ? init (parent : KVOObservableProtocol , callback : @escaping Callback ) { super .init (target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback) self .retainSelf = self } override func dispose () { super .dispose() self .retainSelf = nil } }- (void)dispose { [self .target removeObserver:self forKeyPath:self .keyPath context:nil ]; self .target = nil ; self .retainedTarget = nil ; }
KVOObservable会持有base、keypath,当subscribe一个observer的时候,会创建KVOObserver对象。
KVOObserver继承自OC文件_RXKVOObserver,对OC的kvo方法做了block回调封装。
retainTarget表示是否retain target,observeWeakly就是 false 的。
默认retainTarget是true,确保了不会因为target被释放导致的kvo崩溃(被观察者变化的时候,观察者target已经不在了但是被观察者的observe没有移除就会崩溃)。
dispose的时候会移除observever和target = nil,确保了内部释放。如果retainTarget是false就会面临kvo崩溃风险。
默认retainTarget是true的情况和系统kvo api一样(感觉没区别)。
rx.weakobserve(Bool.self, “hidden”) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 private func observeWeaklyKeyPathFor (_ target : NSObject , keyPath : String , options : KeyValueObservingOptions ) -> Observable <AnyObject ?> { let components = keyPath.components(separatedBy: "." ).filter { $0 != "self" } let observable = observeWeaklyKeyPathFor(target, keyPathSections: components, options: options) .finishWithNilWhenDealloc(target) if ! options.isDisjoint(with: .initial) { return observable } else { return observable .skip(1 ) } }private func observeWeaklyKeyPathFor ( _ target : NSObject , keyPathSections : [String ], options : KeyValueObservingOptions ) -> Observable <AnyObject ?> { weak var weakTarget: AnyObject ? = target let propertyName = keyPathSections[0 ] let remainingPaths = Array (keyPathSections[1 ..< keyPathSections.count]) let property = class_getProperty(object_getClass(target), propertyName) if property == nil { return Observable .error(RxCocoaError .invalidPropertyName(object: target, propertyName: propertyName)) } let propertyAttributes = property_getAttributes(property! ) let isWeak = isWeakProperty(propertyAttributes.map(String .init ) ?? "" ) let propertyObservable = KVOObservable (object: target, keyPath: propertyName, options: options.union(.initial), retainTarget: false ) as KVOObservable <AnyObject > return propertyObservable .flatMapLatest { (nextTarget: AnyObject ?) -> Observable <AnyObject ?> in if nextTarget == nil { return Observable .just(nil ) } let nextObject = nextTarget! as? NSObject let strongTarget: AnyObject ? = weakTarget if nextObject == nil { return Observable .error(RxCocoaError .invalidObjectOnKeyPath(object: nextTarget! , sourceObject: strongTarget ?? NSNull (), propertyName: propertyName)) } if strongTarget == nil { return Observable .empty() } let nextElementsObservable = keyPathSections.count == 1 ? Observable .just(nextTarget) : observeWeaklyKeyPathFor(nextObject! , keyPathSections: remainingPaths, options: options) if isWeak { return nextElementsObservable .finishWithNilWhenDealloc(nextObject! ) } else { return nextElementsObservable } } }func finishWithNilWhenDealloc (_ target : NSObject ) -> Observable <AnyObject ?> { let deallocating = target.rx.deallocating return deallocating .map { _ in return Observable .just(nil ) } .startWith(self .asObservable()) .switchLatest() }private final class KVOObserver : _RXKVOObserver , Disposable { typealias Callback = (Any ? ) -> Void var retainSelf: KVOObserver ? init (parent : KVOObservableProtocol , callback : @escaping Callback ) { super .init (target: parent.target, retainTarget: parent.retainTarget, keyPath: parent.keyPath, options: parent.options.nsOptions, callback: callback) self .retainSelf = self } override func dispose () { super .dispose() self .retainSelf = nil } deinit { #if TRACE_RESOURCES _ = Resources .decrementTotal() #endif } }
flat就是降维的意思,不管是 flatmap 还是 flatMapLatest,接受的 event是单个element或者element序列,前者会包装成新Observable后者直接返回,flatMapLatest只会保留最后一个 element 进行Observable化的操作,flatMapLatest只会让最后一次生成的Observable生效。
流程图如下:
这个例子中,一旦触发了subscribe,会遍历检查keypath链,检查每一级property属性是否weak或者nil,weak就hook dealloc动作做.completed事件释放,nil发送empty,也对应的是.completed事件。
每次递归都会检查target和keypath的第一级必须存在,不存在直接抛Observable.error(RxCocoaError.invalidPropertyName(object: target, propertyName: propertyName))。
这套监听操作检查了整个keypath路径的合法性,如果异常就会抛出结束事件。
思考:如何确保递归被调用到? 因为每次遍历target和keypath的第一级必须存在,当subscribe的时候会生成_RXKVOObserver对象监听,第一级存在所以observeValueForKeyPath回调事件一定会被触发,然后执行keypath递归,另外递归的值本身就是下一级的target确保了递归的可行。
思考:中间级的kvo监听会释放吗,还是只存在最后一级? 不会被释放,源码中发现KVOObserver把自己retain了只有主动dispose才会被释放,测试打印发现KVOObservable(flatMap)都会被释放,也就是流程图中标注虚线的部分。一次递归下来只有KVOObserver和其持有的Sink不会被释放。
疑问:flatMapLatest,不可用flatMap吗? 感觉没关系,之前的序列(Observable.just(xxx)或者KVOObservable.flatmap)打印验证到用完就释放了,并没有再发出来数据,只是用flatMapLatest应该性能高一些。