Operator之时间控制(Controlling timing)
debounce
每间隔一定事件进行一次数据流处理,使用场景:
1. 处理搜索框过于频繁发起网络请求的问题,每当用户输入一个字符的时候,都发起网络请求,会浪费一部分网络资源,通过
debounce
,可以实现,当用户停止输入0.5秒再发送请求。2. 处理按钮的连续点击问题,
debounce
只接收0.5秒后的最后一次点击事件,因此自动忽略了中间的多次连续点击事件
let bounces:[(Int,TimeInterval)] = [
(0, 0),
(1, 0.2), // 0.2s interval since last index
(2, 1), // 0.7s interval since last index
(3, 1.2), // 0.2s interval since last index
(4, 1.5), // 0.2s interval since last index
(5, 2) // 0.5s interval since last index
]
let subject = PassthroughSubject<Int, Never>()
cancellable = subject
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink { index in
print ("Received index \(index)")
}
for bounce in bounces {
DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
subject.send(bounce.0)
}
}
delay
延迟:在特定调度程序上将所有输出延迟指定时间量到下游接收器。
let df = DateFormatter()
df.dateStyle = .none
df.timeStyle = .long
cancellable = Timer.publish(every: 1.0, on: .main, in: .default)
.autoconnect()
.handleEvents(receiveOutput: { date in
print ("Sending Timestamp \'\(df.string(from: date))\' to delay()")
})
.delay(for: .seconds(3), scheduler: RunLoop.main, options: .none)
.sink(
receiveCompletion: { print ("completion: \($0)", terminator: "\n") },
receiveValue: { value in
let now = Date()
print ("At \(df.string(from: now)) received Timestamp \'\(df.string(from: value))\' sent: \(String(format: "%.2f", now.timeIntervalSince(value))) secs ago", terminator: "\n")
}
)
// Prints:
// Sending Timestamp '5:02:33 PM PDT' to delay()
// Sending Timestamp '5:02:34 PM PDT' to delay()
// Sending Timestamp '5:02:35 PM PDT' to delay()
// Sending Timestamp '5:02:36 PM PDT' to delay()
// At 5:02:36 PM PDT received Timestamp '5:02:33 PM PDT' sent: 3.00 secs ago
// Sending Timestamp '5:02:37 PM PDT' to delay()
// At 5:02:37 PM PDT received Timestamp '5:02:34 PM PDT' sent: 3.00 secs ago
// Sending Timestamp '5:02:38 PM PDT' to delay()
// At 5:02:38 PM PDT received Timestamp '5:02:35 PM PDT' sent: 3.00 secs ago
measureInterval
measureInterval用于记录publisher发送数据的间隔时间。
let bounces:[(Int,TimeInterval)] = [
(0, 0.2),
(1, 1.5)
]
let subject = PassthroughSubject<Int, Never>()
cancellable = subject
.measureInterval(using: RunLoop.main)
.sink { print($0) }
for bounce in bounces {
DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
subject.send(bounce.0)
}
}
打印结果:
Stride(magnitude: 0.25349903106689453)
Stride(magnitude: 1.2467479705810547)
throttle
设定一个固定时间间隔的时间窗口,当时间窗口结束后发送数据,throttle
按照固定的顺序排列时间窗口,在时间窗口的结尾处发送数据,而debounce
每次接收到新数据,都会重新开启一个新的时间窗口,同时取消之前的时间窗口。
let bounces:[(Int,TimeInterval)] = [
(1, 0.2),
(2, 1),
(3, 1.2),
(4, 1.4),
]
let subject = PassthroughSubject<Int, Never>()
cancellable = subject
.throttle(for: 0.5,
scheduler: RunLoop.main,
latest: true)
.sink { index in
print ("Received index \(index) in \(Date().timeIntervalSince1970)")
}
for bounce in bounces {
DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
subject.send(bounce.0)
}
}
timeout
timeout
用于设置pipline的超时时间
let remoteDataPublisher = urlSession.dataTaskPublisher(for: self.mockURL!)
.delay(for: 2, scheduler: backgroundQueue)
.retry(5) // 5 retries, 2 seconds each ~ 10 seconds for this to fall through
.timeout(5, scheduler: backgroundQueue) // max time of 5 seconds before failing
.tryMap { data, response -> Data in
guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw TestFailureCondition.invalidServerResponse
}
return data
}
.decode(type: PostmanEchoTimeStampCheckResponse.self, decoder: JSONDecoder())
.subscribe(on: backgroundQueue)
.eraseToAnyPublisher()