六游的博客小站
Combine核心概念
发布于: 2020-06-16 更新于: 2020-10-20 阅读次数: 

Combine简介

Combine是Apple在Swift语言内部实现的响应式异步编程的框架。核心角色由发布者、订阅者和操作符组成,事件由发布者发出,经过操作符的一系列变换,最后通知到订阅者处。

Publisher发布者

Publisher在Combine中是可以随着时间向一个或多个订阅者发送它们感兴趣的事件或者值的角色。

每一个Publisher都可以向外部发送三种事件

  1. 一个输出值,该值的类型由Pulisher中的泛型Output决定
  2. 一个成功的completion,代表该Publisher已经成功完成了自己的使命
  3. 一个失败的completion,其中携带者失败的错误信息,错误信息的类型由Publisher中的泛型Failure决定,代表该Publisher因为某种原因失败了,不得不结束自己的使命

在一个Publisher的生命周期中,可以向外部发送0个或者多个任意的值,但却只能发送一次completion事件(无论是成功还是失败),一旦completion事件被发送,就意味着该Publisher停止了工作,结束了自己的生命周期,之后再也不会发送任何事件或者值了

在Combine中使用了名为Publisher的协议来定义了Publisher角色的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protocol Publisher {
// Publisher所要输出的值的类型
associatedtype Output
// 用来表示一个Publisher失败的原因的类型,如果你觉得你所定义的Publisher永远不会出错,那么可以使用一个Never类型来指定Failure所代表的类型
associatedtype Failure:Error
// 提供一个使用订阅者订阅该发布者的方法
func receive<S>(subscriber: S) where S:Subscriber, Self.Failure==S.Failure, Self.OutPut==S.Intput
}
// 虽然在协议中定义了订阅该Publisher的方法接口
// 但实际上订阅者并不是通过该接口订阅Publisher的
// 而是使用另外扩展的一个方法
extension Publisher {
func subcribe<S>(subscriber: S) where S:Subscriber, Self.Failure==S.Failure, Self.OutPut==S.Intput
}

对于Publisher我们值得注意的一点是:对于一般的Publisher来说,如果没有任何订阅者订阅该Publisher,Publisher并不会处理之后出现的需要发送的事件或者值,也就是说一般的Publisher在没有订阅者订阅自己的情况下是不会发送任何消息的,这一点我们应该很容易理解,如果没有订阅者接受自己发送的消息,那么发送消息也将是无意义的。在Combine中定义的大多数Publisher如此,但也有例外,当我们接触到这一类Publisher的时候会特别的注明。

Subscriber订阅者

Subscriber在Combine中是一个从Publisher那里接受特定类型的事件值作为输入并对其做出反应的角色

在Combine中使用了名为Subscriber的协议来定义了Subscriber角色的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
protocol Subscriber: CustomCombineIndetifierConvertible {
// 订阅者接受的事件值类型
associatedtype Input
// 订阅者接受的错误类型
associatedtype Failure:Error

// 以下是定义了一些订阅过程中需要用到的方法
// 关于订阅过程我们会在下文中提及,这些方法同时也会再次被提起
// 所以现在暂时不用太过注意这些方法
func receive(subscription:Subscription)
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
}

如果一个Subscriber订阅了某个Publisher,那么该Subscriber接受的事件值类型Input应该与Publisher发布的事件值类型Output一致,他们的Failure类型也应一致

Combine中为我们提供了两种特别好用的内置的Subscriber:

第一种是sink,sink订阅者中存储用户提供的两个闭包,一个处理事件值,一个处理completion事件,订阅者会在接收到Publisher发出的事件之后调用对应的闭包方法,并把接收到的事件值或者completion值作为参数传给闭包。作为官方内置的订阅者,还拥有一些更加便捷的使用方法,我们通常使用Publisher的扩展方法sink来对某个发布者完成订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建一个sink订阅者
let sink = Sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: {value in
print(value)
})
// 调用方法使sink订阅者订阅某个发布者
somePublisher.subscribe(sink)
// 更为便捷的订阅方法
// 该方法内部的实质其实跟上面一样,创建实例,并进行属性赋值,最后订阅
_ = somePublisher.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: {value in
print(value)
})

