Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple UDP + JSON input plugins #591

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ read-timeout = "5s"
# port = 4444
# database = ""

[[input_plugins.udp_servers]] # array of tables
enabled = false
# port = 5551
# database = "db1"

[[input_plugins.udp_servers]] # array of tables
enabled = false
# port = 5552
# database = "db2"

# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
Expand Down
7 changes: 3 additions & 4 deletions src/api/udp/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package udp
import (
"cluster"
. "common"
"configuration"
"coordinator"
"encoding/json"
"net"
Expand All @@ -22,11 +21,11 @@ type Server struct {
shutdown chan bool
}

func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
func NewServer(listenAddress string, database string, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}

self.listenAddress = config.UdpInputPortString()
self.database = config.UdpInputDatabase
self.listenAddress = listenAddress
self.database = database
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
Expand Down
13 changes: 8 additions & 5 deletions src/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ type WalConfig struct {
}

type InputPlugins struct {
Graphite GraphiteConfig `toml:"graphite"`
UdpInput UdpInputConfig `toml:"udp"`
Graphite GraphiteConfig `toml:"graphite"`
UdpInput UdpInputConfig `toml:"udp"`
UdpServersInput []UdpInputConfig `toml:"udp_servers"`
}

type TomlConfiguration struct {
Expand Down Expand Up @@ -218,6 +219,7 @@ type Configuration struct {
UdpInputEnabled bool
UdpInputPort int
UdpInputDatabase string
UdpServers []UdpInputConfig

RaftServerPort int
RaftTimeout duration
Expand Down Expand Up @@ -332,6 +334,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
UdpInputEnabled: tomlConfiguration.InputPlugins.UdpInput.Enabled,
UdpInputPort: tomlConfiguration.InputPlugins.UdpInput.Port,
UdpInputDatabase: tomlConfiguration.InputPlugins.UdpInput.Database,
UdpServers: tomlConfiguration.InputPlugins.UdpServersInput,

RaftServerPort: tomlConfiguration.Raft.Port,
RaftTimeout: tomlConfiguration.Raft.Timeout,
Expand Down Expand Up @@ -446,12 +449,12 @@ func (self *Configuration) GraphitePortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
}

func (self *Configuration) UdpInputPortString() string {
if self.UdpInputPort <= 0 {
func (self *Configuration) UdpInputPortString(port int) string {
if port <= 0 {
return ""
}

return fmt.Sprintf("%s:%d", self.BindAddress, self.UdpInputPort)
return fmt.Sprintf("%s:%d", self.BindAddress, port)
}

func (self *Configuration) HostnameOrDetect() string {
Expand Down
30 changes: 27 additions & 3 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Server struct {
HttpApi *http.HttpServer
GraphiteApi *graphite.Server
UdpApi *udp.Server
UdpServers []*udp.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
Expand Down Expand Up @@ -60,7 +61,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config, coord, clusterConfig)
udpApi := udp.NewServer(config, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())

return &Server{
Expand All @@ -69,7 +69,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
ClusterConfig: clusterConfig,
HttpApi: httpApi,
GraphiteApi: graphiteApi,
UdpApi: udpApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
Expand Down Expand Up @@ -138,15 +137,40 @@ func (self *Server) ListenAndServe() error {
}
}

// singular UDP input
if self.Config.UdpInputEnabled {
if self.Config.UdpInputPort <= 0 || self.Config.UdpInputDatabase == "" {
log.Warn("Cannot start udp server. please check your configuration")
} else {
log.Info("Starting UDP Listener on port %d", self.Config.UdpInputPort)
log.Info("Starting UDP Listener on port %d to database %s", self.Config.UdpInputPort, self.Config.UdpInputDatabase)

self.UdpApi = udp.NewServer(self.Config.UdpInputPortString(self.Config.UdpInputPort), self.Config.UdpInputDatabase, self.Coordinator, self.ClusterConfig)
go self.UdpApi.ListenAndServe()
}
}

// multiple UDP input
udpServersCount := len(self.Config.UdpServers)
if udpServersCount > 0 {
for i := 0; i < udpServersCount; i++ {

port := self.Config.UdpServers[i].Port
database := self.Config.UdpServers[i].Database

if port <= 0 || database == "" {
log.Warn("Cannot start udp server. please check your configuration")
} else {
log.Info("Starting UDP Listener on port %d to database %s", port, database)

listenAddress := self.Config.UdpInputPortString(port)

self.UdpServers[i] = udp.NewServer(listenAddress, database, self.Coordinator, self.ClusterConfig)
go self.UdpServers[i].ListenAndServe()
}

}
}

// start processing continuous queries
self.RaftServer.StartProcessingContinuousQueries()

Expand Down