A-A+

RxSwift特征序列详解

2019年08月04日 iOS原创文章 暂无评论
博客主机

一、概述

任何序列都可以用Observable描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收。

Observable.create { (observer) -> Disposable inobserver.onNext("信号1")return Disposables.create()
}.subscribe(onNext: { (val) inprint("信号接收区:\(val)")
}).disposed(by: disposeBag)复制代码

Observable是通用序列的描述符,调用.onNext.onErroronCompleted来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此RxSwift还提供了一组特征序列,是Observable序列的变种,它能够帮助我们更准确的描述序列。即SingleCompletableMaybeDriverSignalControlEvent

二、Single

1、定义

单元素序列,信号只发送一次,响应信号或错误信号。

Single.create { (single) -> Disposable insingle(.success("假装我是一个正儿八经的数据"))
            //single(.error(NSError.init(domain: "网络出现错误", code: 101, userInfo:["name":"hibo"])))return Disposables.create()
        }.subscribe(onSuccess: { (val) inprint("Single:\(val)")
        }) { (error) inprint("SingleError:\(error)")
        }.disposed(by: disposeBag)复制代码
  • sinngle(.success(data))  -> onSuccess     发送响应元素到成功观察者
  • sinngle(.error(error))  -> error    发送错误元素到错误观察者

响应元素和错误元素分开处理,此时我们可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。

2、源码探索

2.1、Single定义

/// Sequence containing exactly 1 element
public enum SingleTrait { }
/// Represents a push style sequence containing 1 element.
public typealias Single= PrimitiveSequencepublic enum SingleEvent{
    /// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)case success(Element)
    
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error)
}复制代码

定位到Single.swift文件,首先能看到SinglePrimitiveSequence结构体类型的别名,SingleEvent是事件枚举,有successerror两个成员变量。

2.2、create创建序列。代码如下(此处代码标记为1️⃣):

extension PrimitiveSequenceType where TraitType == SingleTrait {
    public typealias SingleObserver = (SingleEvent) -> Void
          //代码省略若干行
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer inreturn subscribe { event inswitch event {case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)case .error(let error):
                    observer.on(.error(error))
                }
            }
        }        return PrimitiveSequence(raw: source)
    }
}复制代码

首先看参数是一个带Disposable类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个SingleObserver闭包,以上写法不太好理解,我们可以换一种写法:

public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single{let source = Observable.create { observer in// 1、内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送let block = { (event:SingleEvent) -> Void inswitch event {case .success(let element):
                observer.on(.next(element))
                observer.on(.completed)case .error(let error):
                observer.on(.error(error))
            }
        }
        // 2、调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用let disposable = subscribe(block)//3、返回值Disposable对象 return disposable
    }return PrimitiveSequence(raw: source)//4、创建PrimitiveSequence对象并保存Observable序列对象
}复制代码
  • 内部实现一个闭包block,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送
  • 调用外部实现的闭包方法,将内部实现的闭包block发送出去,起连接作用
  • 创建PrimitiveSequence对象并保存Observable序列对象source,返回PrimitiveSequence对象

create方法内部实际上实现了一个Observable序列,由此可见Single序列是对Observable序列的封装,Disposable对象通过闭包交由业务层创建,Single序列在实现上,方式方法与Observable保持一致,此处可称一绝。当前我们只探索Single的信号是如何完成传递,Observable序列的信号传递流程在《Swift核心源码探索》中有详细介绍。

2.3、订阅序列

也是在同PrimitiveSequenceType扩展中定义,代码如下(此处代码标记为2️⃣):

public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {#if DEBUG let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []#elselet callStack = [String]()#endifreturn self.primitiveSequence.subscribe { event inswitch event {case .success(let element):
            onSuccess?(element)case .error(let error):if let onError = onError {
                onError(error)
            } else {
                Hooks.defaultErrorHandler(callStack, error)
            }
        }
    }
}复制代码

方法中先调用了self.primitiveSequence方法,返回了self,方法是在遵循PrimitiveSequenceType协议的PrimitiveSequence的扩展中,为了保证协议的一致性。代码如下:

extension PrimitiveSequence: PrimitiveSequenceType {
    /// Additional constraints
    public typealias TraitType = Trait
    /// Sequence element typepublic typealias ElementType = Element

    // Converts `self` to primitive sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    public var primitiveSequence: PrimitiveSequence{return self
    }
}复制代码

紧接着调用另一个subscribe方法,代码如下(此处代码标记为3️⃣):

