-
-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathDataStreamRequest.swift
590 lines (510 loc) · 25.7 KB
/
DataStreamRequest.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
//
// DataStreamRequest.swift
//
// Copyright (c) 2014-2024 Alamofire Software Foundation (http://alamofire.org/)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
import Foundation
/// `Request` subclass which streams HTTP response `Data` through a `Handler` closure.
public final class DataStreamRequest: Request, @unchecked Sendable {
/// Closure type handling `DataStreamRequest.Stream` values.
public typealias Handler<Success, Failure: Error> = @Sendable (Stream<Success, Failure>) throws -> Void
/// Type encapsulating an `Event` as it flows through the stream, as well as a `CancellationToken` which can be used
/// to stop the stream at any time.
public struct Stream<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
/// Latest `Event` from the stream.
public let event: Event<Success, Failure>
/// Token used to cancel the stream.
public let token: CancellationToken
/// Cancel the ongoing stream by canceling the underlying `DataStreamRequest`.
public func cancel() {
token.cancel()
}
}
/// Type representing an event flowing through the stream. Contains either the `Result` of processing streamed
/// `Data` or the completion of the stream.
public enum Event<Success, Failure: Error>: Sendable where Success: Sendable, Failure: Sendable {
/// Output produced every time the instance receives additional `Data`. The associated value contains the
/// `Result` of processing the incoming `Data`.
case stream(Result<Success, Failure>)
/// Output produced when the instance has completed, whether due to stream end, cancellation, or an error.
/// Associated `Completion` value contains the final state.
case complete(Completion)
}
/// Value containing the state of a `DataStreamRequest` when the stream was completed.
public struct Completion: Sendable {
/// Last `URLRequest` issued by the instance.
public let request: URLRequest?
/// Last `HTTPURLResponse` received by the instance.
public let response: HTTPURLResponse?
/// Last `URLSessionTaskMetrics` produced for the instance.
public let metrics: URLSessionTaskMetrics?
/// `AFError` produced for the instance, if any.
public let error: AFError?
}
/// Type used to cancel an ongoing stream.
public struct CancellationToken: Sendable {
weak var request: DataStreamRequest?
init(_ request: DataStreamRequest) {
self.request = request
}
/// Cancel the ongoing stream by canceling the underlying `DataStreamRequest`.
public func cancel() {
request?.cancel()
}
}
/// `URLRequestConvertible` value used to create `URLRequest`s for this instance.
public let convertible: any URLRequestConvertible
/// Whether or not the instance will be cancelled if stream parsing encounters an error.
public let automaticallyCancelOnStreamError: Bool
/// Internal mutable state specific to this type.
struct StreamMutableState {
/// `OutputStream` bound to the `InputStream` produced by `asInputStream`, if it has been called.
var outputStream: OutputStream?
/// Stream closures called as `Data` is received.
var streams: [@Sendable (_ data: Data) -> Void] = []
/// Number of currently executing streams. Used to ensure completions are only fired after all streams are
/// enqueued.
var numberOfExecutingStreams = 0
/// Completion calls enqueued while streams are still executing.
var enqueuedCompletionEvents: [@Sendable () -> Void] = []
/// Handler for any `HTTPURLResponse`s received.
var httpResponseHandler: (queue: DispatchQueue,
handler: @Sendable (_ response: HTTPURLResponse,
_ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void)?
}
let streamMutableState = Protected(StreamMutableState())
/// Creates a `DataStreamRequest` using the provided parameters.
///
/// - Parameters:
/// - id: `UUID` used for the `Hashable` and `Equatable` implementations. `UUID()`
/// by default.
/// - convertible: `URLRequestConvertible` value used to create `URLRequest`s for this
/// instance.
/// - automaticallyCancelOnStreamError: `Bool` indicating whether the instance will be cancelled when an `Error`
/// is thrown while serializing stream `Data`.
/// - underlyingQueue: `DispatchQueue` on which all internal `Request` work is performed.
/// - serializationQueue: `DispatchQueue` on which all serialization work is performed. By default
/// targets
/// `underlyingQueue`, but can be passed another queue from a `Session`.
/// - eventMonitor: `EventMonitor` called for event callbacks from internal `Request` actions.
/// - interceptor: `RequestInterceptor` used throughout the request lifecycle.
/// - delegate: `RequestDelegate` that provides an interface to actions not performed by
/// the `Request`.
init(id: UUID = UUID(),
convertible: any URLRequestConvertible,
automaticallyCancelOnStreamError: Bool,
underlyingQueue: DispatchQueue,
serializationQueue: DispatchQueue,
eventMonitor: (any EventMonitor)?,
interceptor: (any RequestInterceptor)?,
delegate: any RequestDelegate) {
self.convertible = convertible
self.automaticallyCancelOnStreamError = automaticallyCancelOnStreamError
super.init(id: id,
underlyingQueue: underlyingQueue,
serializationQueue: serializationQueue,
eventMonitor: eventMonitor,
interceptor: interceptor,
delegate: delegate)
}
override func task(for request: URLRequest, using session: URLSession) -> URLSessionTask {
let copiedRequest = request
return session.dataTask(with: copiedRequest)
}
override func finish(error: AFError? = nil) {
streamMutableState.write { state in
state.outputStream?.close()
}
super.finish(error: error)
}
func didReceive(data: Data) {
streamMutableState.write { state in
#if !canImport(FoundationNetworking) // If we not using swift-corelibs-foundation.
if let stream = state.outputStream {
underlyingQueue.async {
var bytes = Array(data)
stream.write(&bytes, maxLength: bytes.count)
}
}
#endif
state.numberOfExecutingStreams += state.streams.count
underlyingQueue.async { [streams = state.streams] in streams.forEach { $0(data) } }
}
}
func didReceiveResponse(_ response: HTTPURLResponse, completionHandler: @escaping @Sendable (URLSession.ResponseDisposition) -> Void) {
streamMutableState.read { dataMutableState in
guard let httpResponseHandler = dataMutableState.httpResponseHandler else {
underlyingQueue.async { completionHandler(.allow) }
return
}
httpResponseHandler.queue.async {
httpResponseHandler.handler(response) { disposition in
if disposition == .cancel {
self.mutableState.write { mutableState in
mutableState.state = .cancelled
mutableState.error = mutableState.error ?? AFError.explicitlyCancelled
}
}
self.underlyingQueue.async {
completionHandler(disposition.sessionDisposition)
}
}
}
}
}
/// Validates the `URLRequest` and `HTTPURLResponse` received for the instance using the provided `Validation` closure.
///
/// - Parameter validation: `Validation` closure used to validate the request and response.
///
/// - Returns: The `DataStreamRequest`.
@discardableResult
public func validate(_ validation: @escaping Validation) -> Self {
let validator: @Sendable () -> Void = { [unowned self] in
guard error == nil, let response else { return }
let result = validation(request, response)
if case let .failure(error) = result {
self.error = error.asAFError(or: .responseValidationFailed(reason: .customValidationFailed(error: error)))
}
eventMonitor?.request(self,
didValidateRequest: request,
response: response,
withResult: result)
}
validators.write { $0.append(validator) }
return self
}
#if !canImport(FoundationNetworking) // If we not using swift-corelibs-foundation.
/// Produces an `InputStream` that receives the `Data` received by the instance.
///
/// - Note: The `InputStream` produced by this method must have `open()` called before being able to read `Data`.
/// Additionally, this method will automatically call `resume()` on the instance, regardless of whether or
/// not the creating session has `startRequestsImmediately` set to `true`.
///
/// - Parameter bufferSize: Size, in bytes, of the buffer between the `OutputStream` and `InputStream`.
///
/// - Returns: The `InputStream` bound to the internal `OutboundStream`.
public func asInputStream(bufferSize: Int = 1024) -> InputStream? {
defer { resume() }
var inputStream: InputStream?
streamMutableState.write { state in
Foundation.Stream.getBoundStreams(withBufferSize: bufferSize,
inputStream: &inputStream,
outputStream: &state.outputStream)
state.outputStream?.open()
}
return inputStream
}
#endif
/// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse` and providing a completion
/// handler to return a `ResponseDisposition` value.
///
/// - Parameters:
/// - queue: `DispatchQueue` on which the closure will be called. `.main` by default.
/// - handler: Closure called when the instance produces an `HTTPURLResponse`. The `completionHandler` provided
/// MUST be called, otherwise the request will never complete.
///
/// - Returns: The instance.
@_disfavoredOverload
@preconcurrency
@discardableResult
public func onHTTPResponse(
on queue: DispatchQueue = .main,
perform handler: @escaping @Sendable (_ response: HTTPURLResponse,
_ completionHandler: @escaping @Sendable (ResponseDisposition) -> Void) -> Void
) -> Self {
streamMutableState.write { mutableState in
mutableState.httpResponseHandler = (queue, handler)
}
return self
}
/// Sets a closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
///
/// - Parameters:
/// - queue: `DispatchQueue` on which the closure will be called. `.main` by default.
/// - handler: Closure called when the instance produces an `HTTPURLResponse`.
///
/// - Returns: The instance.
@preconcurrency
@discardableResult
public func onHTTPResponse(on queue: DispatchQueue = .main,
perform handler: @escaping @Sendable (HTTPURLResponse) -> Void) -> Self {
onHTTPResponse(on: queue) { response, completionHandler in
handler(response)
completionHandler(.allow)
}
return self
}
func capturingError(from closure: () throws -> Void) {
do {
try closure()
} catch {
self.error = error.asAFError(or: .responseSerializationFailed(reason: .customSerializationFailed(error: error)))
cancel()
}
}
func appendStreamCompletion<Success, Failure>(on queue: DispatchQueue,
stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
appendResponseSerializer {
self.underlyingQueue.async {
self.responseSerializerDidComplete {
self.streamMutableState.write { state in
guard state.numberOfExecutingStreams == 0 else {
state.enqueuedCompletionEvents.append {
self.enqueueCompletion(on: queue, stream: stream)
}
return
}
self.enqueueCompletion(on: queue, stream: stream)
}
}
}
}
}
func enqueueCompletion<Success, Failure>(on queue: DispatchQueue,
stream: @escaping Handler<Success, Failure>) where Success: Sendable, Failure: Sendable {
queue.async {
do {
let completion = Completion(request: self.request,
response: self.response,
metrics: self.metrics,
error: self.error)
try stream(.init(event: .complete(completion), token: .init(self)))
} catch {
// Ignore error, as errors on Completion can't be handled anyway.
}
}
}
// MARK: Response Serialization
/// Adds a `StreamHandler` which performs no parsing on incoming `Data`.
///
/// - Parameters:
/// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
/// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
///
/// - Returns: The `DataStreamRequest`.
@preconcurrency
@discardableResult
public func responseStream(on queue: DispatchQueue = .main, stream: @escaping Handler<Data, Never>) -> Self {
let parser = { @Sendable [unowned self] (data: Data) in
queue.async {
self.capturingError {
try stream(.init(event: .stream(.success(data)), token: .init(self)))
}
self.updateAndCompleteIfPossible()
}
}
streamMutableState.write { $0.streams.append(parser) }
appendStreamCompletion(on: queue, stream: stream)
return self
}
/// Adds a `StreamHandler` which uses the provided `DataStreamSerializer` to process incoming `Data`.
///
/// - Parameters:
/// - serializer: `DataStreamSerializer` used to process incoming `Data`. Its work is done on the `serializationQueue`.
/// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
/// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
///
/// - Returns: The `DataStreamRequest`.
@preconcurrency
@discardableResult
public func responseStream<Serializer: DataStreamSerializer>(using serializer: Serializer,
on queue: DispatchQueue = .main,
stream: @escaping Handler<Serializer.SerializedObject, AFError>) -> Self {
let parser = { @Sendable [unowned self] (data: Data) in
serializationQueue.async {
// Start work on serialization queue.
let result = Result { try serializer.serialize(data) }
.mapError { $0.asAFError(or: .responseSerializationFailed(reason: .customSerializationFailed(error: $0))) }
// End work on serialization queue.
self.underlyingQueue.async {
self.eventMonitor?.request(self, didParseStream: result)
if result.isFailure, self.automaticallyCancelOnStreamError {
self.cancel()
}
queue.async {
self.capturingError {
try stream(.init(event: .stream(result), token: .init(self)))
}
self.updateAndCompleteIfPossible()
}
}
}
}
streamMutableState.write { $0.streams.append(parser) }
appendStreamCompletion(on: queue, stream: stream)
return self
}
/// Adds a `StreamHandler` which parses incoming `Data` as a UTF8 `String`.
///
/// - Parameters:
/// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
/// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
///
/// - Returns: The `DataStreamRequest`.
@preconcurrency
@discardableResult
public func responseStreamString(on queue: DispatchQueue = .main,
stream: @escaping Handler<String, Never>) -> Self {
let parser = { @Sendable [unowned self] (data: Data) in
serializationQueue.async {
// Start work on serialization queue.
let string = String(decoding: data, as: UTF8.self)
// End work on serialization queue.
self.underlyingQueue.async {
self.eventMonitor?.request(self, didParseStream: .success(string))
queue.async {
self.capturingError {
try stream(.init(event: .stream(.success(string)), token: .init(self)))
}
self.updateAndCompleteIfPossible()
}
}
}
}
streamMutableState.write { $0.streams.append(parser) }
appendStreamCompletion(on: queue, stream: stream)
return self
}
private func updateAndCompleteIfPossible() {
streamMutableState.write { state in
state.numberOfExecutingStreams -= 1
guard state.numberOfExecutingStreams == 0, !state.enqueuedCompletionEvents.isEmpty else { return }
let completionEvents = state.enqueuedCompletionEvents
self.underlyingQueue.async { completionEvents.forEach { $0() } }
state.enqueuedCompletionEvents.removeAll()
}
}
/// Adds a `StreamHandler` which parses incoming `Data` using the provided `DataDecoder`.
///
/// - Parameters:
/// - type: `Decodable` type to parse incoming `Data` into.
/// - queue: `DispatchQueue` on which to perform `StreamHandler` closure.
/// - decoder: `DataDecoder` used to decode the incoming `Data`.
/// - preprocessor: `DataPreprocessor` used to process the incoming `Data` before it's passed to the `decoder`.
/// - stream: `StreamHandler` closure called as `Data` is received. May be called multiple times.
///
/// - Returns: The `DataStreamRequest`.
@preconcurrency
@discardableResult
public func responseStreamDecodable<T: Decodable>(of type: T.Type = T.self,
on queue: DispatchQueue = .main,
using decoder: any DataDecoder = JSONDecoder(),
preprocessor: any DataPreprocessor = PassthroughPreprocessor(),
stream: @escaping Handler<T, AFError>) -> Self where T: Sendable {
responseStream(using: DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: preprocessor),
on: queue,
stream: stream)
}
}
extension DataStreamRequest.Stream {
/// Incoming `Result` values from `Event.stream`.
public var result: Result<Success, Failure>? {
guard case let .stream(result) = event else { return nil }
return result
}
/// `Success` value of the instance, if any.
public var value: Success? {
guard case let .success(value) = result else { return nil }
return value
}
/// `Failure` value of the instance, if any.
public var error: Failure? {
guard case let .failure(error) = result else { return nil }
return error
}
/// `Completion` value of the instance, if any.
public var completion: DataStreamRequest.Completion? {
guard case let .complete(completion) = event else { return nil }
return completion
}
}
// MARK: - Serialization
/// A type which can serialize incoming `Data`.
public protocol DataStreamSerializer: Sendable {
/// Type produced from the serialized `Data`.
associatedtype SerializedObject: Sendable
/// Serializes incoming `Data` into a `SerializedObject` value.
///
/// - Parameter data: `Data` to be serialized.
///
/// - Throws: Any error produced during serialization.
func serialize(_ data: Data) throws -> SerializedObject
}
/// `DataStreamSerializer` which uses the provided `DataPreprocessor` and `DataDecoder` to serialize the incoming `Data`.
public struct DecodableStreamSerializer<T: Decodable>: DataStreamSerializer where T: Sendable {
/// `DataDecoder` used to decode incoming `Data`.
public let decoder: any DataDecoder
/// `DataPreprocessor` incoming `Data` is passed through before being passed to the `DataDecoder`.
public let dataPreprocessor: any DataPreprocessor
/// Creates an instance with the provided `DataDecoder` and `DataPreprocessor`.
/// - Parameters:
/// - decoder: ` DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default.
/// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
/// `decoder`. `PassthroughPreprocessor()` by default.
public init(decoder: any DataDecoder = JSONDecoder(), dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) {
self.decoder = decoder
self.dataPreprocessor = dataPreprocessor
}
public func serialize(_ data: Data) throws -> T {
let processedData = try dataPreprocessor.preprocess(data)
do {
return try decoder.decode(T.self, from: processedData)
} catch {
throw AFError.responseSerializationFailed(reason: .decodingFailed(error: error))
}
}
}
/// `DataStreamSerializer` which performs no serialization on incoming `Data`.
public struct PassthroughStreamSerializer: DataStreamSerializer {
/// Creates an instance.
public init() {}
public func serialize(_ data: Data) throws -> Data { data }
}
/// `DataStreamSerializer` which serializes incoming stream `Data` into `UTF8`-decoded `String` values.
public struct StringStreamSerializer: DataStreamSerializer {
/// Creates an instance.
public init() {}
public func serialize(_ data: Data) throws -> String {
String(decoding: data, as: UTF8.self)
}
}
extension DataStreamSerializer {
/// Creates a `DecodableStreamSerializer` instance with the provided `DataDecoder` and `DataPreprocessor`.
///
/// - Parameters:
/// - type: `Decodable` type to decode from stream data.
/// - decoder: ` DataDecoder` used to decode incoming `Data`. `JSONDecoder()` by default.
/// - dataPreprocessor: `DataPreprocessor` used to process incoming `Data` before it's passed through the
/// `decoder`. `PassthroughPreprocessor()` by default.
public static func decodable<T: Decodable>(of type: T.Type,
decoder: any DataDecoder = JSONDecoder(),
dataPreprocessor: any DataPreprocessor = PassthroughPreprocessor()) -> Self where Self == DecodableStreamSerializer<T> {
DecodableStreamSerializer<T>(decoder: decoder, dataPreprocessor: dataPreprocessor)
}
}
extension DataStreamSerializer where Self == PassthroughStreamSerializer {
/// Provides a `PassthroughStreamSerializer` instance.
public static var passthrough: PassthroughStreamSerializer { PassthroughStreamSerializer() }
}
extension DataStreamSerializer where Self == StringStreamSerializer {
/// Provides a `StringStreamSerializer` instance.
public static var string: StringStreamSerializer { StringStreamSerializer() }
}