diff --git a/NOTICE.txt b/NOTICE.txt index f9893b43dddd..73c53c6b0e34 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5708,6 +5708,218 @@ Contents of probable licence file $GOMODCACHE/github.com/docker/go-units@v0.4.0/ limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/dolmen-go/contextio +Version: v0.0.0-20200217195037-68fc5150bcd5 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/dolmen-go/contextio@v0.0.0-20200217195037-68fc5150bcd5/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/andrewkroh/goja Version: v0.0.0-20190128172624-dd2ac4456e20 diff --git a/go.mod b/go.mod index 47278a56e8be..d955e91f4b39 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8 github.com/docker/go-units v0.4.0 + github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1 github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/dustin/go-humanize v1.0.0 diff --git a/go.sum b/go.sum index 67010c699485..dc1deb6297ae 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QL github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 h1:BzN9o4IS1Hj+AM5qDggsfMDQGFXau5KagipEFmnyIbc= +github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5/go.mod h1:cxc20xI7fOgsFHWgt+PenlDDnMcrvh7Ocuj5hEFIdEk= github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 h1:RrkoB0pT3gnjXhL/t10BSP1mcr/0Ldea2uMyuBr2SWk= github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= diff --git a/x-pack/osquerybeat/.gitignore b/x-pack/osquerybeat/.gitignore index 6903d360c781..e06c8d5c496a 100644 --- a/x-pack/osquerybeat/.gitignore +++ b/x-pack/osquerybeat/.gitignore @@ -8,4 +8,11 @@ # Ignore Osquery artifacts that could be created during development /osqueryd -/osquery/ \ No newline at end of file +/osquery/ + +/ext/osquery-extension/build/ +/osquery-extension.ext +/osquery-extension.exe + +# VSCode +.vscode/ \ No newline at end of file diff --git a/x-pack/osquerybeat/beater/action_handler.go b/x-pack/osquerybeat/beater/action_handler.go new file mode 100644 index 000000000000..c5a06d513388 --- /dev/null +++ b/x-pack/osquerybeat/beater/action_handler.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "fmt" + "time" + + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config" +) + +type actionHandler struct { + inputType string + bt *osquerybeat +} + +func (a *actionHandler) Name() string { + return a.inputType +} + +type actionData struct { + Query string + ID string +} + +func actionDataFromRequest(req map[string]interface{}) (ad actionData, err error) { + if len(req) == 0 { + return ad, ErrActionRequest + } + if v, ok := req["id"]; ok { + if id, ok := v.(string); ok { + ad.ID = id + } + } + if v, ok := req["data"]; ok { + if m, ok := v.(map[string]interface{}); ok { + if v, ok := m["query"]; ok { + if query, ok := v.(string); ok { + ad.Query = query + } + } + } + } + return ad, nil +} + +// Execute handles the action request. +func (a *actionHandler) Execute(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) { + + start := time.Now().UTC() + err := a.execute(ctx, req) + end := time.Now().UTC() + + res := map[string]interface{}{ + "started_at": start.Format(time.RFC3339Nano), + "completed_at": end.Format(time.RFC3339Nano), + } + + if err != nil { + res["error"] = err.Error() + } + return res, nil +} + +func (a *actionHandler) execute(ctx context.Context, req map[string]interface{}) error { + ad, err := actionDataFromRequest(req) + if err != nil { + return fmt.Errorf("%v: %w", err, ErrQueryExecution) + } + return a.bt.executeQuery(ctx, config.DefaultStreamIndex, ad.ID, ad.Query, "", req) +} diff --git a/x-pack/osquerybeat/beater/config_plugin.go b/x-pack/osquerybeat/beater/config_plugin.go new file mode 100644 index 000000000000..a8c350fa4472 --- /dev/null +++ b/x-pack/osquerybeat/beater/config_plugin.go @@ -0,0 +1,215 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + configName = "osq_config" + osqueryConfigFile = "osquery.conf" + scheduleSplayPercent = 10 +) + +type QueryConfig struct { + Name string + Query string + Interval int + Platform string + Version string +} + +type ConfigPlugin struct { + dataPath string + + log *logp.Logger + + mx sync.RWMutex + + newQueryConfigs []QueryConfig + + dirty bool + schedule map[string]osqueryConfigInfo +} + +func NewConfigPlugin(log *logp.Logger, dataPath string) *ConfigPlugin { + p := &ConfigPlugin{ + dataPath: dataPath, + log: log.With("ctx", "config"), + } + + // load queries config from the file if it was previously persisted + // the errors are logged + p.loadConfig() + return p +} + +func (p *ConfigPlugin) Set(configs []QueryConfig) { + p.mx.Lock() + defer p.mx.Unlock() + + p.newQueryConfigs = configs + p.dirty = true +} + +func (p *ConfigPlugin) Count() int { + return len(p.schedule) +} + +func (p *ConfigPlugin) ResolveName(name string) (sql string, ok bool) { + p.mx.RLock() + defer p.mx.RUnlock() + sc, ok := p.schedule[name] + + return sc.Query, ok +} + +func (p *ConfigPlugin) GenerateConfig(ctx context.Context) (map[string]string, error) { + p.log.Debug("configPlugin GenerateConfig is called") + + p.mx.Lock() + defer p.mx.Unlock() + + c, err := p.render() + if err != nil { + return nil, err + } + + return map[string]string{ + configName: c, + }, nil +} + +type osqueryConfigInfo struct { + Query string `json:"query"` + Interval int `json:"interval,omitempty"` + Platform string `json:"platform,omitempty"` + Version string `json:"version,omitempty"` + Snapshot bool `json:"snapshot,omitempty"` +} + +type osqueryConfig struct { + Options map[string]interface{} `json:"options"` + Schedule map[string]osqueryConfigInfo `json:"schedule,omitempty"` +} + +func newOsqueryConfig(schedule map[string]osqueryConfigInfo) osqueryConfig { + return osqueryConfig{ + Options: map[string]interface{}{ + "schedule_splay_percent": scheduleSplayPercent, + }, + Schedule: schedule, + } +} + +func (c osqueryConfig) render() ([]byte, error) { + return json.MarshalIndent(c, "", " ") +} + +func (p *ConfigPlugin) render() (string, error) { + save := false + if p.dirty { + save = true + p.schedule = make(map[string]osqueryConfigInfo) + + for _, qc := range p.newQueryConfigs { + p.schedule[qc.Name] = osqueryConfigInfo{ + Query: qc.Query, + Interval: qc.Interval, + Platform: qc.Platform, + Version: qc.Version, + Snapshot: true, // enforce snapshot for all queries + } + } + p.dirty = false + } + + raw, err := newOsqueryConfig(p.schedule).render() + if err != nil { + return "", err + } + if save { + err := p.saveConfig(p.getConfigFilePath(), raw) + if err != nil { + p.log.Errorf("failed to persist config file: %v", err) + return "", err + } + } + return string(raw), err +} + +func (p *ConfigPlugin) loadConfig() { + p.log.Debug("try load config from file") + f, err := os.Open(p.getConfigFilePath()) + if err != nil { + if os.IsNotExist(err) { + p.log.Debug("config file is not found") + return + } + p.log.Errorf("failed to load the config file: %v", err) + return + } + defer f.Close() + + var c osqueryConfig + d := json.NewDecoder(f) + err = d.Decode(&c) + if err != nil { + p.log.Errorf("failed to decode config file: %v", err) + return + } + + sz := len(c.Schedule) + if sz == 0 { + return + } + + p.newQueryConfigs = make([]QueryConfig, 0, sz) + p.dirty = true + + for name, qi := range c.Schedule { + p.newQueryConfigs = append(p.newQueryConfigs, QueryConfig{ + Name: name, + Query: qi.Query, + Interval: qi.Interval, + Platform: qi.Platform, + Version: qi.Version, + }) + } + return +} + +func (p *ConfigPlugin) getConfigFilePath() string { + return filepath.Join(p.dataPath, osqueryConfigFile) +} + +func (p *ConfigPlugin) saveConfig(fp string, data []byte) error { + + tmpFilePath := p.getConfigFilePath() + ".tmp" + + err := ioutil.WriteFile(tmpFilePath, data, 0644) + if err != nil { + return err + } + + defer func() { + os.Remove(tmpFilePath) + }() + + err = os.Rename(tmpFilePath, p.getConfigFilePath()) + if err != nil { + return err + } + + return nil +} diff --git a/x-pack/osquerybeat/beater/config_plugin_test.go b/x-pack/osquerybeat/beater/config_plugin_test.go new file mode 100644 index 000000000000..456836a0a59a --- /dev/null +++ b/x-pack/osquerybeat/beater/config_plugin_test.go @@ -0,0 +1,198 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/google/go-cmp/cmp" + + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/testutil" +) + +func buildConfigFilePath(dataPath string) string { + return filepath.Join(dataPath, "osquery.conf") +} + +func renderFullConfig(schedule map[string]osqueryConfigInfo) (map[string]string, error) { + raw, err := newOsqueryConfig(schedule).render() + if err != nil { + return nil, err + } + + return map[string]string{ + configName: string(raw), + }, nil +} + +func TestConfigPluginNew(t *testing.T) { + validLogger := logp.NewLogger("config_test") + + tests := []struct { + name string + log *logp.Logger + dataPath string + shouldPanic bool + }{ + { + name: "invalid", + log: nil, + dataPath: "", + shouldPanic: true, + }, + { + name: "empty", + log: validLogger, + dataPath: "", + }, + { + name: "nonempty", + log: validLogger, + dataPath: "data", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.shouldPanic { + testutil.AssertPanic(t, func() { NewConfigPlugin(tc.log, tc.dataPath) }) + return + } + + p := NewConfigPlugin(tc.log, tc.dataPath) + + diff := cmp.Diff(tc.dataPath, p.dataPath) + if diff != "" { + t.Error(diff) + } + diff = cmp.Diff(buildConfigFilePath(tc.dataPath), p.getConfigFilePath()) + if diff != "" { + t.Error(diff) + } + }) + } +} + +func TestConfigPluginNoConfigFile(t *testing.T) { + validLogger := logp.NewLogger("config_test") + + tempDirPath, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(tempDirPath) + }() + + p := NewConfigPlugin(validLogger, tempDirPath) + diff := cmp.Diff(buildConfigFilePath(tempDirPath), p.getConfigFilePath()) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(0, p.Count()) + if diff != "" { + t.Error(diff) + } + + generatedConfig, err := p.GenerateConfig(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Expecting empty config with non-existent file + expectedConfig, err := renderFullConfig(nil) + if err != nil { + t.Fatal(err) + } + + diff = cmp.Diff(expectedConfig, generatedConfig) + if diff != "" { + t.Error(diff) + } +} + +var testQueries = []QueryConfig{ + { + Name: "users", + Query: "select * from users", + Interval: 60, + }, + { + Name: "uptime", + Query: "select * from uptime", + Interval: 30, + }, + { + Name: "processes", + Query: "select * from processes", + Interval: 45, + }, +} + +func convertQueriesToSchedule(queryConfigs []QueryConfig) map[string]osqueryConfigInfo { + schedule := make(map[string]osqueryConfigInfo) + + for _, qc := range queryConfigs { + schedule[qc.Name] = osqueryConfigInfo{ + Query: qc.Query, + Interval: qc.Interval, + Platform: qc.Platform, + Version: qc.Version, + Snapshot: true, // enforce snapshot for all queries + } + } + return schedule +} + +func TestConfigPluginWithConfig(t *testing.T) { + validLogger := logp.NewLogger("config_test") + tempDirPath, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(tempDirPath) + }() + + p := NewConfigPlugin(validLogger, tempDirPath) + diff := cmp.Diff(buildConfigFilePath(tempDirPath), p.getConfigFilePath()) + if diff != "" { + t.Error(diff) + } + + p.Set(testQueries) + + generatedConfig, err := p.GenerateConfig(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Test the expected configuration + expectedConfig, err := renderFullConfig(convertQueriesToSchedule(testQueries)) + if err != nil { + t.Fatal(err) + } + diff = cmp.Diff(expectedConfig, generatedConfig) + if diff != "" { + t.Error(diff) + } + + // Create a new configuration plugin, test the configuration read from the file is correct + p2 := NewConfigPlugin(validLogger, tempDirPath) + generatedConfig2, err := p2.GenerateConfig(context.Background()) + if err != nil { + t.Fatal(err) + } + diff = cmp.Diff(generatedConfig, generatedConfig2) + if diff != "" { + t.Error(diff) + } +} diff --git a/x-pack/osquerybeat/beater/install.go b/x-pack/osquerybeat/beater/install.go index cd9df6ba1c95..d273a1db0ffb 100644 --- a/x-pack/osquerybeat/beater/install.go +++ b/x-pack/osquerybeat/beater/install.go @@ -7,6 +7,7 @@ package beater import ( "context" "os" + "path/filepath" "runtime" "github.com/elastic/beats/v7/libbeat/logp" @@ -15,7 +16,18 @@ import ( "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/install" ) -func installOsquery(ctx context.Context, dir string) error { +func installOsquery(ctx context.Context) error { + exefp, err := os.Executable() + if err != nil { + return err + } + exedir := filepath.Dir(exefp) + + // Install osqueryd if needed + return installOsqueryWithDir(ctx, exedir) +} + +func installOsqueryWithDir(ctx context.Context, dir string) error { log := logp.NewLogger("osqueryd_install").With("dir", dir) log.Info("Check if osqueryd needs to be installed") diff --git a/x-pack/osquerybeat/beater/logger_plugin.go b/x-pack/osquerybeat/beater/logger_plugin.go new file mode 100644 index 000000000000..83c510207f55 --- /dev/null +++ b/x-pack/osquerybeat/beater/logger_plugin.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "encoding/json" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/kolide/osquery-go/plugin/logger" +) + +type SnapshotResult struct { + Action string `json:"action"` + Name string `json:"name"` + Numeric string `json:"numeric"` + CalendarTime string `json:"calendarTime"` + UnixTime int64 `json:"unixTime"` + Hits []map[string]string `json:"snapshot"` +} + +type HandleSnapshotResultFunc func(res SnapshotResult) + +type LoggerPlugin struct { + log *logp.Logger + logSnapshotFn HandleSnapshotResultFunc +} + +func NewLoggerPlugin(log *logp.Logger, logSnapshotFn HandleSnapshotResultFunc) *LoggerPlugin { + return &LoggerPlugin{ + log: log.With("ctx", "logger"), + logSnapshotFn: logSnapshotFn, + } +} + +func (p *LoggerPlugin) Log(ctx context.Context, typ logger.LogType, logText string) error { + if typ == logger.LogTypeSnapshot { + var res SnapshotResult + if err := json.Unmarshal([]byte(logText), &res); err != nil { + p.log.Errorf("failed to unmarshal shapshot result: %v", err) + return err + } + if p.logSnapshotFn != nil { + p.logSnapshotFn(res) + } + } else { + raw := []byte(logText) + p.log.Debugf("log type: %s, %s", typ, string(raw)) + } + + return nil +} diff --git a/x-pack/osquerybeat/beater/logger_plugin_test.go b/x-pack/osquerybeat/beater/logger_plugin_test.go new file mode 100644 index 000000000000..a501d8aebab1 --- /dev/null +++ b/x-pack/osquerybeat/beater/logger_plugin_test.go @@ -0,0 +1,148 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "context" + "encoding/json" + "testing" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/google/go-cmp/cmp" + "github.com/kolide/osquery-go/plugin/logger" + + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/testutil" +) + +func TestLoggerPlugin_New(t *testing.T) { + validLogger := logp.NewLogger("logger_test") + + tests := []struct { + name string + log *logp.Logger + logSnapshotFn HandleSnapshotResultFunc + shouldPanic bool + }{ + { + name: "invalid", + log: nil, + logSnapshotFn: nil, + shouldPanic: true, + }, + { + name: "nologfunc", + log: validLogger, + logSnapshotFn: nil, + }, + { + name: "nonempty", + log: validLogger, + logSnapshotFn: func(res SnapshotResult) {}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.shouldPanic { + testutil.AssertPanic(t, func() { NewLoggerPlugin(tc.log, tc.logSnapshotFn) }) + return + } + + p := NewLoggerPlugin(tc.log, tc.logSnapshotFn) + if p == nil { + t.Error("expected nil logger pluggin") + } + }) + } +} + +func TestLoggerPlugin_Log(t *testing.T) { + validLogger := logp.NewLogger("logger_test") + + snapshotFn := func(res SnapshotResult) { + } + + result := SnapshotResult{ + Action: "foo", + Name: "bar", + Hits: []map[string]string{ + { + "testkey": "testval", + }, + { + "testkey2": "testval2", + "testkey3": "testval3", + }, + }, + } + resultbytes, err := json.Marshal(result) + if err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + logSnapshotFn HandleSnapshotResultFunc + logType logger.LogType + logMessage string + err string + }{ + { + name: "nosnapshot", + logSnapshotFn: snapshotFn, + logType: logger.LogTypeString, + logMessage: "", + }, + { + name: "snapshot invalid", + logSnapshotFn: snapshotFn, + logType: logger.LogTypeSnapshot, + logMessage: "", + err: "unexpected end of JSON input", + }, + { + name: "snapshot empty", + logSnapshotFn: snapshotFn, + logType: logger.LogTypeSnapshot, + logMessage: "{}", + }, + { + name: "snapshot nonempty", + logSnapshotFn: snapshotFn, + logType: logger.LogTypeSnapshot, + logMessage: string(resultbytes), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var capturedSnapshot *SnapshotResult + p := NewLoggerPlugin(validLogger, func(res SnapshotResult) { + capturedSnapshot = &res + }) + err := p.Log(context.Background(), tc.logType, tc.logMessage) + if err != nil { + if tc.err == "" { + t.Errorf("unexpected error: %v", err) + } else { + diff := cmp.Diff(err.Error(), tc.err) + if diff != "" { + t.Error(diff) + } + } + } else { + if tc.err != "" { + t.Errorf("expected error: %v", tc.err) + } + if tc.logType == logger.LogTypeSnapshot && tc.logMessage == string(resultbytes) { + diff := cmp.Diff(capturedSnapshot, &result) + if diff != "" { + t.Error(diff) + } + } + } + }) + } +} diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index 730cd5b493fd..fef59f975ec8 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -8,14 +8,15 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" "sync" "time" "github.com/gofrs/uuid" lru "github.com/hashicorp/golang-lru" - "golang.org/x/sync/semaphore" + "github.com/kolide/osquery-go" + kconfig "github.com/kolide/osquery-go/plugin/config" + klogger "github.com/kolide/osquery-go/plugin/logger" + "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -24,7 +25,8 @@ import ( "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config" "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/distro" - "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/osqueryd" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/osqd" + "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/osqdcli" ) var ( @@ -39,6 +41,17 @@ const ( adhocOsqueriesTypesCacheSize = 256 // The final cache size equals the number of periodic queries plus this value, in order to have additional cache for ad-hoc queries limitQueryAtTime = 1 // Always run only one osquery query at a time. Addresses the issue: https://github.com/elastic/beats/issues/25297 + + // The interval in second for configuration refresh; + // osqueryd child process requests configuration from the configuration plugin implemented in osquerybeat + configurationRefreshIntervalSecs = 60 +) + +const ( + osqueryInputType = "osquery" + extManagerServerName = "osqextman" + configPluginName = "osq_config" + loggerPluginName = "osq_logger" ) // osquerybeat configuration. @@ -46,16 +59,13 @@ type osquerybeat struct { b *beat.Beat config config.Config client beat.Client - osqCli *osqueryd.Client + osqCli *osqdcli.Client log *logp.Logger // Beat lifecycle context, cancelled on Stop cancel context.CancelFunc mx sync.Mutex - - // limiter to run one query at a time - limitSem *semaphore.Weighted } // New creates an instance of osquerybeat. @@ -68,10 +78,9 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { } bt := &osquerybeat{ - b: b, - config: c, - log: log, - limitSem: semaphore.NewWeighted(limitQueryAtTime), + b: b, + config: c, + log: log, } return bt, nil @@ -101,20 +110,6 @@ func (bt *osquerybeat) close() { } } -func (bt *osquerybeat) inputTypes() []string { - m := make(map[string]struct{}) - for _, input := range bt.config.Inputs { - m[input.Type] = struct{}{} - } - - res := make([]string, 0, len(m)) - for k := range m { - res = append(res, k) - } - - return res -} - // Run starts osquerybeat. func (bt *osquerybeat) Run(b *beat.Beat) error { ctx, err := bt.initContext() @@ -126,38 +121,43 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { // Watch input configuration updates inputConfigCh := config.WatchInputs(ctx) - var wg sync.WaitGroup - - exefp, err := os.Executable() + // Install osqueryd if needed + err = installOsquery(ctx) if err != nil { return err } - exedir := filepath.Dir(exefp) - // Create temp directory for socket and possibly other things - // The unix domain socker path is limited to 108 chars and would - // not always be able to create in subdirectory - tmpdir, removeTmpDir, err := createSockDir(bt.log) + // Create socket path + socketPath, cleanupFn, err := osqd.CreateSocketPath() if err != nil { return err } - defer func() { - if removeTmpDir != nil { - removeTmpDir() - } - }() + defer cleanupFn() - // Install osqueryd if needed - err = installOsquery(ctx, exedir) + // Create osqueryd runner + osq := osqd.New( + socketPath, + osqd.WithLogger(bt.log), + osqd.WithConfigRefresh(configurationRefreshIntervalSecs), + osqd.WithConfigPlugin(configPluginName), + osqd.WithLoggerPlugin(loggerPluginName), + ) + + // Check that osqueryd exists and runnable + err = osq.Check(ctx) if err != nil { return err } - // Start osqueryd child process - osd := osqueryd.OsqueryD{ - RootDir: exedir, - SocketPath: osqueryd.SocketPath(tmpdir), - } + // Start osqueryd + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + err := osq.Run(ctx) + if err != nil { + bt.log.Errorf("Failed to run osqueryd: %v", err) + } + return err + }) // Connect publisher processors, err := bt.reconnectPublisher(b, bt.config.Inputs) @@ -165,147 +165,150 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { return err } - // Start osqueryd child process - osdCtx, osdCn := context.WithCancel(ctx) - defer osdCn() - osqDone, err := osd.Start(osdCtx) - if err != nil { - bt.log.Errorf("Failed to start osqueryd process: %v", err) - return err - } - - // Create a cache for queries - cache, err := lru.New(scheduledOsqueriesTypesCacheSize + adhocOsqueriesTypesCacheSize) + // Create a cache for queries types resolution + cache, err := lru.New(adhocOsqueriesTypesCacheSize) if err != nil { bt.log.Errorf("Failed to create osquery query results types cache: %v", err) return err } - // Connect to osqueryd socket. Replying on the client library retry logic that checks for the socket availability - bt.osqCli, err = osqueryd.NewClient(ctx, osd.SocketPath, osqueryd.DefaultTimeout, bt.log, osqueryd.WithCache(cache)) + // Create osqueryd client + cli := osqdcli.New(socketPath, + osqdcli.WithLogger(bt.log), + osqdcli.WithTimeout(3*time.Second), + osqdcli.WithCache(cache, adhocOsqueriesTypesCacheSize), + ) + + // Connect to osqueryd + err = cli.Connect(ctx) if err != nil { - bt.log.Errorf("Failed to create osqueryd client: %v", err) return err } + defer cli.Close() - cacheResize := func(size int) { - if size <= 0 { - size = scheduledOsqueriesTypesCacheSize - } - cache.Resize(size + adhocOsqueriesTypesCacheSize) - } + bt.osqCli = cli - // Unlink socket path early - if removeTmpDir != nil { - removeTmpDir() - removeTmpDir = nil - } + // Create osquery configuration plugin that loads a persisted configuration from the disk + configPlugin := NewConfigPlugin(bt.log, osq.DataPath()) + // Resize cache + cache.Resize(configPlugin.Count()) - // Start queries execution scheduler - schedCtx, schedCancel := context.WithCancel(ctx) - scheduler := NewScheduler(schedCtx, bt.query) - defer schedCancel() - wg.Add(1) - go func() { - defer wg.Done() - scheduler.Run() - }() - - // Load initial queries - loadSchedulerStreams := func(streams []config.StreamConfig) { - cacheResize(len(streams)) - scheduler.Load(streams) - } - streams, inputTypes := config.StreamsFromInputs(bt.config.Inputs) - sz := len(streams) - if sz > 0 { - loadSchedulerStreams(streams) - } + // Create osquery logger plugin + loggerPlugin := NewLoggerPlugin(bt.log, func(res SnapshotResult) { + bt.handleSnapshotResult(ctx, configPlugin, res) + }) - // Agent actions handlers - var actionHandlers []*actionHandler - unregisterActionHandlers := func() { - bt.log.Debug("unregisterActionHandlers") - // Unregister action handlers - if b.Manager != nil { - for _, ah := range actionHandlers { - b.Manager.UnregisterAction(ah) - ah.bt = nil - } - } - actionHandlers = nil - } + // Start osquery extensions for logger and configuration + g.Go(func() error { + return runExtensionServer(ctx, socketPath, configPlugin, loggerPlugin) + }) - registerActionHandlers := func(itypes []string) { - unregisterActionHandlers() - // Register action handler - if b.Manager != nil { - bt.log.Debugf("registerActionHandlers register actions: %v", itypes) - for _, inType := range itypes { - ah := &actionHandler{ - inputType: inType, - bt: bt, + // Register action handler + ah := bt.registerActionHandler(b) + defer bt.unregisterActionHandler(b, ah) + + // Set the osquery beat version to the manager payload. This allows the bundled osquery version to be reported to the stack. + bt.setManagerPayload(b) + + // Set initial queries from beats config if defined + queryConfigs := inputsToQueryConfigs(bt.config.Inputs) + if len(queryConfigs) > 0 { + configPlugin.Set(queryConfigs) + cache.Resize(configPlugin.Count()) + } + + // Run main loop + g.Go(func() error { + for { + select { + case <-ctx.Done(): + bt.log.Info("context cancelled, exiting") + return ctx.Err() + case inputConfigs := <-inputConfigCh: + // Only set processor if it was not set before + // TODO: implement a proper input/streams/processors manager, one publisher per input stream + if processors == nil { + processors, err = bt.reconnectPublisher(b, inputConfigs) + if err != nil { + bt.log.Errorf("Failed to connect beat publisher client, err: %v", err) + return err + } } - b.Manager.RegisterAction(ah) - actionHandlers = append(actionHandlers, ah) + queryConfigs = inputsToQueryConfigs(inputConfigs) + configPlugin.Set(queryConfigs) + cache.Resize(configPlugin.Count()) } - } else { - bt.log.Debug("registerActionHandlers b.Manager is nil, not registering actions") } + }) + + // Wait for clean exit + return g.Wait() +} + +func runExtensionServer(ctx context.Context, socketPath string, configPlugin *ConfigPlugin, loggerPlugin *LoggerPlugin) (err error) { + // Register config and logger extensions + extserver, err := osquery.NewExtensionManagerServer(extManagerServerName, socketPath) + if err != nil { + return } - handleInputConfig := func(inputConfigs []config.InputConfig) error { - bt.log.Debug("Handle input configuration change") - // Only set processor if it was not set before - if processors == nil { - bt.log.Debug("Set processors for the first time") - var err error - processors, err = bt.reconnectPublisher(b, inputConfigs) - if err != nil { - bt.log.Errorf("Failed to connect beat publisher client, err: %v", err) - return err + // Register osquery configuration plugin + extserver.RegisterPlugin(kconfig.NewPlugin(configPluginName, configPlugin.GenerateConfig)) + // Register osquery logger plugin + extserver.RegisterPlugin(klogger.NewPlugin(loggerPluginName, loggerPlugin.Log)) + + g, ctx := errgroup.WithContext(ctx) + // Run extension server + g.Go(func() error { + return extserver.Run() + }) + + // Run extension server shutdown goroutine, otherwise it waits for ping failure + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return extserver.Shutdown(context.Background()) } - } else { - bt.log.Debug("Processors are already set") } + }) - streams, inputTypes = config.StreamsFromInputs(inputConfigs) - registerActionHandlers(inputTypes) - bt.setManagerPayload(b) - loadSchedulerStreams(streams) - return nil + return g.Wait() +} + +func (bt *osquerybeat) handleSnapshotResult(ctx context.Context, configPlugin *ConfigPlugin, res SnapshotResult) { + sql, ok := configPlugin.ResolveName(res.Name) + if !ok { + bt.log.Errorf("failed to resolve query name: %s", res.Name) + return } -LOOP: - for { - select { - case err = <-osqDone: - break LOOP // Exiting if osquery child process exited with error - case <-ctx.Done(): - bt.log.Info("Wait osqueryd exit") - exitErr := <-osqDone - bt.log.Infof("Exited osqueryd process, error: %v", exitErr) - break LOOP - case inputConfigs := <-inputConfigCh: - err = handleInputConfig(inputConfigs) - if err != nil { - bt.log.Errorf("Failed to handle input configuration, err: %v, exiting", err) - // Cancel scheduler - schedCancel() - break LOOP - } - } + hits, err := bt.osqCli.ResolveResult(ctx, sql, res.Hits) + if err != nil { + bt.log.Errorf("failed to resolve query types: %s", res.Name) + return } + _ = hits + responseID := uuid.Must(uuid.NewV4()).String() + bt.publishEvents(config.DefaultStreamIndex, res.Name, responseID, hits, nil) +} - // Unregister action handlers - unregisterActionHandlers() +func inputsToQueryConfigs(inputs []config.InputConfig) []QueryConfig { + var queryConfigs []QueryConfig - // Wait for clean scheduler exit - bt.log.Debug("Wait clean beat run exit") - wg.Wait() - bt.log.Debug("Beat run exited, err: %v", err) + for _, input := range inputs { + for _, s := range input.Streams { + queryConfigs = append(queryConfigs, QueryConfig{ + Name: s.ID, + Query: s.Query, + Interval: s.Interval, + Platform: s.Platform, + Version: s.Version, + }) + } + } - return err + return queryConfigs } func (bt *osquerybeat) setManagerPayload(b *beat.Beat) { @@ -365,86 +368,64 @@ func (bt *osquerybeat) Stop() { bt.close() } -func (bt *osquerybeat) query(ctx context.Context, q interface{}) error { - cfg, ok := q.(config.StreamConfig) - if !ok { - bt.log.Error("Unexpected query configuration") - return ErrInvalidQueryConfig - } - - // Response ID could be useful in order to differentiate between different runs for the interval queries - responseID := uuid.Must(uuid.NewV4()).String() - - log := bt.log.With("id", cfg.ID).With("query", cfg.Query).With("interval", cfg.Interval) - - reqData := map[string]interface{}{ - "id": cfg.ID, - "query": cfg.Query, +func (bt *osquerybeat) registerActionHandler(b *beat.Beat) *actionHandler { + if b.Manager == nil { + return nil } - err := bt.executeQuery(ctx, log, cfg.Index, cfg.ID, cfg.Query, responseID, reqData) - if err != nil { - // Preserving the error as is, it will be attached to the result document - return err + ah := &actionHandler{ + inputType: osqueryInputType, + bt: bt, } - return nil + b.Manager.RegisterAction(ah) + return ah } -func (bt *osquerybeat) executeQueryWithLimiter(ctx context.Context, log *logp.Logger, query string) ([]map[string]interface{}, error) { - // This limits the execution of query to one at a time. - // Concurrent use of osqueryd socket lead to failures/errors. - // Example: osquery failed: *osquery.ExtensionResponse error reading struct: error reading field 0: read unix ->/var/run/404419649/osquery.sock: i/o timeout" - // The scheduled and ad-hoc queries use the same code path at the moment. - // The plan for the next release is to switch the scheduled queries to use osqueryd scheduler instead. - err := bt.limitSem.Acquire(ctx, limitQueryAtTime) - if err != nil { - return nil, err +func (bt *osquerybeat) unregisterActionHandler(b *beat.Beat, ah *actionHandler) { + if b.Manager != nil && ah != nil { + b.Manager.UnregisterAction(ah) } - defer bt.limitSem.Release(limitQueryAtTime) +} - // "If ctx is already done, Acquire may still succeed without blocking." - // https://github.com/golang/sync/blob/master/semaphore/semaphore.go#L68 - if ctx.Err() != nil { - return nil, err - } +func (bt *osquerybeat) executeQuery(ctx context.Context, index, id, query, responseID string, req map[string]interface{}) error { - log.Debugf("Execute query: %s", query) + bt.log.Debugf("Execute query: %s", query) start := time.Now() hits, err := bt.osqCli.Query(ctx, query) if err != nil { - log.Errorf("Failed to execute query, err: %v", err) - return nil, err + bt.log.Errorf("Failed to execute query, err: %v", err) + return err } - log.Infof("Completed query in: %v", time.Since(start)) - return hits, nil -} + bt.log.Debugf("Completed query in: %v", time.Since(start)) -func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index, id, query, responseID string, req map[string]interface{}) error { - - hits, err := bt.executeQueryWithLimiter(ctx, log, query) if err != nil { return err } + bt.publishEvents(index, id, responseID, hits, req["data"]) + return nil +} +func (bt *osquerybeat) publishEvents(index, actionID, responseID string, hits []map[string]interface{}, reqData interface{}) { bt.mx.Lock() defer bt.mx.Unlock() for _, hit := range hits { - reqData := req["data"] event := beat.Event{ Timestamp: time.Now(), Fields: common.MapStr{ "type": bt.b.Info.Name, - "action_id": id, + "action_id": actionID, "osquery": hit, }, } + if reqData != nil { event.Fields["action_data"] = reqData } + if responseID != "" { event.Fields["response_id"] = responseID } @@ -454,67 +435,5 @@ func (bt *osquerybeat) executeQuery(ctx context.Context, log *logp.Logger, index bt.client.Publish(event) } - log.Infof("The %d events sent to index %s", len(hits), index) - return nil -} - -type actionHandler struct { - inputType string - bt *osquerybeat -} - -func (a *actionHandler) Name() string { - return a.inputType -} - -type actionData struct { - Query string - ID string -} - -func actionDataFromRequest(req map[string]interface{}) (ad actionData, err error) { - if req == nil { - return ad, ErrActionRequest - } - if v, ok := req["id"]; ok { - if id, ok := v.(string); ok { - ad.ID = id - } - } - if v, ok := req["data"]; ok { - if m, ok := v.(map[string]interface{}); ok { - if v, ok := m["query"]; ok { - if query, ok := v.(string); ok { - ad.Query = query - } - } - } - } - return ad, nil -} - -// Execute handles the action request. -func (a *actionHandler) Execute(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) { - - start := time.Now().UTC() - err := a.execute(ctx, req) - end := time.Now().UTC() - - res := map[string]interface{}{ - "started_at": start.Format(time.RFC3339Nano), - "completed_at": end.Format(time.RFC3339Nano), - } - - if err != nil { - res["error"] = err.Error() - } - return res, nil -} - -func (a *actionHandler) execute(ctx context.Context, req map[string]interface{}) error { - ad, err := actionDataFromRequest(req) - if err != nil { - return fmt.Errorf("%v: %w", err, ErrQueryExecution) - } - return a.bt.executeQuery(ctx, a.bt.log, config.DefaultStreamIndex, ad.ID, ad.Query, "", req) + bt.log.Infof("The %d events sent to index %s", len(hits), index) } diff --git a/x-pack/osquerybeat/beater/runner.go b/x-pack/osquerybeat/beater/runner.go deleted file mode 100644 index c56f5caa9aa7..000000000000 --- a/x-pack/osquerybeat/beater/runner.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package beater - -import ( - "context" - "sync" - "time" -) - -type runner struct { - cancel context.CancelFunc - wg sync.WaitGroup -} - -func (r *runner) stop() { - r.cancel() - r.wg.Wait() -} - -func startRunner(pctx context.Context, q interface{}, interval time.Duration, query func(context.Context, interface{}) error) *runner { - ctx, cancel := context.WithCancel(pctx) - r := &runner{ - cancel: cancel, - } - - r.wg.Add(1) - go func() { - defer cancel() - defer r.wg.Done() - - // Run query right away - query(ctx, q) - - if interval == 0 { - return - } - - // Schedule with interval - t := time.NewTimer(interval) - defer t.Stop() - - for { - select { - case <-t.C: - query(ctx, q) - t.Reset(interval) - case <-ctx.Done(): - return - } - } - }() - - return r -} diff --git a/x-pack/osquerybeat/beater/scheduler.go b/x-pack/osquerybeat/beater/scheduler.go deleted file mode 100644 index 335555470aaf..000000000000 --- a/x-pack/osquerybeat/beater/scheduler.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package beater - -import ( - "context" - - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config" -) - -type QueryFunc func(context.Context, interface{}) error - -// Scheduler executes queries either periodically or once depending on the query configuration -type Scheduler struct { - ctx context.Context - inCh chan []config.StreamConfig - runners map[string]*runner - queryFunc QueryFunc - log *logp.Logger -} - -func NewScheduler(ctx context.Context, queryFunc QueryFunc) *Scheduler { - return &Scheduler{ - ctx: ctx, - inCh: make(chan []config.StreamConfig, 1), - runners: make(map[string]*runner), - queryFunc: queryFunc, - log: logp.NewLogger("scheduler"), - } -} - -func (s *Scheduler) Load(streams []config.StreamConfig) { - select { - case s.inCh <- streams: - case <-s.ctx.Done(): - } -} - -func (s *Scheduler) Run() { -LOOP: - for { - select { - case streams := <-s.inCh: - s.load(streams) - case <-s.ctx.Done(): - s.stopRunners() - s.log.Info("Exiting on context cancel") - break LOOP - } - } -} - -func (s *Scheduler) isCancelled() bool { - return s.ctx.Err() != nil -} - -func (s *Scheduler) stopRunners() { - s.load(nil) -} - -func (s *Scheduler) load(streams []config.StreamConfig) { - var ( - once, repeating []config.StreamConfig - ) - - // Separate fire-once queries and repeating queries - for _, stream := range streams { - if stream.Interval == 0 { - once = append(once, stream) - } else { - repeating = append(repeating, stream) - } - } - - // Cancel and remove the query runners that are not in the streams - var ids []string - for id, r := range s.runners { - found := false - for _, s := range repeating { - if id == s.ID { - found = true - break - } - } - if !found { - r.stop() - ids = append(ids, id) - } - } - - for _, id := range ids { - delete(s.runners, id) - } - - if s.isCancelled() { - return - } - - // Run queries that should be executed only one - for _, q := range once { - if s.isCancelled() { - return - } - startRunner(s.ctx, q, q.Interval, s.queryFunc) - } - - // Schedule interval queries - for _, q := range repeating { - if s.isCancelled() { - return - } - if _, ok := s.runners[q.ID]; !ok { - s.runners[q.ID] = startRunner(s.ctx, q, q.Interval, s.queryFunc) - } - } -} diff --git a/x-pack/osquerybeat/beater/sockdir_unix.go b/x-pack/osquerybeat/beater/sockdir_unix.go deleted file mode 100644 index eb8d1808b7b5..000000000000 --- a/x-pack/osquerybeat/beater/sockdir_unix.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// +build !windows - -package beater - -import ( - "io/ioutil" - "os" - "syscall" - - "github.com/elastic/beats/v7/libbeat/logp" -) - -func createSockDir(log *logp.Logger) (string, func(), error) { - // Try to create socket in /var/run first - // This would result in something the directory something like: /var/run/027202467 - tpath, err := ioutil.TempDir("/var/run", "") - if err != nil { - if perr, ok := err.(*os.PathError); ok { - if perr.Err == syscall.EACCES { - log.Warnf("Failed to access the directory %s, running as non-root?", perr.Path) - tpath, err = ioutil.TempDir("", "") - if err != nil { - return "", nil, err - } - } - } - } - - return tpath, func() { - os.RemoveAll(tpath) - }, nil -} diff --git a/x-pack/osquerybeat/beater/sockdir_windows.go b/x-pack/osquerybeat/beater/sockdir_windows.go deleted file mode 100644 index 911157742418..000000000000 --- a/x-pack/osquerybeat/beater/sockdir_windows.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// +build windows - -package beater - -import "github.com/elastic/beats/v7/libbeat/logp" - -func createSockDir(log *logp.Logger) (string, func(), error) { - // Noop on winders - return "", func() { - }, nil -} diff --git a/x-pack/osquerybeat/dev-tools/packaging/packages.yml b/x-pack/osquerybeat/dev-tools/packaging/packages.yml index 710465e9e123..04af8679a7f7 100644 --- a/x-pack/osquerybeat/dev-tools/packaging/packages.yml +++ b/x-pack/osquerybeat/dev-tools/packaging/packages.yml @@ -42,11 +42,28 @@ shared: mode: 0600 config: true + - &unix_binary_files + 'osquery-extension.ext': + source: ext/osquery-extension/build/golang-crossbuild/osquery-extension-{{.GOOS}}-{{.Platform.Arch}}{{.BinaryExt}} + mode: 0755 + + - &windows_binary_files + 'osquery-extension{{.BinaryExt}}': + source: ext/osquery-extension/build/golang-crossbuild/osquery-extension-{{.GOOS}}-{{.Platform.Arch}}{{.BinaryExt}} + mode: 0755 + # Binary package spec (tar.gz for linux/darwin) - - &binary_spec + - &unix_binary_spec + <<: *common + files: + <<: *binary_files + <<: *unix_binary_files + + - &windows_binary_spec <<: *common files: <<: *binary_files + <<: *windows_binary_files # # License modifiers for the Elastic License @@ -67,7 +84,7 @@ specs: - os: windows types: [zip] spec: - <<: *binary_spec + <<: *windows_binary_spec <<: *elastic_license_for_binaries files: '{{.BeatName}}{{.BinaryExt}}': @@ -76,13 +93,13 @@ specs: - os: darwin types: [tgz] spec: - <<: *binary_spec + <<: *unix_binary_spec <<: *elastic_license_for_binaries - os: linux types: [tgz] spec: - <<: *binary_spec + <<: *unix_binary_spec <<: *elastic_license_for_binaries files: '{{.BeatName}}{{.BinaryExt}}': diff --git a/x-pack/osquerybeat/ext/osquery-extension/main.go b/x-pack/osquerybeat/ext/osquery-extension/main.go new file mode 100644 index 000000000000..b77232709a6a --- /dev/null +++ b/x-pack/osquerybeat/ext/osquery-extension/main.go @@ -0,0 +1,76 @@ +// Borrowed from https://github.com/kolide/launcher/blob/master/cmd/osquery-extension/osquery-extension.go +// Original license from the kolide launcher repository + +// MIT License + +// Copyright (c) 2017 Kolide + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "time" +) + +func main() { + var ( + _ = flag.Bool("verbose", false, "") + _ = flag.Int("interval", 0, "") + _ = flag.Int("timeout", 0, "") + _ = flag.String("socket", "", "") + ) + flag.Parse() + + fmt.Fprintf(os.Stderr, "%+v", os.Args) + + go monitorForParent() + + sig := make(chan os.Signal) + signal.Notify(sig, os.Interrupt) + <-sig +} + +// continuously monitor for ppid and exit if osqueryd is no longer the parent process. +// because osqueryd is always the process starting the extension, when osqueryd is killed this process should also be cleaned up. +// sometimes the termination is not clean, causing this process to remain running, which sometimes prevents osqueryd from properly restarting. +// https://github.com/kolide/launcher/issues/341 +func monitorForParent() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + f := func() { + ppid := os.Getppid() + if ppid <= 1 { + fmt.Println("extension process no longer owned by osqueryd, quitting") + os.Exit(1) + } + } + + f() + + select { + case <-ticker.C: + f() + } +} diff --git a/x-pack/osquerybeat/internal/config/config.go b/x-pack/osquerybeat/internal/config/config.go index ba7249e88969..48f6dcfd0769 100644 --- a/x-pack/osquerybeat/internal/config/config.go +++ b/x-pack/osquerybeat/internal/config/config.go @@ -8,8 +8,6 @@ package config import ( - "time" - "github.com/elastic/beats/v7/libbeat/processors" ) @@ -24,10 +22,11 @@ import ( const DefaultStreamIndex = "logs-osquery_manager.result-default" type StreamConfig struct { - ID string `config:"id"` - Query string `config:"query"` - Interval time.Duration `config:"interval"` - Index string `config:"index"` // ES output index pattern + ID string `config:"id"` + Query string `config:"query"` // the SQL query to run + Interval int `config:"interval"` // an interval in seconds to run the query (subject to splay/smoothing). It has a maximum value of 604,800 (1 week). + Platform string `config:"platform"` // restrict this query to a given platform, default is 'all' platforms; you may use commas to set multiple platforms + Version string `config:"version"` // only run on osquery versions greater than or equal-to this version string } type InputConfig struct { @@ -40,32 +39,4 @@ type Config struct { Inputs []InputConfig `config:"inputs"` } -type void struct{} -type inputTypeSet map[string]void - -var none = void{} - var DefaultConfig = Config{} - -func StreamsFromInputs(inputs []InputConfig) ([]StreamConfig, []string) { - var ( - streams []StreamConfig - ) - - typeSet := make(inputTypeSet, 1) - for _, input := range inputs { - typeSet[input.Type] = none - for _, s := range input.Streams { - if s.Index == "" { - s.Index = DefaultStreamIndex - } - streams = append(streams, s) - } - } - - var inputTypes []string - for t := range typeSet { - inputTypes = append(inputTypes, t) - } - return streams, inputTypes -} diff --git a/x-pack/osquerybeat/internal/osqd/osqueryd.go b/x-pack/osquerybeat/internal/osqd/osqueryd.go new file mode 100644 index 000000000000..e3de3e00913a --- /dev/null +++ b/x-pack/osquerybeat/internal/osqd/osqueryd.go @@ -0,0 +1,375 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqd + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/dolmen-go/contextio" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/pkg/errors" +) + +const ( + osqueryDName = "osqueryd" + osqueryAutoload = "osquery.autoload" +) + +const ( + defaultExtensionsTimeout = 10 + defaultExitTimeout = 10 * time.Second + defaultDataDir = "osquery" + defaultConfigRefreshInterval = 30 // interval osqueryd will poll for configuration changed; scheduled queries configuration for now +) + +type OSQueryD struct { + socketPath string + binPath string + dataPath string + + configPlugin string + loggerPlugin string + + extensionsTimeout int + configRefreshInterval int + + log *logp.Logger +} + +type Option func(*OSQueryD) + +func WithExtensionsTimeout(to int) Option { + return func(q *OSQueryD) { + q.extensionsTimeout = to + } +} + +func WithBinaryPath(binPath string) Option { + return func(q *OSQueryD) { + q.binPath = binPath + } +} + +func WithConfigRefresh(refreshInterval int) Option { + return func(q *OSQueryD) { + q.extensionsTimeout = refreshInterval + } +} + +func WithDataPath(dataPath string) Option { + return func(q *OSQueryD) { + q.dataPath = dataPath + } +} + +func WithLogger(log *logp.Logger) Option { + return func(q *OSQueryD) { + q.log = log + } +} + +func WithConfigPlugin(name string) Option { + return func(q *OSQueryD) { + q.configPlugin = name + } +} + +func WithLoggerPlugin(name string) Option { + return func(q *OSQueryD) { + q.loggerPlugin = name + } +} + +func New(socketPath string, opts ...Option) *OSQueryD { + q := &OSQueryD{ + socketPath: socketPath, + extensionsTimeout: defaultExtensionsTimeout, + configRefreshInterval: defaultConfigRefreshInterval, + } + + for _, opt := range opts { + opt(q) + } + + if q.dataPath == "" { + q.dataPath = filepath.Join(q.binPath, defaultDataDir) + } + + return q +} + +func (q *OSQueryD) DataPath() string { + return q.dataPath +} + +// Check checks if the binary exists and executable +func (q *OSQueryD) Check(ctx context.Context) error { + err := q.prepareBinPath() + if err != nil { + return fmt.Errorf("failed to prepare bin path, %w", err) + } + + cmd := exec.Command( + osquerydPath(q.binPath), + "--S", + "--version", + ) + + err = cmd.Start() + if err != nil { + return err + } + + err = cmd.Wait() + + return nil +} + +// Run executes osqueryd binary as a child process +func (q *OSQueryD) Run(ctx context.Context) error { + cleanup, err := q.prepare(ctx) + if err != nil { + return err + } + defer cleanup() + + cmd := q.createCommand() + + q.log.Debugf("start osqueryd process: args: %v", cmd.Args) + + cmd.SysProcAttr = setpgid() + + // Read standard output + var wg sync.WaitGroup + + if q.isVerbose() { + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + wg.Add(1) + go func() { + defer wg.Done() + q.logOSQueryOutput(ctx, stdout) + }() + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + + err = cmd.Start() + if err != nil { + return err + } + + var ( + errbuf strings.Builder + ) + + ctxstderr := contextio.NewReader(ctx, stderr) + wait := func() error { + if _, cerr := io.Copy(&errbuf, ctxstderr); cerr != nil { + return cerr + } + return cmd.Wait() + } + + finished := make(chan error, 1) + + // Wait on osqueryd exit + wg.Add(1) + go func() { + defer wg.Done() + select { + case finished <- wait(): + } + }() + + select { + case err = <-finished: + if err != nil { + s := strings.TrimSpace(errbuf.String()) + if s != "" { + err = fmt.Errorf("%s: %w", s, err) + } + } + if err != nil { + q.log.Errorf("process exited with error: %v", err) + } else { + q.log.Info("process exited") + } + case <-ctx.Done(): + q.log.Debug("kill process group on context done") + killProcessGroup(cmd) + // Wait till finished + <-finished + } + + wg.Wait() + + return err +} + +func (q *OSQueryD) prepare(ctx context.Context) (func(), error) { + err := q.prepareBinPath() + if err != nil { + return nil, fmt.Errorf("failed to prepare bin path, %w", err) + } + + // Create data directory for all the osquery config/runtime files + if err := os.MkdirAll(q.dataPath, 0750); err != nil { + return nil, fmt.Errorf("failed to create dir %v, %w", q.dataPath, err) + } + + // If socket path was not specified, create + if q.socketPath == "" { + // Create temp directory for socket and possibly other things + // The unix domain socker path is limited to 108 chars and would + // not always be able to create in subdirectory + socketPath, cleanupFn, err := CreateSocketPath() + if err != nil { + return nil, err + } + q.socketPath = socketPath + return cleanupFn, nil + } + + // Prepare autoload osquery-extension + extensionPath := osqueryExtensionPath(q.binPath) + if _, err := os.Stat(extensionPath); err != nil { + if os.IsNotExist(err) { + return nil, errors.Wrapf(err, "extension path does not exist: %s", extensionPath) + } else { + return nil, errors.Wrapf(err, "could not stat extension path") + } + } + + // Write the autoload file + extensionAutoloadPath := q.osqueryAutoloadPath() + if err := ioutil.WriteFile(extensionAutoloadPath, []byte(extensionPath), 0644); err != nil { + return nil, errors.Wrap(err, "could not write osquery extension autoload file") + } + + return func() {}, nil +} + +func (q *OSQueryD) prepareBinPath() error { + // If path to osquery was not set use the current executable path + if q.binPath == "" { + exePath, err := os.Executable() + if err != nil { + return err + } + q.binPath = filepath.Dir(exePath) + } + return nil +} + +func (q *OSQueryD) createCommand() *exec.Cmd { + + cmd := exec.Command( + osquerydPath(q.binPath), + "--force=true", + "--disable_watchdog", + "--utc", + // // Enable events collection + // "--disable_events=false", + // // Begin: enable process events audit + // "--disable_audit=false", + // "--audit_allow_config=true", + // "--audit_persist=true", + // "--audit_allow_process_events=true", + // // End: enable process events audit + + // // Begin: enable sockets audit + // "--audit_allow_sockets=true", + // "--audit_allow_unix=true", // Allow domain sockets audit + // // End: enable sockets audit + + // // Setting this value to 1 will auto-clear events whenever a SELECT is performed against the table, reducing all impact of the buffer. + // "--events_expiry=1", + + "--pidfile="+path.Join(q.dataPath, "osquery.pid"), + "--database_path="+path.Join(q.dataPath, "osquery.db"), + "--extensions_socket="+q.socketPath, + "--logger_path="+q.dataPath, + "--extensions_autoload="+q.osqueryAutoloadPath(), + "--extensions_interval=3", + fmt.Sprint("--extensions_timeout=", q.extensionsTimeout), + ) + + if q.configPlugin != "" { + cmd.Args = append(cmd.Args, "--config_plugin="+q.configPlugin) + } + + if q.loggerPlugin != "" { + cmd.Args = append(cmd.Args, "--logger_plugin="+q.loggerPlugin) + } + + if q.configRefreshInterval > 0 { + cmd.Args = append(cmd.Args, fmt.Sprintf("--config_refresh=%d", q.configRefreshInterval)) + } + + cmd.Args = append(cmd.Args, platformArgs()...) + + if q.isVerbose() { + cmd.Args = append(cmd.Args, "--verbose") + cmd.Args = append(cmd.Args, "--disable_logging=false") + } + return cmd +} + +func (q *OSQueryD) isVerbose() bool { + return q.log.IsDebug() +} + +func osquerydPath(dir string) string { + return filepath.Join(dir, osquerydFilename()) +} + +func osqueryExtensionPath(dir string) string { + return filepath.Join(dir, extensionName) +} + +func (q *OSQueryD) osqueryAutoloadPath() string { + return filepath.Join(q.dataPath, osqueryAutoload) +} + +func (q *OSQueryD) logOSQueryOutput(ctx context.Context, r io.ReadCloser) error { + log := q.log.With("ctx", "osqueryd output") + + buf := make([]byte, 2048, 2048) +LOOP: + for { + n, err := r.Read(buf[:]) + if n > 0 { + log.Info(string(buf[:n])) + } + if err != nil { + if err == io.EOF { + err = nil + } + return err + } + select { + case <-ctx.Done(): + break LOOP + default: + } + } + return nil +} diff --git a/x-pack/osquerybeat/internal/osqueryd/osqueryd_unix.go b/x-pack/osquerybeat/internal/osqd/osqueryd_unix.go similarity index 57% rename from x-pack/osquerybeat/internal/osqueryd/osqueryd_unix.go rename to x-pack/osquerybeat/internal/osqd/osqueryd_unix.go index 18f00603edc5..0d5da3e1cfa7 100644 --- a/x-pack/osquerybeat/internal/osqueryd/osqueryd_unix.go +++ b/x-pack/osquerybeat/internal/osqd/osqueryd_unix.go @@ -4,9 +4,11 @@ // +build !windows -package osqueryd +package osqd import ( + "io/ioutil" + "os" "os/exec" "path/filepath" "syscall" @@ -14,6 +16,30 @@ import ( "github.com/pkg/errors" ) +const ( + extensionName = "osquery-extension.ext" +) + +func CreateSocketPath() (string, func(), error) { + // Try to create socket in /var/run first + // This would result in something the directory something like: /var/run/027202467 + tpath, err := ioutil.TempDir("/var/run", "") + if err != nil { + if perr, ok := err.(*os.PathError); ok { + if perr.Err == syscall.EACCES { + tpath, err = ioutil.TempDir("", "") + if err != nil { + return "", nil, err + } + } + } + } + + return SocketPath(tpath), func() { + os.RemoveAll(tpath) + }, nil +} + func SocketPath(dir string) string { return filepath.Join(dir, "osquery.sock") } @@ -32,3 +58,7 @@ func killProcessGroup(cmd *exec.Cmd) error { err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) return errors.Wrapf(err, "kill process group %d", cmd.Process.Pid) } + +func osquerydFilename() string { + return osqueryDName +} diff --git a/x-pack/osquerybeat/internal/osqueryd/osqueryd_windows.go b/x-pack/osquerybeat/internal/osqd/osqueryd_windows.go similarity index 80% rename from x-pack/osquerybeat/internal/osqueryd/osqueryd_windows.go rename to x-pack/osquerybeat/internal/osqd/osqueryd_windows.go index 0b7d881f041c..22b4f2e820f8 100644 --- a/x-pack/osquerybeat/internal/osqueryd/osqueryd_windows.go +++ b/x-pack/osquerybeat/internal/osqd/osqueryd_windows.go @@ -4,7 +4,7 @@ // +build windows -package osqueryd +package osqd import ( "fmt" @@ -13,6 +13,14 @@ import ( "github.com/gofrs/uuid" ) +const ( + extensionName = "osquery-extension.exe" +) + +func CreateSocketPath() (string, func(), error) { + return SocketPath(""), func() { + }, nil +} func SocketPath(dir string) string { return `\\.\pipe\elastic\osquery\` + uuid.Must(uuid.NewV4()).String() @@ -35,3 +43,7 @@ func killProcessGroup(cmd *exec.Cmd) error { exec.Command("taskkill", "/F", "/T", "/PID", fmt.Sprint(cmd.Process.Pid)).Run() return nil } + +func osquerydFilename() string { + return osqueryDName + ".exe" +} diff --git a/x-pack/osquerybeat/internal/osqdcli/cache.go b/x-pack/osquerybeat/internal/osqdcli/cache.go new file mode 100644 index 000000000000..de69943da6c5 --- /dev/null +++ b/x-pack/osquerybeat/internal/osqdcli/cache.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqdcli + +type Cache interface { + Add(key, value interface{}) (evicted bool) + Get(key interface{}) (value interface{}, ok bool) + Resize(size int) (evicted int) +} + +func WithCache(cache Cache, minSize int) Option { + return func(c *Client) { + nsc := &nullSafeCache{cache: cache} + if minSize > 0 { + nsc.minSize = minSize + } + c.cache = nsc + } +} + +type nullSafeCache struct { + cache Cache + minSize int +} + +func (c *nullSafeCache) Add(key, value interface{}) (evicted bool) { + if c.cache == nil { + return + } + return c.cache.Add(key, value) +} + +func (c *nullSafeCache) Get(key interface{}) (value interface{}, ok bool) { + if c.cache == nil { + return + } + return c.cache.Get(key) +} + +func (c *nullSafeCache) Resize(size int) (evicted int) { + if c.cache == nil { + return + } + return c.cache.Resize(c.minSize + size) +} diff --git a/x-pack/osquerybeat/internal/osqdcli/client.go b/x-pack/osquerybeat/internal/osqdcli/client.go new file mode 100644 index 000000000000..ae582226573d --- /dev/null +++ b/x-pack/osquerybeat/internal/osqdcli/client.go @@ -0,0 +1,291 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqdcli + +import ( + "context" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "golang.org/x/sync/semaphore" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/kolide/osquery-go" +) + +const ( + defaultTimeout = 30 * time.Second + defaultConnectRetries = 10 +) + +// Hardcoded retry values +const ( + retryWait = 200 * time.Millisecond +) + +// Limit number of queries across the socket +const ( + limit = 1 +) + +var ( + ErrAlreadyConnected = errors.New("already connected") + ErrClientClosed = errors.New("client is closed") +) + +type ErrorQueryFailure struct { + code int32 + message string +} + +func (e *ErrorQueryFailure) Error() string { + return fmt.Sprintf("query failed, code: %d, message: %s", e.code, e.message) +} + +type Client struct { + socketPath string + timeout time.Duration + connectRetries int + + log *logp.Logger + + cli *osquery.ExtensionManagerClient + mx sync.Mutex + + cache Cache + + cliLimiter *semaphore.Weighted +} + +type Option func(*Client) + +func WithTimeout(to time.Duration) Option { + return func(c *Client) { + c.timeout = to + } +} + +func WithLogger(log *logp.Logger) Option { + return func(c *Client) { + c.log = log + } +} + +func WithConnectRetries(retries int) Option { + return func(c *Client) { + c.connectRetries = retries + } +} + +func New(socketPath string, opts ...Option) *Client { + c := &Client{ + socketPath: socketPath, + timeout: defaultTimeout, + connectRetries: defaultConnectRetries, + cache: &nullSafeCache{}, + cliLimiter: semaphore.NewWeighted(limit), + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +func (c *Client) Connect(ctx context.Context) error { + c.mx.Lock() + defer c.mx.Unlock() + c.log.Debugf("connect client: socket_path: %s, retries: %v", c.socketPath, c.connectRetries) + if c.cli != nil { + err := ErrAlreadyConnected + c.log.Error(err) + return err + } + + var err error + + for i := 0; i < c.connectRetries; i++ { + attempt := i + 1 + llog := c.log.With("attempt", attempt) + llog.Debug("connecting") + cli, err := osquery.NewClient(c.socketPath, c.timeout) + if err != nil { + llog.Errorf("failed to connect: %v", err) + if i < c.connectRetries-1 { + llog.Infof("wait before next connect attempt: retry_wait: %v", retryWait) + if werr := waitWithContext(ctx, retryWait); werr != nil { + err = werr + break // Context cancelled, exit loop + } + } else { + return err + } + continue + } + c.cli = cli + break + } + if err != nil { + c.log.Errorf("failed connect: %v", err) + return err + } + c.log.Info("connected") + return err +} + +func (c *Client) Close() { + c.mx.Lock() + defer c.mx.Unlock() + + if c.cli != nil { + c.cli.Close() + c.cli = nil + } +} + +// Query executes a given query, resolves the types +func (c *Client) Query(ctx context.Context, sql string) ([]map[string]interface{}, error) { + c.mx.Lock() + defer c.mx.Unlock() + if c.cli == nil { + return nil, ErrClientClosed + } + + err := c.cliLimiter.Acquire(ctx, limit) + if err != nil { + return nil, err + } + defer c.cliLimiter.Release(limit) + + res, err := c.cli.Client.Query(ctx, sql) + if err != nil { + return nil, fmt.Errorf("osquery failed: %w", err) + } + if res.Status.Code != int32(0) { + return nil, &ErrorQueryFailure{ + code: res.Status.Code, + message: res.Status.Message, + } + } + + return c.resolveResult(ctx, sql, res.Response) +} + +// ResolveResult types for a give query +// The API is public to allow resolution of scheduled queries results captured by custom logger plugin +func (c *Client) ResolveResult(ctx context.Context, sql string, hits []map[string]string) ([]map[string]interface{}, error) { + c.mx.Lock() + defer c.mx.Unlock() + if c.cli == nil { + return nil, ErrClientClosed + } + + err := c.cliLimiter.Acquire(ctx, limit) + if err != nil { + return nil, err + } + defer c.cliLimiter.Release(limit) + + return c.resolveResult(ctx, sql, hits) +} + +func (c *Client) resolveResult(ctx context.Context, sql string, hits []map[string]string) ([]map[string]interface{}, error) { + // Get column types + colTypes, err := c.queryColumnTypes(ctx, sql) + if err != nil { + return nil, err + } + return resolveTypes(hits, colTypes), nil +} + +func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]string, error) { + var colTypes map[string]string + + if v, ok := c.cache.Get(sql); ok { + colTypes, ok = v.(map[string]string) + if ok { + c.log.Debugf("using cached column types for query: %s", sql) + } else { + c.log.Error("failed get the column types from cache, incompatible type") + } + } + + if colTypes == nil { + exres, err := c.cli.GetQueryColumns(sql) + if err != nil { + return nil, fmt.Errorf("osquery get query columns failed: %w", err) + } + + colTypes = make(map[string]string) + for _, m := range exres.Response { + for k, v := range m { + colTypes[k] = v + } + } + c.cache.Add(sql, colTypes) + } + return colTypes, nil +} + +func waitWithContext(ctx context.Context, to time.Duration) error { + t := time.NewTimer(to) + defer t.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + } + return nil +} + +func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} { + resolved := make([]map[string]interface{}, 0, len(hits)) + for _, hit := range hits { + res := resolveHitTypes(hit, colTypes) + resolved = append(resolved, res) + } + return resolved +} + +// Best effort to convert value types and replace values in the +// If conversion fails the value is kept as string +func resolveHitTypes(hit, colTypes map[string]string) map[string]interface{} { + m := make(map[string]interface{}) + for k, v := range hit { + t, ok := colTypes[k] + if ok { + var err error + switch t { + case "BIGINT", "INTEGER": + var n int64 + n, err = strconv.ParseInt(v, 10, 64) + if err == nil { + m[k] = n + } + case "UNSIGNED_BIGINT": + var n uint64 + n, err = strconv.ParseUint(v, 10, 64) + if err == nil { + m[k] = n + } + case "DOUBLE": + var n float64 + n, err = strconv.ParseFloat(v, 64) + if err == nil { + m[k] = n + } + default: + m[k] = v + } + } else { + m[k] = v + } + } + return m +} diff --git a/x-pack/osquerybeat/internal/osqueryd/client.go b/x-pack/osquerybeat/internal/osqueryd/client.go deleted file mode 100644 index 6ef8019929e4..000000000000 --- a/x-pack/osquerybeat/internal/osqueryd/client.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package osqueryd - -import ( - "context" - "errors" - "fmt" - "strconv" - "time" - - "github.com/elastic/beats/v7/libbeat/logp" - - "github.com/kolide/osquery-go" -) - -const ( - DefaultTimeout = 30 * time.Second - - lruCacheSize = 1024 - - retryWait = 200 * time.Millisecond - retryTimes = 10 - logTag = "osqueryd_cli" -) - -type Cache interface { - Add(key, value interface{}) (evicted bool) - Get(key interface{}) (value interface{}, ok bool) - Resize(size int) (evicted int) -} - -type Client struct { - cli *osquery.ExtensionManagerClient - cache Cache - log *logp.Logger -} - -type Option func(*Client) - -func NewClient(ctx context.Context, socketPath string, to time.Duration, log *logp.Logger, opts ...Option) (*Client, error) { - cli, err := newClientWithRetries(ctx, socketPath, to) - if err != nil { - return nil, err - } - c := &Client{ - cli: cli, - log: log, - } - - for _, opt := range opts { - opt(c) - } - - return c, nil -} - -func WithCache(cache Cache) Option { - return func(c *Client) { - c.cache = cache - } -} - -func newClientWithRetries(ctx context.Context, socketPath string, to time.Duration) (cli *osquery.ExtensionManagerClient, err error) { - log := logp.NewLogger(logTag).With("socket_path", socketPath) - for i := 0; i < retryTimes; i++ { - attempt := i + 1 - llog := log.With("attempt", attempt) - llog.Debug("Connecting") - cli, err = osquery.NewClient(socketPath, to) - if err != nil { - llog.Debug("Failed to connect, err: %v", err) - if i < retryTimes-1 { - llog.Infof("Wait for %v before next connect attempt", retryWait) - if werr := waitWithContext(ctx, retryWait); werr != nil { - err = werr - break // Context cancelled, exit loop - } - } - continue - } - break - } - if err != nil { - log.Error("Failed to connect, err: %v", err) - } else { - log.Info("Connected.") - } - return cli, err -} - -func (c *Client) Close() { - if c.cli != nil { - c.cli.Close() - c.cli = nil - } -} - -func (c *Client) Query(ctx context.Context, sql string) ([]map[string]interface{}, error) { - res, err := c.cli.Client.Query(ctx, sql) - if err != nil { - return nil, fmt.Errorf("osquery failed: %w", err) - } - if res.Status.Code != int32(0) { - return nil, errors.New(res.Status.Message) - } - - // Get column types - colTypes, err := c.queryColumnTypes(ctx, sql) - if err != nil { - return nil, err - } - - return resolveTypes(res.Response, colTypes), nil -} - -func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]string, error) { - var colTypes map[string]string - if c.cache != nil { - if v, ok := c.cache.Get(sql); ok { - colTypes, ok = v.(map[string]string) - if ok { - c.log.Debug("Using cached column types for query: %s", sql) - } else { - c.log.Error("Failed get the column types from cache, incompatible type") - } - } - } - if colTypes == nil { - exres, err := c.cli.GetQueryColumns(sql) - if err != nil { - return nil, fmt.Errorf("osquery get query columns failed: %w", err) - } - - colTypes = make(map[string]string) - for _, m := range exres.Response { - for k, v := range m { - colTypes[k] = v - } - } - c.cache.Add(sql, colTypes) - } - return colTypes, nil -} - -func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} { - resolved := make([]map[string]interface{}, 0, len(hits)) - for _, hit := range hits { - res := resolveHitTypes(hit, colTypes) - resolved = append(resolved, res) - } - return resolved -} - -// Best effort to convert value types and replace values in the -// If conversion fails the value is kept as string -func resolveHitTypes(hit, colTypes map[string]string) map[string]interface{} { - m := make(map[string]interface{}) - for k, v := range hit { - t, ok := colTypes[k] - if ok { - var err error - switch t { - case "BIGINT", "INTEGER": - var n int64 - n, err = strconv.ParseInt(v, 10, 64) - if err == nil { - m[k] = n - } - case "UNSIGNED_BIGINT": - var n uint64 - n, err = strconv.ParseUint(v, 10, 64) - if err == nil { - m[k] = n - } - case "DOUBLE": - var n float64 - n, err = strconv.ParseFloat(v, 64) - if err == nil { - m[k] = n - } - default: - m[k] = v - } - } else { - m[k] = v - } - } - return m -} - -func waitWithContext(ctx context.Context, to time.Duration) error { - t := time.NewTimer(to) - defer t.Stop() - select { - case <-ctx.Done(): - return context.Canceled - case <-t.C: - } - return nil -} diff --git a/x-pack/osquerybeat/internal/osqueryd/osqueryd.go b/x-pack/osquerybeat/internal/osqueryd/osqueryd.go deleted file mode 100644 index f45bc0dc8708..000000000000 --- a/x-pack/osquerybeat/internal/osqueryd/osqueryd.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package osqueryd - -import ( - "context" - "fmt" - "io" - "os" - "os/exec" - "path" - "path/filepath" - "strings" - - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/distro" -) - -// The subdirectory to hold .pid, .db, .sock and other work file for osqueryd sub process. Open for discussion. -// Will see later what needs to be parameterized and what not. -const ( - osquerySubdir = "osquery" - extensionsTimeout = 10 -) - -type OsqueryD struct { - RootDir string - SocketPath string -} - -// TODO(AM): finalize what to do with config file, how much of the config file we need etc. Open question for now. -func (q *OsqueryD) Start(ctx context.Context) (<-chan error, error) { - log := logp.NewLogger("osqueryd").With("dir", q.RootDir).With("socket_path", q.SocketPath) - log.Info("Starting process") - - dir := filepath.Join(q.RootDir, osquerySubdir) - - if err := os.MkdirAll(dir, 0700); err != nil { - return nil, fmt.Errorf("failed to create dir %v, %w", dir, err) - } - - cmd := q.createCommand(log, dir) - - cmd.SysProcAttr = setpgid() - - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, err - } - - err = cmd.Start() - if err != nil { - return nil, err - } - - var ( - errbuf strings.Builder - ) - - wait := func() error { - if _, cerr := io.Copy(&errbuf, stderr); cerr != nil { - return cerr - } - return cmd.Wait() - } - - finished := make(chan error, 1) - - go func() { - finished <- wait() - }() - - done := make(chan error, 1) - go func() { - var ferr error - select { - case ferr = <-finished: - if ferr != nil { - s := strings.TrimSpace(errbuf.String()) - if s != "" { - ferr = fmt.Errorf("%s: %w", s, ferr) - } - } - if ferr != nil { - log.Errorf("Process exited with error: %v", ferr) - } else { - log.Info("Process exited") - } - case <-ctx.Done(): - log.Info("Kill process group on context done") - killProcessGroup(cmd) - // Wait till finished - <-finished - ferr = ctx.Err() - } - done <- ferr - }() - - return done, err -} - -func (q *OsqueryD) createCommand(log *logp.Logger, dir string) *exec.Cmd { - - cmd := exec.Command( - distro.OsquerydPath(q.RootDir), - "--force=true", - "--disable_watchdog", - "--utc", - "--pidfile="+path.Join(dir, "osquery.pid"), - "--database_path="+path.Join(dir, "osquery.db"), - "--extensions_socket="+q.SocketPath, - "--config_path="+path.Join(dir, "osquery.conf"), - "--logger_path="+dir, - "--extensions_autoload="+path.Join(dir, "osquery.autoload"), - fmt.Sprint("--extensions_timeout=", extensionsTimeout), - ) - - cmd.Args = append(cmd.Args, platformArgs()...) - - if log.IsDebug() { - cmd.Args = append(cmd.Args, "--verbose") - } - return cmd -} diff --git a/x-pack/osquerybeat/internal/config/config_test.go b/x-pack/osquerybeat/internal/testutil/testutil.go similarity index 57% rename from x-pack/osquerybeat/internal/config/config_test.go rename to x-pack/osquerybeat/internal/testutil/testutil.go index 6197c99ade29..7afb8e16d6b4 100644 --- a/x-pack/osquerybeat/internal/config/config_test.go +++ b/x-pack/osquerybeat/internal/testutil/testutil.go @@ -2,6 +2,15 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// +build !integration +package testutil -package config +import "testing" + +func AssertPanic(t *testing.T, fn func()) { + defer func() { + if r := recover(); r == nil { + t.Errorf("Expected panic") + } + }() + fn() +} diff --git a/x-pack/osquerybeat/magefile.go b/x-pack/osquerybeat/magefile.go index f16696dba5ef..ac0c920502ad 100644 --- a/x-pack/osquerybeat/magefile.go +++ b/x-pack/osquerybeat/magefile.go @@ -8,6 +8,9 @@ package main import ( "fmt" + "os" + "path/filepath" + "runtime" "time" "github.com/magefile/mage/mg" @@ -15,9 +18,6 @@ import ( devtools "github.com/elastic/beats/v7/dev-tools/mage" osquerybeat "github.com/elastic/beats/v7/x-pack/osquerybeat/scripts/mage" - // mage:import - _ "github.com/elastic/beats/v7/dev-tools/mage/target/common" - // mage:import _ "github.com/elastic/beats/v7/dev-tools/mage/target/pkg" // mage:import @@ -33,11 +33,50 @@ func init() { devtools.BeatLicense = "Elastic License" } +func Check() error { + return devtools.Check() +} + func Build() error { params := devtools.DefaultBuildArgs() - // Building functionbeat manager - return devtools.Build(params) + // Building osquerybeat + err := devtools.Build(params) + if err != nil { + return err + } + + // Building osquery-extension.ext + inputFiles := filepath.Join("ext/osquery-extension/main.go") + params.InputFiles = []string{inputFiles} + params.Name = "osquery-extension" + params.CGO = false + params.Env = make(map[string]string) + err = devtools.Build(params) + if err != nil { + return err + } + + // Rename osquery-extension to osquery-extension.ext on non windows platforms + if runtime.GOOS != "windows" { + err = os.Rename("osquery-extension", "osquery-extension.ext") + if err != nil { + return err + } + } + + return nil +} + +// Clean cleans all generated files and build artifacts. +func Clean() error { + paths := devtools.DefaultCleanPaths + paths = append(paths, []string{ + "osquery-extension", + "osquery-extension.exe", + filepath.Join("ext", "osquery-extension", "build"), + }...) + return devtools.Clean(paths) } // GolangCrossBuild build the Beat binary inside of the golang-builder. @@ -53,7 +92,22 @@ func BuildGoDaemon() error { // CrossBuild cross-builds the beat for all target platforms. func CrossBuild() error { - return devtools.CrossBuild() + // Building osquerybeat + err := devtools.CrossBuild() + if err != nil { + return err + } + + if runtime.GOARCH != "amd64" { + fmt.Println("Crossbuilding functions only works on amd64 architecture.") + return nil + } + + err = devtools.CrossBuild(devtools.InDir("x-pack", "osquerybeat", "ext", "osquery-extension")) + if err != nil { + return err + } + return nil } // CrossBuildGoDaemon cross-builds the go-daemon binary using Docker.