public func subscribe(_ observer: @escaping (SingleEvent) -> Void) -> Disposable {
    var stopped = falsereturn self.primitiveSequence.asObservable().subscribe { event inif stopped { return }
        stopped = true
        switch event {case .next(let element):
            observer(.success(element))case .error(let error):
            observer(.error(error))case .completed:
            rxFatalErrorInDebug("Singles can't emit a completion event")
        }
    }
}复制代码
  • self.primitiveSequence -> asObservable() -> subscribe
  • 此处截断了completed信号的向上传递,因此Single序列只能收到响应信号和错误信号

该段代码也调用了self.primitiveSequence方法,接着调用asObservable()方法,查看代码发现此处是为了获取source对象,即Observable可观察序列。

再查看subscribe的方法(此处标记为代码4️⃣):

public func subscribe(_ on: @escaping (Event) -> Void)
    -> Disposable {let observer = AnonymousObserver { e inon(e)
        }return self.asObservable().subscribe(observer)
}复制代码
  • 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层。 4️⃣ -> 3️⃣ -> 2️⃣ -> 1️⃣ ->业务层
  • 当前self指向的是1️⃣处创建并保存的Observable类型的source对象,因此该处subscribe所调用的即是Produce类中的subscribe方法,在方法内部创建了sink对象,来触发创建序列时实现的闭包,即代码1️⃣处所create后的闭包
  • 此时就到了业务层,通过create内部实现的闭包single向内部发送消息,再有observer调用on来向观察者发送信号
  • 信号发送不做赘述,最终会到达4️⃣处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包

总结:

序列的产生,订阅,发送,接收还是由Observable来实现的,Single只是对Observable做了封装,去除了onCompleted的消息监听及消息发送。

具体的Observable序列产生到观察流程见《Swift核心源码探索》

三、Completable

只能产生completed事件和error事件,没有序列元素值产生。

Completable.create { (completable) -> Disposable incompletable(.completed)
    //completable(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create()
}.subscribe(onCompleted: {print("Completable")
}) { (error) inprint(error)
}.disposed(by: disposeBag)复制代码
  • 应用场景,只关心任务是否完成,不关心不需要结果
  • Competable.swift下,在PrimitiveSequenceType扩展中实现了序列的创建,订阅,即信号转发

定义如下:

/// Sequence containing 0 elements
public enum CompletableTrait { }
/// Represents a push style sequence containing 0 elements.
public typealias Completable = PrimitiveSequencepublic enum CompletableEvent {
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)case error(Swift.Error)
    
    /// Sequence completed successfully.case completed
}复制代码

同样Completable类也是PrimitiveSequence的别名,并声明一个枚举包含,errorcompleted成员变量,限定了事件产生类型。都是对Observable序列的封装,源码此处不做探索说明,和Single一致,只是在订阅阶段对.next事件做了拦截。

四、Maybe

Single序列相似,发出一个元素或一个completed事件或error事件。

Maybe.create { (maybe) -> Disposable inmaybe(.success("element"))
        //maybe(.completed)
        //maybe(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))return Disposables.create()
    }.subscribe(onSuccess: { (val) inprint(val)
    }, onError: { (error) inprint("error:\(error)")
    }) {print("completed")
    }.disposed(by: disposeBag)复制代码

在开发中,如果一个业务有时候需要一个元素,有时候只需要知道处理完成的时候,可以使用该Maybe,解决不确定需求问题。源码探索略,同上。

五、Driver

老司机开车永远不会出错,因此Driver序列不会产生error事件,并一定在主线程中监听,会想新订阅者发送,上次发送出的元素,主要为简化UI层的代码。下面看看为什么会有Driver序列。

有一个需求:

  • 搜索框中每次输入一个文本,获取一次网络请求,成功后渲染UI

先实现一个简单的UI

let tf = UITextField.init(frame: CGRect(x: 100, y: 100, width: 200, height: 40))
tf.borderStyle = .roundedRect
tf.placeholder = "请输入"self.view.addSubview(tf)let label1 = UILabel.init(frame: CGRect(x: 100, y: 160, width: 200, height: 40))
label1.backgroundColor = .groupTableViewBackground
label1.textAlignment = .center
self.view.addSubview(label1)let label2 = UILabel.init(frame: CGRect(x: 100, y: 210, width: 200, height: 40))
label2.backgroundColor = .groupTableViewBackground
label2.textAlignment = .center
self.view.addSubview(label2)复制代码

创建了一个textfield,两个label用来展示。下面在来实现一个网络请求,返回一个Single序列:

