From c931a14a564e97a96bd271b470f73c9cc37c4e17 Mon Sep 17 00:00:00 2001 From: Zunnurain Badar Date: Sat, 23 Mar 2024 00:53:29 +0500 Subject: [PATCH] Added Clickhouse Logger (#349) * Added Clickhouse Logger * resync clickhouse client * fix linter * Update README.md --------- Co-authored-by: dmachard <5562930+dmachard@users.noreply.github.com> --- README.md | 1 + docs/loggers/logger_clickhouse.md | 23 ++++ loggers/clickhouse.go | 222 ++++++++++++++++++++++++++++++ loggers/clickhouse_test.go | 67 +++++++++ pkgconfig/loggers.go | 19 ++- pkglinker/multiplexer.go | 3 + pkglinker/pipelines.go | 3 + 7 files changed, 336 insertions(+), 2 deletions(-) create mode 100644 docs/loggers/logger_clickhouse.md create mode 100644 loggers/clickhouse.go create mode 100644 loggers/clickhouse_test.go diff --git a/README.md b/README.md index 46680c3c..ffa6b6fc 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ - [`Scalyr`](docs/loggers/logger_scalyr.md) - [`Redis`](docs/loggers/logger_redis.md) publisher - [`Kafka`](docs/loggers/logger_kafka.md) producer + - [`Clickhouse`](doc/logger_clickhouse.md) *not yet production ready* - *Send to security tools* - [`Falco`](docs/loggers/logger_falco.md) diff --git a/docs/loggers/logger_clickhouse.md b/docs/loggers/logger_clickhouse.md new file mode 100644 index 00000000..d230d406 --- /dev/null +++ b/docs/loggers/logger_clickhouse.md @@ -0,0 +1,23 @@ + +# Logger: Clickhouse client + +Clickhouse client to remote Clickhouse server + +Options: + +- `url`: (string) Clickhouse server url +- `user`: (string) Clickhouse database user +- `password`: (string) Clickhouse database user password +- `table`: (string) Clickhouse table name +- `database`: (string) Clickhouse database name + +Defaults: + +```yaml +clickhouse: + url: "http://localhost:8123" + user: "default" + password: "password" + table: "records" + database: "dnscollector" +``` diff --git a/loggers/clickhouse.go b/loggers/clickhouse.go new file mode 100644 index 00000000..0dcd5c3f --- /dev/null +++ b/loggers/clickhouse.go @@ -0,0 +1,222 @@ +package loggers + +import ( + "net/http" + "strconv" + "time" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/pkgutils" + "github.com/dmachard/go-dnscollector/transformers" + "github.com/dmachard/go-logger" +) + +type ClickhouseData struct { + Identity string `json:"identity"` + QueryIP string `json:"query_ip"` + QName string `json:"q_name"` + Operation string `json:"operation"` + Family string `json:"family"` + Protocol string `json:"protocol"` + QType string `json:"q_type"` + RCode string `json:"r_code"` + TimeNSec string `json:"timensec"` + TimeStamp string `json:"timestamp"` +} + +type ClickhouseClient struct { + stopProcess chan bool + doneProcess chan bool + stopRun chan bool + doneRun chan bool + inputChan chan dnsutils.DNSMessage + outputChan chan dnsutils.DNSMessage + config *pkgconfig.Config + configChan chan *pkgconfig.Config + logger *logger.Logger + name string + url string + user string + password string + database string + table string + RoutingHandler pkgutils.RoutingHandler +} + +func NewClickhouseClient(config *pkgconfig.Config, console *logger.Logger, name string) *ClickhouseClient { + console.Info("[%s] logger=clickhouse - enabled", name) + o := &ClickhouseClient{ + stopProcess: make(chan bool), + doneProcess: make(chan bool), + stopRun: make(chan bool), + doneRun: make(chan bool), + inputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize), + outputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize), + logger: console, + config: config, + name: name, + configChan: make(chan *pkgconfig.Config), + RoutingHandler: pkgutils.NewRoutingHandler(config, console, name), + } + o.ReadConfig() + return o +} + +func (o *ClickhouseClient) GetName() string { return o.name } + +func (o *ClickhouseClient) SetLoggers(loggers []pkgutils.Worker) {} + +func (o *ClickhouseClient) AddDroppedRoute(wrk pkgutils.Worker) { + o.RoutingHandler.AddDroppedRoute(wrk) +} + +func (o *ClickhouseClient) AddDefaultRoute(wrk pkgutils.Worker) { + o.RoutingHandler.AddDefaultRoute(wrk) +} + +func (o *ClickhouseClient) ReloadConfig(config *pkgconfig.Config) { + o.LogInfo("reload configuration!") + o.configChan <- config +} + +func (o *ClickhouseClient) ReadConfig() { + o.url = o.config.Loggers.ClickhouseClient.URL + o.user = o.config.Loggers.ClickhouseClient.User + o.password = o.config.Loggers.ClickhouseClient.Password + o.database = o.config.Loggers.ClickhouseClient.Database + o.table = o.config.Loggers.ClickhouseClient.Table +} + +func (o *ClickhouseClient) Channel() chan dnsutils.DNSMessage { + return o.inputChan +} + +func (o *ClickhouseClient) GetInputChannel() chan dnsutils.DNSMessage { + return o.inputChan +} + +func (o *ClickhouseClient) LogInfo(msg string, v ...interface{}) { + o.logger.Info("["+o.name+"] Clickhouse - "+msg, v...) +} + +func (o *ClickhouseClient) LogError(msg string, v ...interface{}) { + o.logger.Error("["+o.name+"] Clickhouse - "+msg, v...) +} + +func (o *ClickhouseClient) Stop() { + o.LogInfo("stopping to run...") + o.stopRun <- true + <-o.doneRun + + o.LogInfo("stopping to process...") + o.stopProcess <- true + <-o.doneProcess +} + +func (o *ClickhouseClient) Run() { + o.LogInfo("running in background...") + + // prepare next channels + defaultRoutes, defaultNames := o.RoutingHandler.GetDefaultRoutes() + droppedRoutes, droppedNames := o.RoutingHandler.GetDroppedRoutes() + + // prepare transforms + listChannel := []chan dnsutils.DNSMessage{} + listChannel = append(listChannel, o.outputChan) + subprocessors := transformers.NewTransforms(&o.config.OutgoingTransformers, o.logger, o.name, listChannel, 0) + + // goroutine to process transformed dns messages + go o.Process() + + // loop to process incoming messages +RUN_LOOP: + for { + select { + case <-o.stopRun: + // cleanup transformers + subprocessors.Reset() + + o.doneRun <- true + break RUN_LOOP + + case cfg, opened := <-o.configChan: + if !opened { + return + } + o.config = cfg + o.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + + case dm, opened := <-o.inputChan: + if !opened { + o.LogInfo("input channel closed!") + return + } + + // apply tranforms, init dns message with additionnals parts if necessary + subprocessors.InitDNSMessageFormat(&dm) + if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop { + o.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm) + continue + } + + // send to next ? + o.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm) + + // send to output channel + o.outputChan <- dm + } + } + o.LogInfo("run terminated") +} + +func (o *ClickhouseClient) Process() { + o.LogInfo("ready to process") + +PROCESS_LOOP: + for { + select { + case <-o.stopProcess: + o.doneProcess <- true + break PROCESS_LOOP + + // incoming dns message to process + case dm, opened := <-o.outputChan: + if !opened { + o.LogInfo("output channel closed!") + return + } + t, err := time.Parse(time.RFC3339, dm.DNSTap.TimestampRFC3339) + timensec := "" + if err == nil { + timensec = strconv.Itoa(int(t.UnixNano())) + } + data := ClickhouseData{ + Identity: dm.DNSTap.Identity, + QueryIP: dm.NetworkInfo.QueryIP, + QName: dm.DNS.Qname, + Operation: dm.DNSTap.Operation, + Family: dm.NetworkInfo.Family, + Protocol: dm.NetworkInfo.Protocol, + QType: dm.DNS.Qtype, + RCode: dm.DNS.Rcode, + TimeNSec: timensec, + TimeStamp: strconv.Itoa(int(int64(dm.DNSTap.TimeSec))), + } + // nolint + url := o.url + "?query=INSERT%20INTO%20" + o.database + "." + o.table + "(identity,queryip,qname,operation,family,protocol,qtype,rcode,timensec,timestamp)%20VALUES%20('" + data.Identity + "','" + data.QueryIP + "','" + data.QName + "','" + data.Operation + "','" + data.Family + "','" + data.Protocol + "','" + data.QType + "','" + data.RCode + "','" + data.TimeNSec + "','" + data.TimeStamp + "')" + req, _ := http.NewRequest("POST", url, nil) + + req.Header.Add("Accept", "*/*") + req.Header.Add("X-ClickHouse-User", o.user) + req.Header.Add("X-ClickHouse-Key", o.password) + + _, errReq := http.DefaultClient.Do(req) + if errReq != nil { + o.LogError(errReq.Error()) + } + } + } + o.LogInfo("processing terminated") +} diff --git a/loggers/clickhouse_test.go b/loggers/clickhouse_test.go new file mode 100644 index 00000000..6bb6bffe --- /dev/null +++ b/loggers/clickhouse_test.go @@ -0,0 +1,67 @@ +package loggers + +import ( + "bufio" + "net" + "net/http" + "regexp" + "testing" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-logger" +) + +func Test_ClickhouseClient(t *testing.T) { + + testcases := []struct { + mode string + pattern string + }{ + { + mode: pkgconfig.ModeJSON, + pattern: "dns.collector", + }, + } + cfg := pkgconfig.GetFakeConfig() + cfg.Loggers.ClickhouseClient.URL = "http://127.0.0.1:8123" + cfg.Loggers.ClickhouseClient.User = "default" + cfg.Loggers.ClickhouseClient.Password = "password" + cfg.Loggers.ClickhouseClient.Database = "database" + cfg.Loggers.ClickhouseClient.Table = "table" + fakeRcvr, err := net.Listen("tcp", "127.0.0.1:8123") + if err != nil { + t.Fatal(err) + } + defer fakeRcvr.Close() + + for _, tc := range testcases { + t.Run(tc.mode, func(t *testing.T) { + g := NewClickhouseClient(cfg, logger.New(false), "test") + + go g.Run() + + dm := dnsutils.GetFakeDNSMessage() + g.Channel() <- dm + // accept conn + conn, err := fakeRcvr.Accept() + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // read and parse http request on server side + request, err := http.ReadRequest(bufio.NewReader(conn)) + if err != nil { + t.Fatal(err) + } + query := request.URL.Query().Get("query") + conn.Write([]byte(pkgconfig.HTTPOK)) + + pattern := regexp.MustCompile(tc.pattern) + if !pattern.MatchString(query) { + t.Errorf("clickhouse test error want %s, got: %s", tc.pattern, query) + } + }) + } +} diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index bcc29eee..1255f6d3 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -300,6 +300,14 @@ type ConfigLoggers struct { URL string `yaml:"url"` ChannelBufferSize int `yaml:"chan-buffer-size"` } `yaml:"falco"` + ClickhouseClient struct { + Enable bool `yaml:"enable"` + URL string `yaml:"url"` + User string `yaml:"user"` + Password string `yaml:"password"` + Database string `yaml:"database"` + Table string `yaml:"table"` + } `yaml:"clickhouse"` } func (c *ConfigLoggers) SetDefault() { @@ -500,7 +508,7 @@ func (c *ConfigLoggers) SetDefault() { c.ElasticSearchClient.Enable = false c.ElasticSearchClient.Server = "http://127.0.0.1:9200/" - c.ElasticSearchClient.Index = "dnscollector" + c.ElasticSearchClient.Index = ProgName c.ElasticSearchClient.ChannelBufferSize = 4096 c.ElasticSearchClient.BulkSize = 5242880 c.ElasticSearchClient.FlushInterval = 10 @@ -546,7 +554,7 @@ func (c *ConfigLoggers) SetDefault() { c.KafkaProducer.BufferSize = 100 c.KafkaProducer.ConnectTimeout = 5 c.KafkaProducer.FlushInterval = 10 - c.KafkaProducer.Topic = "dnscollector" + c.KafkaProducer.Topic = ProgName c.KafkaProducer.Partition = 0 c.KafkaProducer.ChannelBufferSize = 4096 c.KafkaProducer.Compression = CompressNone @@ -554,6 +562,13 @@ func (c *ConfigLoggers) SetDefault() { c.FalcoClient.Enable = false c.FalcoClient.URL = "http://127.0.0.1:9200" c.FalcoClient.ChannelBufferSize = 65535 + + c.ClickhouseClient.Enable = false + c.ClickhouseClient.URL = "http://localhost:8123" + c.ClickhouseClient.User = "default" + c.ClickhouseClient.Password = "password" + c.ClickhouseClient.Database = ProgName + c.ClickhouseClient.Table = "records" } func (c *ConfigLoggers) GetTags() (ret []string) { diff --git a/pkglinker/multiplexer.go b/pkglinker/multiplexer.go index 80651a1e..4ebcf825 100644 --- a/pkglinker/multiplexer.go +++ b/pkglinker/multiplexer.go @@ -147,6 +147,9 @@ func InitMultiplexer(mapLoggers map[string]pkgutils.Worker, mapCollectors map[st if subcfg.Loggers.FalcoClient.Enable && IsLoggerRouted(config, output.Name) { mapLoggers[output.Name] = loggers.NewFalcoClient(subcfg, logger, output.Name) } + if subcfg.Loggers.ClickhouseClient.Enable && IsLoggerRouted(config, output.Name) { + mapLoggers[output.Name] = loggers.NewClickhouseClient(subcfg, logger, output.Name) + } } // load collectors diff --git a/pkglinker/pipelines.go b/pkglinker/pipelines.go index 6f9747af..cfb39149 100644 --- a/pkglinker/pipelines.go +++ b/pkglinker/pipelines.go @@ -172,6 +172,9 @@ func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map if config.Loggers.FalcoClient.Enable { mapLoggers[stanzaName] = loggers.NewFalcoClient(config, logger, stanzaName) } + if config.Loggers.ClickhouseClient.Enable { + mapLoggers[stanzaName] = loggers.NewClickhouseClient(config, logger, stanzaName) + } // register the collector if enabled if config.Collectors.DNSMessage.Enable {