第二种是assign,assign允许我们在接收到Publisher的事件值的时候,将值赋值给指定实例的指定属性。

1
2
3
4
5
// 传统订阅方法
let assign = Assign(object:someObj, keyPath:\.someProperty)
somePublisher.subscribe(sink)
// 使用系统提供的扩展方法便捷订阅
_ = somePublisher.assign(to: \.someProperty, on: someObj)

Subscription

Subscription是订阅关系的一个抽象,Publisher与Subscriber依靠他们之间的一个Subscription来交流订阅中所要用到的信息

我们从Subscription数据的流转去从宏观的角度看一个订阅的流程是如何运转的

"123"

  1. Subscriber调用Publisher的subscribe方法,告诉Publisher:我想要订阅你
  2. Publisher随后创建一个subscription实例,并调用Subscriber的receive(subscription:)方法,告诉Subscriber:我收到你的订阅了
  3. Subscriber收到传回来的subscription实例之后,调用subscription.request()方法,告诉Publisher:我要接受你的若干个事件值通知
  4. Publisher通过Subscriber的receive(_:)方法来告诉Subscriber:这是你想要的数据
  5. 一直重复步骤4,发送多个事件值通知
  6. 发布结束,Publisher通过Subscriber的receive(completion:)方法告诉Subscriber:发布已经结束

下面我们通过实现一个自定义的订阅者对象来进一步的理解Combine中订阅的流程,以及在这个流程中subscription所扮演的一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CustomSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
//在Subscriber订阅了某一个Publisher之后,Publisher会首先调用该方法,通知监听关系已经建立
//Subscriber在此方法中,会开始向Publisher请求数据,并指定请求的次数
// 1
func receive(subscription:Subscription) {
// 2
subscription.request(.max(5))
}
//在Publisher产生了值之后,就会调用Subscriber的这个方法
// 3
func receive(_ input: Int) -> Subscribers.Demand {
print(input)
// 4
return .none
}
// 5
func receive(completion: Subscribers.Completion<Never>) {
print("complete!")
}
}
  1. Subscriber订阅了一个Publisher之后,Publisher创建一个subscription并通过Subscriber的receive(subscription:)方法将该subscription传给订阅者
  2. Subscriber拿到发布者创建的subscription之后,调用subscription的request方法告诉发布者我要接受你的若干个事件值的通知,在这里可以指定该Subscriber可以接受发布者事件值通知的个数,可以通过.max指定接受的最大值,通过.unlimited指定不限制接受的数量,通过.none表示不接受任何值,通常.none在这里是没有意义的,因为如果在这里你传入了.none则代表一旦开始订阅,订阅者就主动放弃了之后的所有订阅,这份订阅关系会立即被销毁
  3. Publisher会通过Subscriber的receive(_:)方法来讲自身内部所产生的事件值通知发送给Subscriber。在该方法中,订阅者可以完成对接收到的事件值通知的响应(比如当前这个例子中,订阅者会对没一个收到的事件值响应进行打印)
  4. 该方法有一个Subscribers.Demand类型的返回值,该类型与调用subscription的request方法时传入的参数是一致的,代表Subscriber可以接受发布者事件值通知的个数,这里的返回值表明你可以在不断地去改变更新这个次数。需要注意的是:该方法的返回值是会以叠加的方式作用到原有的次数之上的。
  5. 当Publisher中产生completion事件的时候,会通过Subscriber的receive(completion:)方法通知订阅者,订阅者可以在这个方法里面做出响应(本例中在接收到Publisher的completion的事件时会直接打印completion)

Cancellable

现在我们来开始讨论一组订阅通道的生命周期,一组订阅通道在什么情况下会被关闭并释放资源:

  1. 发布者出发了自身的completion事件,那么该属于发布者的所有订阅通道都会被关闭
  2. 订阅者收到的事件值通知的数量达到了自己可以接受的最大值的时候该订阅者会主动关闭自身的订阅通道。比如订阅者准备开始接受发布者值的时候调用requerst(.max(5))指定了自己最多可以接受5个事件值通知,那么在发布者连续发布了五个事件值通知的时候,该订阅通道就会自己关闭
  3. 显式的执行关闭订阅通道的逻辑

