Combine 框架,从0到1 —— 5.Combine 中的 Scheduler

| Swift , iOS , Combine

 

内容概览

  • 前言
  • receive(on:options:)
  • subscribe(on:options:)
  • 总结

 

前言

 

相比于其他的 Combine 知识点,Scheduler 是比较容易理解的,毕竟这其实就是常见的多线程调度。

本文的核心内容是讲解两个操作符(其实也是发布者):subscribe(on: )receiveOn(on: ),理解了这两个操作符的含义及用法,您也就理解了 Combine 中的多线程。

 

请注意,后续内容中出现的 cancellables 全部由这个类的实例提供 :

final class CombineSchedulersDemo {

    private var cancellables = Set<AnyCancellable>()

}

 

receive(on:options:)

官方文档:https://developer.apple.com/documentation/combine/publisher/receive(on:options:)

 

receive(on:options:) 操作符可以指定下游在指定的调度器上接收发布者发布的元素。

其中,on 标签处需要传递的 scheduler 参数是 Scheduler 协议类型,发布者会使用这个 scheduler 向下游发送元素。

苹果已经提供了很多预定义的 Scheduler,如:RunLoop, DispatchQueueOperationQueue,常用的 Scheduler 有:RunLoop.main, DispatchQueue.main, DispatchQueue.global(), OperationQueue.main

 

代码示例:

        let queue = DispatchQueue(label: "my_queue")
        let queueKey = DispatchSpecificKey<String>()
        queue.setSpecific(key: queueKey, value: queue.label)

        DispatchQueue.main.setSpecific(key: queueKey, value: DispatchQueue.main.label)

        queue.async {
            Future<Int, Never> { promise in
                print("Future queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                promise(.success(1))
            }
            .map { value -> Int in
                print("map queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                return value
            }
            .receive(on: DispatchQueue.main)
            .sink { value in
                print("sink queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
            }
            .store(in: &self.cancellables)
        }

输出内容:

Future queue: my_queue
map queue: my_queue
sink queue: com.apple.main-thread

如果去掉 .receive(on: queue) 这一行代码,输出内容将变为:

Future queue: my_queue
map queue: my_queue
sink queue: my_queue

根据输出内容可以看出,receive(on: ) 确实可以指定执行下游代码的线程。

 

receive(on:options:) 最常见的使用场景:

在后台线程执行耗时任务,完成后将结果汇总到 UI 主线程上。

 

示例代码:

    Future<Int, Never> { promise in
        DispatchQueue.global().async {
            // 耗时的后台任务
            // ...
            let result = 1
            promise(.success(result))
        }
    }
    .receive(on: DispatchQueue.main) // 切换到主线程
    .sink { value in
        // 在主线程处理耗时任务得到的结果
    }
    .store(in: &self.cancellables)

 

subscribe(on:options:)

官方文档:https://developer.apple.com/documentation/combine/anypublisher/subscribe(on:options:)

 

subscribe(on:options:) 操作符可以指定上游执行订阅、取消和请求操作的线程。

其中,on 标签处需要传递的 scheduler 参数是 Scheduler 协议类型,下游会使用这个 scheduler 向上游发送消息。

 

代码示例:

        let queue = DispatchQueue(label: "my_queue")
        let queueKey = DispatchSpecificKey<String>()
        queue.setSpecific(key: queueKey, value: queue.label)

        DispatchQueue.main.setSpecific(key: queueKey, value: DispatchQueue.main.label)

        queue.async {
            Future<Int, Never> { promise in
                print("Future queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                promise(.success(1))
            }
            .map { value -> Int in
                print("map queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                return value
            }
            .subscribe(on: DispatchQueue.main) // 对比上面的例子,只增加了这一行代码
            .receive(on: DispatchQueue.main)
            .sink { value in
                print("sink queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
            }
            .store(in: &self.cancellables)
        }

输出内容:

Future queue: my_queue
map queue: com.apple.main-thread
sink queue: com.apple.main-thread

如果没有 .subscribe(on: DispatchQueue.main) 这一行代码,输出内容为:

Future queue: my_queue
map queue: my_queue
sink queue: com.apple.main-thread

根据输出内容可以看出,subscribe(on: ) 确实改变了 map 中运行的线程,但是却没有改变 Future 中运行的线程!为什么???

 

让我们观察另一个示例:

        let queue = DispatchQueue(label: "my_queue")
        let queueKey = DispatchSpecificKey<String>()
        queue.setSpecific(key: queueKey, value: queue.label)

        DispatchQueue.main.setSpecific(key: queueKey, value: DispatchQueue.main.label)

        queue.async {
            Deferred<Future<Int, Never>> {
                print("Deferred queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                return Future<Int, Never> { promise in
                    print("Future queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                    promise(.success(1))
                }
            }
            .map { value -> Int in
                print("map queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
                return value
            }
            .subscribe(on: DispatchQueue.main)
            .receive(on: DispatchQueue.main)
            .sink { value in
                print("sink queue:", DispatchQueue.getSpecific(key: queueKey) ?? "")
            }
            .store(in: &self.cancellables)
        }

输出内容:

Deferred queue: com.apple.main-thread
Future queue: com.apple.main-thread
map queue: com.apple.main-thread
sink queue: com.apple.main-thread

咦,全部都是 com.apple.main-thread,也就是说 subscribe(on: ) 改变了 Deferred, Futuremap 中运行的线程!为什么?

因为,Future 会在创建时就执行传入的闭包,而 Deferred 会等到订阅者发送需求时才执行传入的闭包。

创建 Future 的操作是在 queue 中进行的,所以此时执行传入的闭包的线程就是 queue 提供的线程。

Deferred 在下游订阅者发送需求时就已经被指定了 subscribe(on: DispatchQueue.main),所以创建 Deferred 时传入的闭包会在 DispatchQueue.main 提供的线程中执行。

 

如果您想了解更多 Publishers 的用法和注意事项,可以阅读:Combine 框架,从0到1 —— 5.Combine 提供的发布者(Publishers)

 

总结

 

相比于 receive(on:options:)subscribe(on:options:) 较难掌握,而且不常用。

另外,由于上游发布者执行的操作可能已经被配置了更为复杂的异步操作,在下游执行 subscribe(on:options:) 可能并不能完全改变上游的执行线程。

 

参考内容:

receive(on:options:))
subscribe(on:options:))
Scheduler and Thread handling operators

 

觉得不错?点个赞呗~

本文链接:Combine 框架,从0到1 —— 5.Combine 中的 Scheduler

转载声明:本站文章如无特别说明,皆为原创。转载请注明:Ficow Shen's Blog

评论区(期待你的留言)