diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 66dc38b43eb08..971a95584f678 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -123,6 +123,7 @@ following works: - github.com/prometheus/common [Apache License 2.0](https://github.com/prometheus/common/blob/master/LICENSE) - github.com/prometheus/procfs [Apache License 2.0](https://github.com/prometheus/procfs/blob/master/LICENSE) - github.com/rcrowley/go-metrics [MIT License](https://github.com/rcrowley/go-metrics/blob/master/LICENSE) +- github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE) - github.com/safchain/ethtool [Apache License 2.0](https://github.com/safchain/ethtool/blob/master/LICENSE) - github.com/samuel/go-zookeeper [BSD 3-Clause Clear License](https://github.com/samuel/go-zookeeper/blob/master/LICENSE) - github.com/shirou/gopsutil [BSD 3-Clause Clear License](https://github.com/shirou/gopsutil/blob/master/LICENSE) @@ -172,6 +173,7 @@ following works: - gopkg.in/mgo.v2 [BSD 2-Clause "Simplified" License](https://github.com/go-mgo/mgo/blob/v2/LICENSE) - gopkg.in/olivere/elastic.v5 [MIT License](https://github.com/olivere/elastic/blob/v5.0.76/LICENSE) - gopkg.in/tomb.v1 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v1/LICENSE) +- gopkg.in/tomb.v2 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v2/LICENSE) - gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE) - gopkg.in/yaml.v3 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v3/LICENSE) - modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 81464d821d18e..f106ffcdfaeb6 100644 --- a/go.mod +++ b/go.mod @@ -108,6 +108,7 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 github.com/prometheus/procfs v0.0.8 + github.com/riemann/riemann-go-client v0.5.0 github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect @@ -149,7 +150,7 @@ require ( gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce gopkg.in/olivere/elastic.v5 v5.0.70 gopkg.in/yaml.v2 v2.2.8 - gotest.tools v2.2.0+incompatible // indirect + gotest.tools v2.2.0+incompatible honnef.co/go/tools v0.0.1-2020.1.3 // indirect k8s.io/apimachinery v0.17.1 // indirect modernc.org/sqlite v1.7.4 diff --git a/go.sum b/go.sum index 866e6d15d95a1..58cc945365d80 100644 --- a/go.sum +++ b/go.sum @@ -257,6 +257,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v0.0.0-20170307001533-c9c7427a2a70/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= @@ -538,6 +539,8 @@ github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/riemann/riemann-go-client v0.5.0 h1:yPP7tz1vSYJkSZvZFCsMiDsHHXX57x8/fEX3qyEXuAA= +github.com/riemann/riemann-go-client v0.5.0/go.mod h1:FMiaOL8dgBnRfgwENzV0xlYJ2eCbV1o7yqVwOBLbShQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664 h1:gvolwzuDhul9qK6/oHqxCHD5TEYfsWNBGidOeG6kvpk= github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= @@ -673,6 +676,7 @@ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -905,6 +909,8 @@ gopkg.in/olivere/elastic.v5 v5.0.70/go.mod h1:FylZT6jQWtfHsicejzOm3jIMVPOAksa80i gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1d1b8eb58b463..6eb5dbb7aafef 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -146,6 +146,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/riemann_listener" _ "github.com/influxdata/telegraf/plugins/inputs/salesforce" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/sflow" diff --git a/plugins/inputs/riemann_listener/README.md b/plugins/inputs/riemann_listener/README.md new file mode 100644 index 0000000000000..54e70be6ecb71 --- /dev/null +++ b/plugins/inputs/riemann_listener/README.md @@ -0,0 +1,42 @@ +# Riemann Listener Input Plugin + +The Riemann Listener is a simple input plugin that listens for messages from +client that use riemann clients using riemann-protobuff format. + + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.rimann_listener]] + ## URL to listen on + ## Default is "tcp://:5555" + # service_address = "tcp://:8094" + # service_address = "tcp://127.0.0.1:http" + # service_address = "tcp4://:8094" + # service_address = "tcp6://:8094" + # service_address = "tcp6://[2001:db8::1]:8094" + + ## Maximum number of concurrent connections. + ## 0 (default) is unlimited. + # max_connections = 1024 + ## Read timeout. + ## 0 (default) is unlimited. + # read_timeout = "30s" + ## Optional TLS configuration. + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + ## Maximum socket buffer size (in bytes when no unit specified). + # read_buffer_size = "64KiB" + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" +``` +Just like Riemann the default port is 5555. This can be configured, refer configuration above. + +Riemann `Service` is mapped as `measurement`. `metric` and `TTL` are converted into field values. +As Riemann tags as simply an array, they are converted into the `influx_line` format key-value, where both key and value are the tags. diff --git a/plugins/inputs/riemann_listener/riemann_listener.go b/plugins/inputs/riemann_listener/riemann_listener.go new file mode 100644 index 0000000000000..45d1ef4db27f2 --- /dev/null +++ b/plugins/inputs/riemann_listener/riemann_listener.go @@ -0,0 +1,399 @@ +package riemann_listener + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "os" + "os/signal" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf/metric" + + "github.com/gogo/protobuf/proto" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + tlsint "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" + riemanngo "github.com/riemann/riemann-go-client" + riemangoProto "github.com/riemann/riemann-go-client/proto" +) + +type RiemannSocketListener struct { + ServiceAddress string `toml:"service_address"` + MaxConnections int `toml:"max_connections"` + ReadBufferSize internal.Size `toml:"read_buffer_size"` + ReadTimeout *internal.Duration `toml:"read_timeout"` + KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"` + SocketMode string `toml:"socket_mode"` + tlsint.ServerConfig + + wg sync.WaitGroup + + Log telegraf.Logger + + telegraf.Accumulator +} +type setReadBufferer interface { + SetReadBuffer(bytes int) error +} + +type riemannListener struct { + net.Listener + *RiemannSocketListener + + sockType string + + connections map[string]net.Conn + connectionsMtx sync.Mutex +} + +func (rsl *riemannListener) listen(ctx context.Context) { + rsl.connections = map[string]net.Conn{} + + wg := sync.WaitGroup{} + + select { + case <-ctx.Done(): + rsl.closeAllConnections() + wg.Wait() + return + default: + for { + c, err := rsl.Accept() + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + rsl.Log.Error(err.Error()) + } + break + } + + if rsl.ReadBufferSize.Size > 0 { + if srb, ok := c.(setReadBufferer); ok { + srb.SetReadBuffer(int(rsl.ReadBufferSize.Size)) + } else { + rsl.Log.Warnf("Unable to set read buffer on a %s socket", rsl.sockType) + } + } + + rsl.connectionsMtx.Lock() + if rsl.MaxConnections > 0 && len(rsl.connections) >= rsl.MaxConnections { + rsl.connectionsMtx.Unlock() + c.Close() + continue + } + rsl.connections[c.RemoteAddr().String()] = c + rsl.connectionsMtx.Unlock() + + if err := rsl.setKeepAlive(c); err != nil { + rsl.Log.Errorf("Unable to configure keep alive %q: %s", rsl.ServiceAddress, err.Error()) + } + + wg.Add(1) + go func() { + defer wg.Done() + rsl.read(c) + }() + } + rsl.closeAllConnections() + wg.Wait() + } +} + +func (rsl *riemannListener) closeAllConnections() { + rsl.connectionsMtx.Lock() + for _, c := range rsl.connections { + c.Close() + } + rsl.connectionsMtx.Unlock() +} + +func (rsl *riemannListener) setKeepAlive(c net.Conn) error { + if rsl.KeepAlivePeriod == nil { + return nil + } + tcpc, ok := c.(*net.TCPConn) + if !ok { + return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(rsl.ServiceAddress, "://", 2)[0]) + } + if rsl.KeepAlivePeriod.Duration == 0 { + return tcpc.SetKeepAlive(false) + } + if err := tcpc.SetKeepAlive(true); err != nil { + return err + } + return tcpc.SetKeepAlivePeriod(rsl.KeepAlivePeriod.Duration) +} + +func (rsl *riemannListener) removeConnection(c net.Conn) { + rsl.connectionsMtx.Lock() + delete(rsl.connections, c.RemoteAddr().String()) + rsl.connectionsMtx.Unlock() +} + +//Utilities + +/* +readMessages will read Riemann messages in binary format +from the TCP connection. byte Array p size will depend on the size +of the riemann message as sent by the cleint +*/ +func readMessages(r io.Reader, p []byte) error { + for len(p) > 0 { + n, err := r.Read(p) + p = p[n:] + if err != nil { + return err + } + } + return nil +} + +func checkError(err error) { + log.Println("The error is") + if err != nil { + log.Println(err.Error()) + } +} + +func (rsl *riemannListener) read(conn net.Conn) { + defer rsl.removeConnection(conn) + defer conn.Close() + var err error + + for { + if rsl.ReadTimeout != nil && rsl.ReadTimeout.Duration > 0 { + + err = conn.SetDeadline(time.Now().Add(rsl.ReadTimeout.Duration)) + } + + messagePb := &riemangoProto.Msg{} + var header uint32 + // First obtain the size of the riemann event from client and acknowledge + if err = binary.Read(conn, binary.BigEndian, &header); err != nil { + if err.Error() != "EOF" { + rsl.Log.Debugf("Failed to read header") + riemannReturnErrorResponse(conn, err.Error()) + return + } + return + } + data := make([]byte, header) + + if err = readMessages(conn, data); err != nil { + rsl.Log.Debugf("Failed to read body: %s", err.Error()) + riemannReturnErrorResponse(conn, "Failed to read body") + return + } + if err = proto.Unmarshal(data, messagePb); err != nil { + rsl.Log.Debugf("Failed to unmarshal: %s", err.Error()) + riemannReturnErrorResponse(conn, "Failed to unmarshal") + return + } + riemannEvents := riemanngo.ProtocolBuffersToEvents(messagePb.Events) + + for _, m := range riemannEvents { + if m.Service == "" { + riemannReturnErrorResponse(conn, "No Service Name") + return + } + tags := make(map[string]string) + fieldValues := map[string]interface{}{} + for _, tag := range m.Tags { + tags[strings.ReplaceAll(tag, " ", "_")] = tag + } + tags["Host"] = m.Host + tags["Description"] = m.Description + tags["State"] = m.State + fieldValues["Metric"] = m.Metric + fieldValues["TTL"] = m.TTL.Seconds() + singleMetric, err := metric.New(m.Service, tags, fieldValues, m.Time, telegraf.Untyped) + if err != nil { + rsl.Log.Debugf("Could not create metric for service %s at %s", m.Service, m.Time.String()) + riemannReturnErrorResponse(conn, "Could not create metric") + return + } + + rsl.AddMetric(singleMetric) + } + riemannReturnResponse(conn) + + } + +} + +func riemannReturnResponse(conn net.Conn) { + t := true + message := new(riemangoProto.Msg) + message.Ok = &t + returnData, err := proto.Marshal(message) + if err != nil { + checkError(err) + return + } + b := new(bytes.Buffer) + if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil { + checkError(err) + } + // send the msg length + if _, err = conn.Write(b.Bytes()); err != nil { + checkError(err) + } + if _, err = conn.Write(returnData); err != nil { + checkError(err) + } +} + +func riemannReturnErrorResponse(conn net.Conn, errorMessage string) { + t := false + message := new(riemangoProto.Msg) + message.Ok = &t + message.Error = &errorMessage + returnData, err := proto.Marshal(message) + if err != nil { + checkError(err) + return + } + b := new(bytes.Buffer) + if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil { + checkError(err) + } + // send the msg length + if _, err = conn.Write(b.Bytes()); err != nil { + checkError(err) + } + if _, err = conn.Write(returnData); err != nil { + log.Println("Somethign") + checkError(err) + } +} + +func (rsl *RiemannSocketListener) Description() string { + return "Riemann protobuff listener." +} + +func (rsl *RiemannSocketListener) SampleConfig() string { + return ` + ## URL to listen on. + ## Default is "tcp://:5555" + # service_address = "tcp://:8094" + # service_address = "tcp://127.0.0.1:http" + # service_address = "tcp4://:8094" + # service_address = "tcp6://:8094" + # service_address = "tcp6://[2001:db8::1]:8094" + + ## Maximum number of concurrent connections. + ## 0 (default) is unlimited. + # max_connections = 1024 + ## Read timeout. + ## 0 (default) is unlimited. + # read_timeout = "30s" + ## Optional TLS configuration. + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + ## Maximum socket buffer size (in bytes when no unit specified). + # read_buffer_size = "64KiB" + ## Period between keep alive probes. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" +` +} + +func (rsl *RiemannSocketListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + go processOsSignals(cancelFunc) + rsl.Accumulator = acc + if rsl.ServiceAddress == "" { + rsl.Log.Warnf("Using default service_address tcp://:5555") + rsl.ServiceAddress = "tcp://:5555" + } + spl := strings.SplitN(rsl.ServiceAddress, "://", 2) + if len(spl) != 2 { + return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress) + } + + protocol := spl[0] + addr := spl[1] + + switch protocol { + case "tcp", "tcp4", "tcp6": + tlsCfg, err := rsl.ServerConfig.TLSConfig() + if err != nil { + return err + } + + var l net.Listener + if tlsCfg == nil { + l, err = net.Listen(protocol, addr) + } else { + l, err = tls.Listen(protocol, addr, tlsCfg) + } + if err != nil { + return err + } + + rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr()) + + rsl := &riemannListener{ + Listener: l, + RiemannSocketListener: rsl, + sockType: spl[0], + } + + rsl.wg = sync.WaitGroup{} + rsl.wg.Add(1) + go func() { + defer rsl.wg.Done() + rsl.listen(ctx) + + }() + default: + return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, rsl.ServiceAddress) + } + + return nil +} + +// Handle cancellations from the process +func processOsSignals(cancelFunc context.CancelFunc) { + signalChan := make(chan os.Signal) + signal.Notify(signalChan, os.Interrupt) + for { + sig := <-signalChan + switch sig { + case os.Interrupt: + log.Println("Signal SIGINT is received, probably due to `Ctrl-C`, exiting ...") + cancelFunc() + return + } + } + +} + +func (rsl *RiemannSocketListener) Stop() { + rsl.wg.Done() + rsl.wg.Wait() + os.Exit(0) +} + +func newRiemannSocketListener() *RiemannSocketListener { + return &RiemannSocketListener{} +} + +func init() { + inputs.Add("riemann_listener", func() telegraf.Input { return newRiemannSocketListener() }) +} diff --git a/plugins/inputs/riemann_listener/riemann_listener_test.go b/plugins/inputs/riemann_listener/riemann_listener_test.go new file mode 100644 index 0000000000000..f1ce824c6a731 --- /dev/null +++ b/plugins/inputs/riemann_listener/riemann_listener_test.go @@ -0,0 +1,55 @@ +package riemann_listener + +import ( + "log" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + riemanngo "github.com/riemann/riemann-go-client" + "github.com/stretchr/testify/require" + "gotest.tools/assert" +) + +func TestSocketListener_tcp(t *testing.T) { + log.Println("Entering") + + sl := newRiemannSocketListener() + sl.Log = testutil.Logger{} + sl.ServiceAddress = "tcp://127.0.0.1:5555" + sl.ReadBufferSize = internal.Size{Size: 1024} + + acc := &testutil.Accumulator{} + err := sl.Start(acc) + require.NoError(t, err) + defer sl.Stop() + + testStats(t, sl) + testMissingService(t, sl) +} +func testStats(t *testing.T, sl *RiemannSocketListener) { + c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second) + err := c.Connect() + if err != nil { + log.Println("Error") + panic(err) + } + defer c.Close() + result, err := riemanngo.SendEvent(c, &riemanngo.Event{ + Service: "hello", + }) + assert.Equal(t, result.GetOk(), true) + +} +func testMissingService(t *testing.T, sl *RiemannSocketListener) { + c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second) + err := c.Connect() + if err != nil { + panic(err) + } + defer c.Close() + result, err := riemanngo.SendEvent(c, &riemanngo.Event{}) + assert.Equal(t, result.GetOk(), false) + +}