Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] Change launchTask to Task.launch #76

Merged
merged 1 commit into from
Sep 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let input = SignalProducer<Data, NoError>(values: strings.map { $0.data(using: .
let task = Task("/usr/bin/sort")

// Run the task, ignoring the output, and do something with the final result.
let result: Result<String, TaskError>? = launchTask(task, standardInput: input)
let result: Result<String, TaskError>? = task.launch(standardInput: input)
.ignoreTaskData()
.map { String(data: $0, encoding: .utf8) }
.ignoreNil()
Expand All @@ -16,7 +16,7 @@ print("Output of `\(task)`: \(result?.value ?? "")")

// Start the task and print all the events, which includes all the output
// that was received.
launchTask(task, standardInput: input)
task.launch(standardInput: input)
.flatMapTaskEvents(.concat) { data in
return SignalProducer(value: String(data: data, encoding: .utf8))
}
Expand Down
245 changes: 123 additions & 122 deletions ReactiveTask/Task.swift
Original file line number Diff line number Diff line change
Expand Up @@ -392,146 +392,147 @@ extension Signal where Value: TaskEventType {
}
}

/// Launches a new shell task.
///
/// - Parameters:
/// - task: The task to launch.
/// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will
/// be inherited from the parent process.
///
/// - Returns: A producer that will launch the task when started, then send
/// `TaskEvent`s as execution proceeds.
public func launchTask(_ task: Task, standardInput: SignalProducer<Data, NoError>? = nil) -> SignalProducer<TaskEvent<Data>, TaskError> {
return SignalProducer { observer, disposable in
let queue = DispatchQueue(label: task.description, attributes: [])
let group = Task.group

let process = Process()
process.launchPath = task.launchPath
process.arguments = task.arguments

if let cwd = task.workingDirectoryPath {
process.currentDirectoryPath = cwd
}
extension Task {
/// Launches a new shell task.
///
/// - Parameters:
/// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will
/// be inherited from the parent process.
///
/// - Returns: A producer that will launch the receiver when started, then send
/// `TaskEvent`s as execution proceeds.
public func launch(standardInput: SignalProducer<Data, NoError>? = nil) -> SignalProducer<TaskEvent<Data>, TaskError> {
return SignalProducer { observer, disposable in
let queue = DispatchQueue(label: self.description, attributes: [])
let group = Task.group

if let env = task.environment {
process.environment = env
}
let process = Process()
process.launchPath = self.launchPath
process.arguments = self.arguments

if let cwd = self.workingDirectoryPath {
process.currentDirectoryPath = cwd
}

var stdinProducer: SignalProducer<(), TaskError> = .empty
if let env = self.environment {
process.environment = env
}

if let input = standardInput {
switch Pipe.create(queue, group) {
case let .success(pipe):
process.standardInput = pipe.readHandle
var stdinProducer: SignalProducer<(), TaskError> = .empty

stdinProducer = pipe.writeDataFromProducer(input).on(started: {
close(pipe.readFD)
})
if let input = standardInput {
switch Pipe.create(queue, group) {
case let .success(pipe):
process.standardInput = pipe.readHandle

case let .failure(error):
observer.send(error: error)
return
stdinProducer = pipe.writeDataFromProducer(input).on(started: {
close(pipe.readFD)
})

case let .failure(error):
observer.send(error: error)
return
}
}
}

SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group))
.flatMap(.merge) { stdoutPipe, stderrPipe -> SignalProducer<TaskEvent<Data>, TaskError> in
let stdoutProducer = stdoutPipe.transferReadsToProducer()
let stderrProducer = stderrPipe.transferReadsToProducer()

enum Aggregation {
case value(Data)
case failed(TaskError)
case interrupted

var producer: Pipe.ReadProducer {
switch self {
case let .value(data):
return .init(value: data)
case let .failed(error):
return .init(error: error)
case .interrupted:
return SignalProducer { observer, _ in
observer.sendInterrupted()
SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group))
.flatMap(.merge) { stdoutPipe, stderrPipe -> SignalProducer<TaskEvent<Data>, TaskError> in
let stdoutProducer = stdoutPipe.transferReadsToProducer()
let stderrProducer = stderrPipe.transferReadsToProducer()

enum Aggregation {
case value(Data)
case failed(TaskError)
case interrupted

var producer: Pipe.ReadProducer {
switch self {
case let .value(data):
return .init(value: data)
case let .failed(error):
return .init(error: error)
case .interrupted:
return SignalProducer { observer, _ in
observer.sendInterrupted()
}
}
}
}
}

return SignalProducer { observer, disposable in
func startAggregating(producer: Pipe.ReadProducer, chunk: @escaping (Data) -> TaskEvent<Data>) -> Pipe.ReadProducer {
let aggregated = MutableProperty<Aggregation?>(nil)

producer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
return SignalProducer { observer, disposable in
func startAggregating(producer: Pipe.ReadProducer, chunk: @escaping (Data) -> TaskEvent<Data>) -> Pipe.ReadProducer {
let aggregated = MutableProperty<Aggregation?>(nil)

producer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

var aggregate = Data()
signal.observe(Observer(value: { data in
observer.send(value: chunk(data))
aggregate.append(data)
}, failed: { error in
observer.send(error: error)
aggregated.value = .failed(error)
}, completed: {
aggregated.value = .value(aggregate)
}, interrupted: {
aggregated.value = .interrupted
}))
}

var aggregate = Data()
signal.observe(Observer(value: { data in
observer.send(value: chunk(data))
aggregate.append(data)
}, failed: { error in
observer.send(error: error)
aggregated.value = .failed(error)
}, completed: {
aggregated.value = .value(aggregate)
}, interrupted: {
aggregated.value = .interrupted
}))
return aggregated.producer
.skipNil()
.flatMap(.concat) { $0.producer }
}

return aggregated.producer
.skipNil()
.flatMap(.concat) { $0.producer }
}

let stdoutAggregated = startAggregating(producer: stdoutProducer, chunk: TaskEvent.standardOutput)
let stderrAggregated = startAggregating(producer: stderrProducer, chunk: TaskEvent.standardError)

process.standardOutput = stdoutPipe.writeHandle
process.standardError = stderrPipe.writeHandle

group.enter()
process.terminationHandler = { nstask in
let terminationStatus = nstask.terminationStatus
if terminationStatus == EXIT_SUCCESS {
// Wait for stderr to finish, then pass
// through stdout.
disposable += stderrAggregated
.then(stdoutAggregated)
.map(TaskEvent.success)
.start(observer)
} else {
// Wait for stdout to finish, then pass
// through stderr.
disposable += stdoutAggregated
.then(stderrAggregated)
.flatMap(.concat) { data -> SignalProducer<TaskEvent<Data>, TaskError> in
let errorString = (data.count > 0 ? String(data: data, encoding: .utf8) : nil)
return SignalProducer(error: .shellTaskFailed(task, exitCode: terminationStatus, standardError: errorString))
}
.start(observer)
let stdoutAggregated = startAggregating(producer: stdoutProducer, chunk: TaskEvent.standardOutput)
let stderrAggregated = startAggregating(producer: stderrProducer, chunk: TaskEvent.standardError)

process.standardOutput = stdoutPipe.writeHandle
process.standardError = stderrPipe.writeHandle

group.enter()
process.terminationHandler = { nstask in
let terminationStatus = nstask.terminationStatus
if terminationStatus == EXIT_SUCCESS {
// Wait for stderr to finish, then pass
// through stdout.
disposable += stderrAggregated
.then(stdoutAggregated)
.map(TaskEvent.success)
.start(observer)
} else {
// Wait for stdout to finish, then pass
// through stderr.
disposable += stdoutAggregated
.then(stderrAggregated)
.flatMap(.concat) { data -> SignalProducer<TaskEvent<Data>, TaskError> in
let errorString = (data.count > 0 ? String(data: data, encoding: .utf8) : nil)
return SignalProducer(error: .shellTaskFailed(self, exitCode: terminationStatus, standardError: errorString))
}
.start(observer)
}
group.leave()
}
group.leave()
}

observer.send(value: .launch(task))
process.launch()
close(stdoutPipe.writeFD)
close(stderrPipe.writeFD)

observer.send(value: .launch(self))
process.launch()
close(stdoutPipe.writeFD)
close(stderrPipe.writeFD)

stdinProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
}
stdinProducer.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
}

let _ = disposable.add {
process.terminate()
let _ = disposable.add {
process.terminate()
}
}
}
}
.startWithSignal { signal, taskDisposable in
disposable.add(taskDisposable)
signal.observe(observer)
}
.startWithSignal { signal, taskDisposable in
disposable.add(taskDisposable)
signal.observe(observer)
}
}
}
}
10 changes: 5 additions & 5 deletions ReactiveTaskTests/TaskSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TaskSpec: QuickSpec {
var isLaunched: Bool = false

let task = Task("/usr/bin/true")
let result = launchTask(task)
let result = task.launch()
.on(value: { event in
if case let .launch(launched) = event {
isLaunched = true
Expand All @@ -33,7 +33,7 @@ class TaskSpec: QuickSpec {
}

it("should launch a task that writes to stdout") {
let result = launchTask(Task("/bin/echo", arguments: [ "foobar" ]))
let result = Task("/bin/echo", arguments: [ "foobar" ]).launch()
.reduce(Data()) { aggregated, event in
var mutableData = aggregated
if case let .standardOutput(data) = event {
Expand All @@ -52,7 +52,7 @@ class TaskSpec: QuickSpec {

it("should launch a task that writes to stderr") {
var aggregated = Data()
let result = launchTask(Task("/usr/bin/stat", arguments: [ "not-a-real-file" ]))
let result = Task("/usr/bin/stat", arguments: [ "not-a-real-file" ]).launch()
.reduce(aggregated) { _, event in
if case let .standardError(data) = event {
aggregated.append(data)
Expand All @@ -70,7 +70,7 @@ class TaskSpec: QuickSpec {
let strings = [ "foo\n", "bar\n", "buzz\n", "fuzz\n" ]
let data = strings.map { $0.data(using: .utf8)! }

let result = launchTask(Task("/usr/bin/sort"), standardInput: SignalProducer(values: data))
let result = Task("/usr/bin/sort").launch(standardInput: SignalProducer(values: data))
.map { event in event.value }
.skipNil()
.single()
Expand All @@ -83,7 +83,7 @@ class TaskSpec: QuickSpec {

it("should error correctly") {
let task = Task("/usr/bin/stat", arguments: [ "not-a-real-file" ])
let result = launchTask(task)
let result = task.launch()
.wait()

expect(result).notTo(beNil())
Expand Down