Skip to content

Commit

Permalink
Fix elastic#1
Browse files Browse the repository at this point in the history
Receive config options from the beats
  • Loading branch information
monicasarbu committed Apr 6, 2015
1 parent e3da228 commit fda15a7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 84 deletions.
21 changes: 0 additions & 21 deletions common/config.go

This file was deleted.

25 changes: 8 additions & 17 deletions common/droppriv/droppriv_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,34 @@ import (
"fmt"
"syscall"

"github.com/elastic/infrabeat/common"
"github.com/BurntSushi/toml"
"github.com/elastic/infrabeat/logp"
)

type DropPrivConfig struct {
RunOptions RunOptions
}

type RunOptions struct {
Uid int
Gid int
}

func DropPrivileges(cfg common.Config) error {
var config DropPrivConfig

err := common.DecodeConfig(cfg, &config)
if err != nil {
return err
}
func DropPrivileges(config RunOptions, configMeta toml.MetaData) error {
var err error

if !cfg.Meta.IsDefined("runoptions", "uid") {
if !configMeta.IsDefined("runoptions", "uid") {
// not found, no dropping privileges but no err
return nil
}

if !cfg.Meta.IsDefined("runoptions", "gid") {
if !configMeta.IsDefined("runoptions", "gid") {
return errors.New("GID must be specified for dropping privileges")
}

logp.Info("Switching to user: %d.%d", config.RunOptions.Uid, config.RunOptions.Gid)
logp.Info("Switching to user: %d.%d", config.Uid, config.Gid)

if err = syscall.Setgid(config.RunOptions.Gid); err != nil {
if err = syscall.Setgid(config.Gid); err != nil {
return fmt.Errorf("setgid: %s", err.Error())
}

if err = syscall.Setuid(config.RunOptions.Uid); err != nil {
if err = syscall.Setuid(config.Uid); err != nil {
return fmt.Errorf("setuid: %s", err.Error())
}

Expand Down
11 changes: 8 additions & 3 deletions common/droppriv/droppriv_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package droppriv
import (
"errors"

"github.com/elastic/infrabeat/common"
"github.com/BurntSushi/toml"
)

func DropPrivileges(cfg common.Config) error {
type RunOptions struct {
Uid int
Gid int
}

func DropPrivileges(config RunOptions, configMeta toml.MetaData) error {

if !cfg.Meta.IsDefined("runoptions", "uid") {
if !configMeta.IsDefined("runoptions", "uid") {
// not found, no dropping privileges but no err
return nil
}
Expand Down
20 changes: 5 additions & 15 deletions outputs/geolite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,26 @@ import (
"os"
"path/filepath"

"github.com/elastic/infrabeat/common"
"github.com/BurntSushi/toml"
"github.com/elastic/infrabeat/logp"

"github.com/nranchev/go-libGeoIP"
)

var _GeoLite *libgeo.GeoIP

type GeoIPConfig struct {
Geoip Geoip
}

type Geoip struct {
Paths []string
}

func LoadGeoIPData(cfg common.Config) error {

var config GeoIPConfig

err := common.DecodeConfig(cfg, &config)
if err != nil {
return err
}
func LoadGeoIPData(config Geoip, configMeta toml.MetaData) error {

geoip_paths := []string{
"/usr/share/GeoIP/GeoIP.dat",
"/usr/local/var/GeoIP/GeoIP.dat",
}
if cfg.Meta.IsDefined("geoip", "paths") {
geoip_paths = config.Geoip.Paths
if configMeta.IsDefined("geoip", "paths") {
geoip_paths = config.Paths
}
if len(geoip_paths) == 0 {
// disabled
Expand Down Expand Up @@ -67,6 +56,7 @@ func LoadGeoIPData(cfg common.Config) error {
return nil
}

var err error
_GeoLite, err = libgeo.Load(geoip_path)
if err != nil {
logp.Warn("Could not load GeoIP data: %s", err.Error())
Expand Down
42 changes: 14 additions & 28 deletions outputs/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strings"
"time"

"github.com/BurntSushi/toml"

"github.com/elastic/infrabeat/common"
"github.com/elastic/infrabeat/logp"
)
Expand All @@ -24,18 +22,12 @@ type PublisherType struct {
ElasticsearchOutput ElasticsearchOutputType
RedisOutput RedisOutputType
FileOutput FileOutputType
Config OutputsConfig
ConfigMeta toml.MetaData
IgnoreOutgoing bool

RefreshTopologyTimer <-chan time.Time
Queue chan common.MapStr
}

type OutputsConfig struct {
Output map[string]MothershipConfig
Agent AgentConfig
}

type MothershipConfig struct {
Enabled bool
Save_topology bool
Expand Down Expand Up @@ -149,7 +141,7 @@ func (publisher *PublisherType) publishEvent(event common.MapStr) error {
delete(event, "dst")
}

if publisher.Config.Agent.Ignore_outgoing && dst_server != "" &&
if publisher.IgnoreOutgoing && dst_server != "" &&
dst_server != publisher.name {
// duplicated transaction -> ignore it
logp.Debug("publish", "Ignore duplicated transaction on %s: %s -> %s", publisher.name, src_server, dst_server)
Expand Down Expand Up @@ -232,26 +224,20 @@ func (publisher *PublisherType) PublishTopology(params ...string) error {
}

func (publisher *PublisherType) Init(publishDisabled bool,
config common.Config) error {
outputs map[string]MothershipConfig, agent AgentConfig) error {

var err error

publisher.ConfigMeta = config.Meta

err = common.DecodeConfig(config, &publisher.Config)
if err != nil {
return err
}
publisher.IgnoreOutgoing = agent.Ignore_outgoing

publisher.disabled = publishDisabled
if publisher.disabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
}

output, exists := publisher.Config.Output["elasticsearch"]
output, exists := outputs["elasticsearch"]
if exists && output.Enabled && !publisher.disabled {
err := publisher.ElasticsearchOutput.Init(output,
publisher.Config.Agent.Topology_expire)
agent.Topology_expire)
if err != nil {
logp.Err("Fail to initialize Elasticsearch as output: %s", err)
return err
Expand All @@ -268,11 +254,11 @@ func (publisher *PublisherType) Init(publishDisabled bool,
}
}

output, exists = publisher.Config.Output["redis"]
output, exists = outputs["redis"]
if exists && output.Enabled && !publisher.disabled {
logp.Debug("publish", "REDIS publisher enabled")
err := publisher.RedisOutput.Init(output,
publisher.Config.Agent.Topology_expire)
agent.Topology_expire)
if err != nil {
logp.Err("Fail to initialize Redis as output: %s", err)
return err
Expand All @@ -289,7 +275,7 @@ func (publisher *PublisherType) Init(publishDisabled bool,
}
}

output, exists = publisher.Config.Output["file"]
output, exists = outputs["file"]
if exists && output.Enabled {
err := publisher.FileOutput.Init(output)
if err != nil {
Expand All @@ -312,7 +298,7 @@ func (publisher *PublisherType) Init(publishDisabled bool,
}
}

publisher.name = publisher.Config.Agent.Name
publisher.name = agent.Name
if len(publisher.name) == 0 {
// use the hostname
publisher.name, err = os.Hostname()
Expand All @@ -323,14 +309,14 @@ func (publisher *PublisherType) Init(publishDisabled bool,
logp.Info("No agent name configured, using hostname '%s'", publisher.name)
}

if len(publisher.Config.Agent.Tags) > 0 {
publisher.tags = strings.Join(publisher.Config.Agent.Tags, " ")
if len(agent.Tags) > 0 {
publisher.tags = strings.Join(agent.Tags, " ")
}

if !publisher.disabled && publisher.TopologyOutput != nil {
RefreshTopologyFreq := 10 * time.Second
if publisher.Config.Agent.Refresh_topology_freq != 0 {
RefreshTopologyFreq = time.Duration(publisher.Config.Agent.Refresh_topology_freq) * time.Second
if agent.Refresh_topology_freq != 0 {
RefreshTopologyFreq = time.Duration(agent.Refresh_topology_freq) * time.Second
}
publisher.RefreshTopologyTimer = time.Tick(RefreshTopologyFreq)
logp.Info("Topology map refreshed every %s", RefreshTopologyFreq)
Expand Down

0 comments on commit fda15a7

Please sign in to comment.