func network(text:String) -> Single{return Single.create(subscribe: { (single) -> Disposable inif text == "1234"{
            single(.error(NSError.init(domain: "出现错误", code: 101, userInfo: nil)))
        }
        DispatchQueue.global().async {print("请求网络")
            single(.success(text))
        }return Disposables.create()
    })
}复制代码

网络请求为耗时操作,因此我们在异步中来完成,直接发送序列,假装我们请求了一次网络。

再实现textfield输入序列的监听,并调取网络请求方法:

let result = tf.rx.text.orEmpty.skip(1)
                .flatMap{return self.network(text: $0)
                        .observeOn(MainScheduler.instance)
                        .catchErrorJustReturn("网络请求失败")
            }.share(replay: 1, scope: .whileConnected)
//网络请求将发送多次请求
result.subscribe(onNext: { (val) inprint("订阅一:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.subscribe(onNext: { (val) inprint("订阅二:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.map{"\(($0 as! String).count)"}.bind(to: label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.bind(to: label2.rx.text).disposed(by: disposeBag)复制代码

代码介绍

  • flatMap
  • observeOn选择在哪个线程执行
  • catchErrorJustReturn错误处理,将onError事件转为onNext事件
  • share为多个观察者共享资源,网络请求只发送呢一次,否则多个订阅将会触发多个请求

Driver实现:

let result = tf.rx.text.orEmpty
    .asDriver()
    .flatMap {return self.network(text: $0).asDriver(onErrorJustReturn: "网络请求失败")
    }
result.map{"长度:\(($0 as! String).count)"}
        .drive(label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.drive(label2.rx.text).disposed(by: disposeBag)复制代码
  • asDriver()将序列转换为Driver序列
  • map重新组合并生成新的序列
  • driver将元素在主线程中绑定到label1label2

相比非driver下的代码实现,Driver序列省去了线程的设置,share数据共享设置。

源码探索

断点查看asDriver()方法:

extension ControlProperty {
    /// Converts `ControlProperty` to `Driver` trait.
    ///
    /// `ControlProperty` already can't fail, so no special case needs to be handled.
    public func asDriver() -> Driver{
        return self.asDriver { _ -> Driverin
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}复制代码

ControlProperty的扩展方法,返回了一个Driver类,DriverSharedSequence的别名,用来描述不同类型的序列,最后又调用了asDriver方法,而该方法在ObservableConvertibleType的扩展中,一直追踪会发现很多类都是继承自ObservableConvertibleType下。

extension ObservableConvertibleType {
    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver) -> Driver{let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchError { error inonErrorRecover(error).asObservable()
            }return Driver(source)
    }
}复制代码

如上代码也设置了observerOn方法,来指定线程,继续深入能够发现DriverSharingStrategy.scheduler内部指定的就是主线程,印证了上面所说的Driver的执行是在主线程的。下面再看flatMap方法的实现:

public func flatMap(_ selector: @escaping (E) throws -> O)
    -> Observable{return FlatMap(source: self.asObservable(), selector: selector)
}复制代码

业务层实现闭包,在闭包中调用了网络请求方法,并向FlatMap中传入业务层实现的闭包…… 当前篇幅过程,源码分析需要另起篇幅。

六、Signal

Driver相似,不会产生error事件,在主线程执行,但不会像Driver一样会给新观察者发送上一次发送的元素。

使用如下:

let event : Driver= button.rx.tap.asDriver()
event.drive(onNext: {print("yahibo")
    event.drive(onNext: {print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)复制代码

运行打印,发现在点击后重新订阅的观察者,会直接收到点击事件,这是我们业务不允许的。下面再看Signal序列:

let event : Signal= button.rx.tap.asSignal()
event.emit(onNext: {print("yahibo")
    event.emit(onNext: {print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)复制代码

运行结果,每一个新序列都会在点击后触发。

七、ControlEvent

专门用于描述UI控件所产生的事件,不会产生error事件,在主线程中监听。代码如下:

1、监听点击事件

let event : ControlEvent= button.rx.tap.asControlEvent()
event.bind(onNext: {print("controllerEvent")
}).disposed(by: disposeBag)复制代码

2、监听点击事件并绑定数据到其他UI

let event : ControlEvent= button.rx.tap.asControlEvent()
event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)复制代码

总结:

以上序列都是基于Observable的,是对其更高层的封装,针对不同应用场景设计,简化不同场景下序列的使用流程。

标签:

给我留言

Copyright © ios教程,苹果粉丝,苹果资讯,ios入门教程,ios学习,ios程序员,ios视频教程,ios粉丝网 保留所有权利.   Theme  Ality

用户登录