diff --git a/config.sample.toml b/config.sample.toml index 00155beacff..252dcb3006f 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -42,6 +42,16 @@ read-timeout = "5s" # port = 4444 # database = "" + [[input_plugins.udp_servers]] # array of tables + enabled = false + # port = 5551 + # database = "db1" + + [[input_plugins.udp_servers]] # array of tables + enabled = false + # port = 5552 + # database = "db2" + # Raft configuration [raft] # The raft port should be open between all servers in a cluster. diff --git a/src/api/udp/api.go b/src/api/udp/api.go index d70f6b9eac6..4f87f22a3af 100644 --- a/src/api/udp/api.go +++ b/src/api/udp/api.go @@ -3,7 +3,6 @@ package udp import ( "cluster" . "common" - "configuration" "coordinator" "encoding/json" "net" @@ -22,11 +21,11 @@ type Server struct { shutdown chan bool } -func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server { +func NewServer(listenAddress string, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server { self := &Server{} - self.listenAddress = config.UdpInputPortString() - self.database = config.UdpInputDatabase + self.listenAddress = listenAddress + self.database = database self.coordinator = coord self.shutdown = make(chan bool, 1) self.clusterConfig = clusterConfig diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index 291f6b1572f..13ddf2dc4f9 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -183,8 +183,9 @@ type WalConfig struct { } type InputPlugins struct { - Graphite GraphiteConfig `toml:"graphite"` - UdpInput UdpInputConfig `toml:"udp"` + Graphite GraphiteConfig `toml:"graphite"` + UdpInput UdpInputConfig `toml:"udp"` + UdpServersInput []UdpInputConfig `toml:"udp_servers"` } type TomlConfiguration struct { @@ -218,6 +219,7 @@ type Configuration struct { UdpInputEnabled bool UdpInputPort int UdpInputDatabase string + UdpServers []UdpInputConfig RaftServerPort int RaftTimeout duration @@ -332,6 +334,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { UdpInputEnabled: tomlConfiguration.InputPlugins.UdpInput.Enabled, UdpInputPort: tomlConfiguration.InputPlugins.UdpInput.Port, UdpInputDatabase: tomlConfiguration.InputPlugins.UdpInput.Database, + UdpServers: tomlConfiguration.InputPlugins.UdpServersInput, RaftServerPort: tomlConfiguration.Raft.Port, RaftTimeout: tomlConfiguration.Raft.Timeout, @@ -446,12 +449,12 @@ func (self *Configuration) GraphitePortString() string { return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort) } -func (self *Configuration) UdpInputPortString() string { - if self.UdpInputPort <= 0 { +func (self *Configuration) UdpInputPortString(port int) string { + if port <= 0 { return "" } - return fmt.Sprintf("%s:%d", self.BindAddress, self.UdpInputPort) + return fmt.Sprintf("%s:%d", self.BindAddress, port) } func (self *Configuration) HostnameOrDetect() string { diff --git a/src/server/server.go b/src/server/server.go index f75d952392f..50271b98f49 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -22,6 +22,7 @@ type Server struct { HttpApi *http.HttpServer GraphiteApi *graphite.Server UdpApi *udp.Server + UdpServers []*udp.Server AdminServer *admin.HttpServer Coordinator coordinator.Coordinator Config *configuration.Configuration @@ -60,7 +61,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) { httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer) httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath) graphiteApi := graphite.NewServer(config, coord, clusterConfig) - udpApi := udp.NewServer(config, coord, clusterConfig) adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString()) return &Server{ @@ -69,7 +69,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) { ClusterConfig: clusterConfig, HttpApi: httpApi, GraphiteApi: graphiteApi, - UdpApi: udpApi, Coordinator: coord, AdminServer: adminServer, Config: config, @@ -138,15 +137,40 @@ func (self *Server) ListenAndServe() error { } } + // singular UDP input if self.Config.UdpInputEnabled { if self.Config.UdpInputPort <= 0 || self.Config.UdpInputDatabase == "" { log.Warn("Cannot start udp server. please check your configuration") } else { - log.Info("Starting UDP Listener on port %d", self.Config.UdpInputPort) + log.Info("Starting UDP Listener on port %d to database %s", self.Config.UdpInputPort, self.Config.UdpInputDatabase) + + self.UdpApi = udp.NewServer(self.Config.UdpInputPortString(self.Config.UdpInputPort), self.Config.UdpInputDatabase, self.Coordinator, self.ClusterConfig) go self.UdpApi.ListenAndServe() } } + // multiple UDP input + udpServersCount := len(self.Config.UdpServers) + if udpServersCount > 0 { + for i := 0; i < udpServersCount; i++ { + + port := self.Config.UdpServers[i].Port + database := self.Config.UdpServers[i].Database + + if port <= 0 || database == "" { + log.Warn("Cannot start udp server. please check your configuration") + } else { + log.Info("Starting UDP Listener on port %d to database %s", port, database) + + listenAddress := self.Config.UdpInputPortString(port) + + self.UdpServers[i] = udp.NewServer(listenAddress, database, self.Coordinator, self.ClusterConfig) + go self.UdpServers[i].ListenAndServe() + } + + } + } + // start processing continuous queries self.RaftServer.StartProcessingContinuousQueries()