前两条很显然是发布者自身内部的逻辑,与我们几乎无关(除非你自定义发布者)。可以由我们来具体控制通道何时关闭的方法只有第三种。

Combine中使用Cancellable协议来规范这种行为,该协议中定义了一个cancel方法用于取消订阅流程

1
2
3
protocol Cancellable {
func cancel()
}

除此之外Combine还定义了AnyCancellable类,该类遵循了Cancellable协议,这个类的特点就是会在deinit的时候自动调用自身的cancel方法,并且提供了加入集合的能力

1
2
3
4
5
6
7
8
9
10
11
12
class AnyCancellable: Cancellable, Hashable {
// 该类中的cancel方法并没有实现任何实际的逻辑
// 只是会调用初始化传入的这个闭包
// 所以该类的实例的特定的cancel逻辑是由初始化时外部参数决定的
init(() -> Void) {
// ...
}
}
extension AnyCancellable {
// 提供了加入集合的能力
final public func store(in set: inout Set<AnyCancellable>)
}

Combine中内置的两种订阅者Sink和Assign内部都实现了Cancellable协议,并且当我们通过发布者的扩展方法sink以及assign建立订阅关系的时候,会返回一个AnyCancellable实例

由于AnyCancellable的deinit时会自动调用cancel方法,所以在我们调用sink以及assign方法之后,如果不将其返回值存储起来保留引用计数,那么我们通过sink或者assign方法建立的订阅关系会在代码执行脱离当前域之后立马被cancel

我们通常的做法是将AnyCancellable对象保存为当前的类实例属性,这样当该类被deinit时也会触发属性的deinit,自动释放该类中所拥有的订阅通道。当一个类会发生多个订阅事件的时候,可以使用AnyCancellable的store方法所提供的加入集合的能力,设置一个集合作为该类的一个实例属性,并把所有订阅产生的AnyCancellable实例添加到这个集合中,当前类执行deinit的时候,会执行集合的deinit,集合的deinit中又会一一执行其所拥有元素的deinit,这样就实现了多个订阅通道的自动关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 在一个类中保持订阅关系的方法
// 方法一:将订阅返回的AnyCancellable作为实例属性存储,类deinit是会自动调用实例中存储的AnyCancellable属性的deinit方法
class MyClass {
let cancellable: AnyCancellable

init() {
self.cancellable = somePublisher.sink(receiveValue: { value in
print(value)
})
}
}
// 方法二:对订阅返回的AnyCancellable调用store方法,将其添加到类中的set中去。当类deinit时,会自动调用set的deinit方法,而set的deinit方法又会调用其内部存储的所有的AnyCancellable的deinit方法
class MyClass {
var cancelSet = Set<AnyCancellable>()

func someFunc() {
_ = somePublisher.sink(receiveValue: { value in
print(value)
}).store(in: &cancelSet)
}
}

类型擦除

我们在使用Combine编写封装功能模块并在最后将Puiblisher作为对外的接口将数据提供出去的时候,常常会希望不要将Publisher的一些细节(比如类型)暴露出去,因为很多情况下Publisher的一些额外的细节信息并不能给我们的调用提供额外的帮助,反而会在我们从外部进行调用的时候分散我们的注意力,而且使得从外部对内部封装做出一些不合规范的改变成为了可能。而类型擦除就是用来避免这样的情况的。类型擦除将我们的任意一个Publisher封装为一个AnyPublisher结构体,该结构体仅实现了Publisher协议。通过AnyPublisher可以隐藏掉Publisher中你不想暴漏给订阅者的实现细节

eraseToAnyPublisher()是一个操作符,在Publisher之后调用这个方法,会返回一个封装了这个Publisher对象的AnyPublisher实例。

1
2
3
4
5
6
7
8
9
let result = someSubject
.map{ $0 * 2}
.eraseToAnyPublisher()
// 类型擦出之后我们还可以正常地订阅发布者
_ = result.sink {val in
print(val)
}.store(in: &subscriptions)
// 但是发布者背后的subject被隐藏了,调用subject的send方法会出错
result.send(2)
--- 本文结束 The End ---