diff --git a/README.md b/README.md index bbb62ff..8dc5919 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ let input = SignalProducer(values: strings.map { $0.data(using: . let task = Task("/usr/bin/sort") // Run the task, ignoring the output, and do something with the final result. -let result: Result? = launchTask(task, standardInput: input) +let result: Result? = task.launch(standardInput: input) .ignoreTaskData() .map { String(data: $0, encoding: .utf8) } .ignoreNil() @@ -16,7 +16,7 @@ print("Output of `\(task)`: \(result?.value ?? "")") // Start the task and print all the events, which includes all the output // that was received. -launchTask(task, standardInput: input) +task.launch(standardInput: input) .flatMapTaskEvents(.concat) { data in return SignalProducer(value: String(data: data, encoding: .utf8)) } diff --git a/ReactiveTask/Task.swift b/ReactiveTask/Task.swift index dc0c50b..67635b7 100644 --- a/ReactiveTask/Task.swift +++ b/ReactiveTask/Task.swift @@ -392,146 +392,147 @@ extension Signal where Value: TaskEventType { } } -/// Launches a new shell task. -/// -/// - Parameters: -/// - task: The task to launch. -/// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will -/// be inherited from the parent process. -/// -/// - Returns: A producer that will launch the task when started, then send -/// `TaskEvent`s as execution proceeds. -public func launchTask(_ task: Task, standardInput: SignalProducer? = nil) -> SignalProducer, TaskError> { - return SignalProducer { observer, disposable in - let queue = DispatchQueue(label: task.description, attributes: []) - let group = Task.group - - let process = Process() - process.launchPath = task.launchPath - process.arguments = task.arguments - - if let cwd = task.workingDirectoryPath { - process.currentDirectoryPath = cwd - } +extension Task { + /// Launches a new shell task. + /// + /// - Parameters: + /// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will + /// be inherited from the parent process. + /// + /// - Returns: A producer that will launch the receiver when started, then send + /// `TaskEvent`s as execution proceeds. + public func launch(standardInput: SignalProducer? = nil) -> SignalProducer, TaskError> { + return SignalProducer { observer, disposable in + let queue = DispatchQueue(label: self.description, attributes: []) + let group = Task.group - if let env = task.environment { - process.environment = env - } + let process = Process() + process.launchPath = self.launchPath + process.arguments = self.arguments + + if let cwd = self.workingDirectoryPath { + process.currentDirectoryPath = cwd + } - var stdinProducer: SignalProducer<(), TaskError> = .empty + if let env = self.environment { + process.environment = env + } - if let input = standardInput { - switch Pipe.create(queue, group) { - case let .success(pipe): - process.standardInput = pipe.readHandle + var stdinProducer: SignalProducer<(), TaskError> = .empty - stdinProducer = pipe.writeDataFromProducer(input).on(started: { - close(pipe.readFD) - }) + if let input = standardInput { + switch Pipe.create(queue, group) { + case let .success(pipe): + process.standardInput = pipe.readHandle - case let .failure(error): - observer.send(error: error) - return + stdinProducer = pipe.writeDataFromProducer(input).on(started: { + close(pipe.readFD) + }) + + case let .failure(error): + observer.send(error: error) + return + } } - } - SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group)) - .flatMap(.merge) { stdoutPipe, stderrPipe -> SignalProducer, TaskError> in - let stdoutProducer = stdoutPipe.transferReadsToProducer() - let stderrProducer = stderrPipe.transferReadsToProducer() - - enum Aggregation { - case value(Data) - case failed(TaskError) - case interrupted - - var producer: Pipe.ReadProducer { - switch self { - case let .value(data): - return .init(value: data) - case let .failed(error): - return .init(error: error) - case .interrupted: - return SignalProducer { observer, _ in - observer.sendInterrupted() + SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group)) + .flatMap(.merge) { stdoutPipe, stderrPipe -> SignalProducer, TaskError> in + let stdoutProducer = stdoutPipe.transferReadsToProducer() + let stderrProducer = stderrPipe.transferReadsToProducer() + + enum Aggregation { + case value(Data) + case failed(TaskError) + case interrupted + + var producer: Pipe.ReadProducer { + switch self { + case let .value(data): + return .init(value: data) + case let .failed(error): + return .init(error: error) + case .interrupted: + return SignalProducer { observer, _ in + observer.sendInterrupted() + } } } } - } - - return SignalProducer { observer, disposable in - func startAggregating(producer: Pipe.ReadProducer, chunk: @escaping (Data) -> TaskEvent) -> Pipe.ReadProducer { - let aggregated = MutableProperty(nil) - producer.startWithSignal { signal, signalDisposable in - disposable += signalDisposable + return SignalProducer { observer, disposable in + func startAggregating(producer: Pipe.ReadProducer, chunk: @escaping (Data) -> TaskEvent) -> Pipe.ReadProducer { + let aggregated = MutableProperty(nil) + + producer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable + + var aggregate = Data() + signal.observe(Observer(value: { data in + observer.send(value: chunk(data)) + aggregate.append(data) + }, failed: { error in + observer.send(error: error) + aggregated.value = .failed(error) + }, completed: { + aggregated.value = .value(aggregate) + }, interrupted: { + aggregated.value = .interrupted + })) + } - var aggregate = Data() - signal.observe(Observer(value: { data in - observer.send(value: chunk(data)) - aggregate.append(data) - }, failed: { error in - observer.send(error: error) - aggregated.value = .failed(error) - }, completed: { - aggregated.value = .value(aggregate) - }, interrupted: { - aggregated.value = .interrupted - })) + return aggregated.producer + .skipNil() + .flatMap(.concat) { $0.producer } } - return aggregated.producer - .skipNil() - .flatMap(.concat) { $0.producer } - } - - let stdoutAggregated = startAggregating(producer: stdoutProducer, chunk: TaskEvent.standardOutput) - let stderrAggregated = startAggregating(producer: stderrProducer, chunk: TaskEvent.standardError) - - process.standardOutput = stdoutPipe.writeHandle - process.standardError = stderrPipe.writeHandle - - group.enter() - process.terminationHandler = { nstask in - let terminationStatus = nstask.terminationStatus - if terminationStatus == EXIT_SUCCESS { - // Wait for stderr to finish, then pass - // through stdout. - disposable += stderrAggregated - .then(stdoutAggregated) - .map(TaskEvent.success) - .start(observer) - } else { - // Wait for stdout to finish, then pass - // through stderr. - disposable += stdoutAggregated - .then(stderrAggregated) - .flatMap(.concat) { data -> SignalProducer, TaskError> in - let errorString = (data.count > 0 ? String(data: data, encoding: .utf8) : nil) - return SignalProducer(error: .shellTaskFailed(task, exitCode: terminationStatus, standardError: errorString)) - } - .start(observer) + let stdoutAggregated = startAggregating(producer: stdoutProducer, chunk: TaskEvent.standardOutput) + let stderrAggregated = startAggregating(producer: stderrProducer, chunk: TaskEvent.standardError) + + process.standardOutput = stdoutPipe.writeHandle + process.standardError = stderrPipe.writeHandle + + group.enter() + process.terminationHandler = { nstask in + let terminationStatus = nstask.terminationStatus + if terminationStatus == EXIT_SUCCESS { + // Wait for stderr to finish, then pass + // through stdout. + disposable += stderrAggregated + .then(stdoutAggregated) + .map(TaskEvent.success) + .start(observer) + } else { + // Wait for stdout to finish, then pass + // through stderr. + disposable += stdoutAggregated + .then(stderrAggregated) + .flatMap(.concat) { data -> SignalProducer, TaskError> in + let errorString = (data.count > 0 ? String(data: data, encoding: .utf8) : nil) + return SignalProducer(error: .shellTaskFailed(self, exitCode: terminationStatus, standardError: errorString)) + } + .start(observer) + } + group.leave() } - group.leave() - } - - observer.send(value: .launch(task)) - process.launch() - close(stdoutPipe.writeFD) - close(stderrPipe.writeFD) + + observer.send(value: .launch(self)) + process.launch() + close(stdoutPipe.writeFD) + close(stderrPipe.writeFD) - stdinProducer.startWithSignal { signal, signalDisposable in - disposable += signalDisposable - } + stdinProducer.startWithSignal { signal, signalDisposable in + disposable += signalDisposable + } - let _ = disposable.add { - process.terminate() + let _ = disposable.add { + process.terminate() + } } } - } - .startWithSignal { signal, taskDisposable in - disposable.add(taskDisposable) - signal.observe(observer) - } + .startWithSignal { signal, taskDisposable in + disposable.add(taskDisposable) + signal.observe(observer) + } + } } } diff --git a/ReactiveTaskTests/TaskSpec.swift b/ReactiveTaskTests/TaskSpec.swift index d77fbf4..9cfa722 100644 --- a/ReactiveTaskTests/TaskSpec.swift +++ b/ReactiveTaskTests/TaskSpec.swift @@ -19,7 +19,7 @@ class TaskSpec: QuickSpec { var isLaunched: Bool = false let task = Task("/usr/bin/true") - let result = launchTask(task) + let result = task.launch() .on(value: { event in if case let .launch(launched) = event { isLaunched = true @@ -33,7 +33,7 @@ class TaskSpec: QuickSpec { } it("should launch a task that writes to stdout") { - let result = launchTask(Task("/bin/echo", arguments: [ "foobar" ])) + let result = Task("/bin/echo", arguments: [ "foobar" ]).launch() .reduce(Data()) { aggregated, event in var mutableData = aggregated if case let .standardOutput(data) = event { @@ -52,7 +52,7 @@ class TaskSpec: QuickSpec { it("should launch a task that writes to stderr") { var aggregated = Data() - let result = launchTask(Task("/usr/bin/stat", arguments: [ "not-a-real-file" ])) + let result = Task("/usr/bin/stat", arguments: [ "not-a-real-file" ]).launch() .reduce(aggregated) { _, event in if case let .standardError(data) = event { aggregated.append(data) @@ -70,7 +70,7 @@ class TaskSpec: QuickSpec { let strings = [ "foo\n", "bar\n", "buzz\n", "fuzz\n" ] let data = strings.map { $0.data(using: .utf8)! } - let result = launchTask(Task("/usr/bin/sort"), standardInput: SignalProducer(values: data)) + let result = Task("/usr/bin/sort").launch(standardInput: SignalProducer(values: data)) .map { event in event.value } .skipNil() .single() @@ -83,7 +83,7 @@ class TaskSpec: QuickSpec { it("should error correctly") { let task = Task("/usr/bin/stat", arguments: [ "not-a-real-file" ]) - let result = launchTask(task) + let result = task.launch() .wait() expect(result).notTo(beNil())