Skip to content

Commit

Permalink
Full duplex (#30)
Browse files Browse the repository at this point in the history
* duplex rewrite

* fix CLI usage bug

* fix race

* redesign api

* update README.md
  • Loading branch information
wybiral authored Jul 9, 2018
1 parent 10a22ee commit 18fb000
Show file tree
Hide file tree
Showing 74 changed files with 1,636 additions and 1,110 deletions.
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 20 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,39 @@ go build github.com/wybiral/hookah/cmd/hookah
```

# Usage instructions (CLI)
The hookah command allows you to specify an input source -i and an output destination -o.
Any data that's fed into the input will be piped to the output.
The hookah command allows you to pipe data betweem various sources/destinations.
By default pipes are full duplex but can be limited to input/output-only mode.

For details run `hookah -h`

## Examples

Pipe from stdin to a new TCP listener on port 8080:
Pipe from stdin/stdout to a new TCP listener on port 8080:
```
hookah stdio tcp-listen://localhost:8080
```
Note: this is the same even if you ommit the `stdio` part because hookah will
assume stdio is indended when only one node (tcp-listen in this case) is used.

Pipe from an existing TCP listener on port 8080 to a new WebSocket listener on
port 8081:
```
hookah -o tcp-listen://localhost:8080
hookah tcp-listen://localhost:8080 ws-listen://localhost:8081
```

Pipe from an existing TCP listener on port 8080 to a new HTTP listener on port 8081:
Pipe from a new Unix domain socket listener to a TCP client on port 8080:
```
hookah -i tcp://localhost:8080 -o http-listen://localhost:8081
hookah unix-listen://path/to/sock tcp://localhost:8080
```

Pipe from a new Unix domain socket listener to stdout:
Pipe only the input from a TCP client to the output of another TCP client:
```
hookah -i unix-listen://path/to/sock
hookah -i tcp://:8080 -o tcp://:8081
```

Pipe from a new HTTP listener on port 8080 to an existing Unix domain socket:
Fan-out the input from a TCP listener to the output of multiple TCP clients:
```
hookah -i http-listen://localhost:8080 -o unix://path/to/sock
hookah -i tcp-listen://:8080 -o tcp://:8081 -o tcp://:8082 -o tcp://:8083
```

# Usage instructions (Go package)
Expand Down
129 changes: 90 additions & 39 deletions cmd/hookah/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,116 @@
package main

import (
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"

"github.com/wybiral/hookah"
"github.com/wybiral/hookah/internal/app"
"github.com/wybiral/hookah/pkg/flagslice"
)

func main() {
// Create hookah API instance
h := hookah.New()
flag.Usage = func() {
fmt.Print("NAME:\n")
fmt.Print(" hookah\n\n")
fmt.Print("USAGE:\n")
fmt.Print(" hookah -i input -o output\n\n")
fmt.Print("VERSION:\n")
fmt.Printf(" %s\n\n", hookah.Version)
fmt.Print("INPUTS:\n")
for _, reg := range h.ListInputs() {
fmt.Printf(" %s\n", reg.Usage)
}
fmt.Print("\n")
fmt.Print("OUTPUTS:\n")
for _, reg := range h.ListOutputs() {
fmt.Printf(" %s\n", reg.Usage)
}
fmt.Print("\n")
usage(h)
os.Exit(0)
}
// Parse flags
var inOpts string
flag.StringVar(&inOpts, "i", "stdin", "Stream input")
var outOpts string
flag.StringVar(&outOpts, "o", "stdout", "Stream output")
var opts, rOpts, wOpts flagslice.FlagSlice
flag.Var(&rOpts, "i", "input node (readonly)")
flag.Var(&wOpts, "o", "output node (writeonly)")
flag.Parse()
// Setup input stream
r, err := h.NewInput(inOpts)
opts = flag.Args()
// Run and report errors
err := run(h, opts, rOpts, wOpts)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
}

func run(h *hookah.API, opts, rOpts, wOpts []string) error {
a := app.New(nil)
// Closing this App instance will close all the nodes being created
defer a.Close()
// Add bidirectional nodes
err := addNodes(h, a, opts, true, true)
if err != nil {
return err
}
// Add reader (input) nodes
err = addNodes(h, a, rOpts, true, false)
if err != nil {
log.Fatal(err)
return err
}
defer r.Close()
// Setup output stream
w, err := h.NewOutput(outOpts)
// Add writer (output) nodes
err = addNodes(h, a, wOpts, false, true)
if err != nil {
log.Fatal(err)
return err
}
// No nodes, show usage
if len(a.Nodes) == 0 {
flag.Usage()
return nil
}
// Only one node, link to stdio
if len(a.Nodes) == 1 {
n, err := h.NewNode("stdio")
if err != nil {
return err
}
a.AddNode(n)
}
defer w.Close()
// Listen for interrupt to close gracefully
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
if len(a.Readers) == 0 {
return errors.New("no input nodes")
}
if len(a.Writers) == 0 {
return errors.New("no output nodes")
}
// Handle CTRL+C by sending a Quit signal
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
go func() {
<-ch
r.Close()
w.Close()
os.Exit(1)
<-interrupt
a.Quit <- nil
}()
// Copy all of in to out
io.Copy(w, r)
return a.Run()
}

// Add nodes to the app from opt strings and r/w status
func addNodes(h *hookah.API, a *app.App, opts []string, r, w bool) error {
for _, opt := range opts {
n, err := h.NewNode(opt)
if err != nil {
return err
}
if !r {
n.R = nil
}
if !w {
n.W = nil
}
a.AddNode(n)
}
return nil
}

// Print CLI usage info
func usage(h *hookah.API) {
fmt.Print("NAME:\n")
fmt.Print(" hookah\n\n")
fmt.Print("USAGE:\n")
fmt.Print(" hookah node [node] -i in_node -o out_node\n\n")
fmt.Print("VERSION:\n")
fmt.Printf(" %s\n\n", hookah.Version)
fmt.Print("PROTOCOLS:\n")
for _, p := range h.ListProtocols() {
fmt.Printf(" %s\n", p.Usage)
}
fmt.Print("\n")
}
22 changes: 10 additions & 12 deletions examples/certstream/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Example of using hookah to create an input stream from the CertStream
// Example of using hookah to create an input node from the CertStream
// WebSocket API (https://certstream.calidog.io/).
// The cert updates are filtered to remove heartbeat messages and processed by
// restricting the JSON fields and adding indentation.
// These updates are then written to stdout.
// These updates are then written to stdout node.
package main

import (
"encoding/json"
"io"
"log"

"github.com/wybiral/hookah"
"github.com/wybiral/hookah/pkg/node"
)

// CertStream JSON struct
Expand All @@ -32,28 +32,26 @@ type certUpdate struct {
func main() {
// Create hookah API instance
h := hookah.New()
// Create hookah input (certstream WebSocket API)
r, err := h.NewInput("wss://certstream.calidog.io")
// Create hookah node (certstream WebSocket API)
r, err := h.NewNode("wss://certstream.calidog.io")
if err != nil {
log.Fatal(err)
}
defer r.Close()
// Create hookah output (stdout)
w, err := h.NewOutput("stdout")
// Create hookah node (stdout)
w, err := h.NewNode("stdout")
if err != nil {
log.Fatal(err)
}
defer w.Close()
// Start stream
stream(w, r)
}

// Copy from reader to writer
// Drops heartbeat messages, restricts fields, and formats JSON
func stream(w io.Writer, r io.Reader) {
func stream(w, r *node.Node) {
var u certUpdate
d := json.NewDecoder(r)
e := json.NewEncoder(w)
d := json.NewDecoder(r.R)
e := json.NewEncoder(w.W)
e.SetIndent("", " ")
for {
err := d.Decode(&u)
Expand Down
49 changes: 20 additions & 29 deletions examples/customproto/main.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,64 @@
// Example of using hookah with a custom input protocol. In this case the input
// protocol is named numbers:// and it can accept "odd" or "even" as the
// argument.
// Example of using hookah with a custom protocol. In this case the protocol is
// named numbers:// and it can accept "odd" or "even" as the argument.
package main

import (
"errors"
"fmt"
"io"
"log"
"net/url"
"time"

"github.com/wybiral/hookah"
"github.com/wybiral/hookah/pkg/node"
)

func main() {
// Create hookah API instance
h := hookah.New()
// Register new protocol
h.RegisterInput("numbers", "numbers://parity", numbersHandler)
// Create hookah input (using new numbers:// protocol)
r, err := h.NewInput("numbers://odd")
h.RegisterProtocol("numbers", "numbers://parity", numbersHandler)
// Create hookah node (using our new numbers:// protocol)
r, err := h.NewNode("numbers://odd")
if err != nil {
log.Fatal(err)
}
defer r.Close()
// Create hookah output (stdout)
w, err := h.NewOutput("stdout")
// Create hookah node (stdout)
w, err := h.NewNode("stdout")
if err != nil {
log.Fatal(err)
}
defer w.Close()
// Copy forever
io.Copy(w, r)
io.Copy(w.W, r.R)
}

// struct type to implement interface on.
type numbers struct {
counter int64
}
// type to implement Reader interface on.
type numbers int64

// Input handlers take an arg string and return an io.ReadCloser for the input
// stream (or an error).
func numbersHandler(arg string, opts url.Values) (io.ReadCloser, error) {
var counter int64
// Handlers take an arg string and return a Node
func numbersHandler(arg string) (*node.Node, error) {
var counter numbers
if arg == "odd" {
counter = 1
} else if arg == "even" {
counter = 2
} else {
return nil, errors.New("numbers requires: odd or even")
}
return &numbers{counter: counter}, nil
// Node can have R: Reader, W: Writer, C: Closer
// In this case it's just a Reader
return &node.Node{R: &counter}, nil
}

// Read method satisfies the io.ReadCloser interface
// Read the next number (after delay) and increment counter.
func (num *numbers) Read(b []byte) (int, error) {
// Artificial delay
time.Sleep(time.Second)
// Format counter
s := fmt.Sprintf("%d\n", num.counter)
s := fmt.Sprintf("%d\n", *num)
// Increment counter
num.counter += 2
*num += 2
// Copy to byte array
n := copy(b, []byte(s))
return n, nil
}

// Close method satisfies the io.ReadCloser interface
func (num *numbers) Close() error {
return nil
}
Loading

0 comments on commit 18fb000

Please sign in to comment.