Skip to content

Commit

Permalink
add support for static sets
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulexus committed Jan 16, 2020
1 parent e6a61b3 commit 35068f8
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 95 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ Command-line options are available to customize and configure the operation of
port are optional. If not specified, namespace is `default` or the value of
`POD_NAMESPACE` and port is
`5060`.
* `-static <index>=<host>[:port][,<host>[:port]]...`- Specifies a static
dispatcher set. This is usually used to define a dispatcher set composed on
external resources, such as an external trunk. Multiple host:port pairs may
be passed for multiple contacts in the same dispatcher set. The option may
be declared any number of times for defining any number of unique dispatcher
sets. If not specified, the port will be assigned as `5060`.

For simple systems where the monitored services are in the same namespace as
`dispatchers`, you can set the `POD_NAMESPACE` environment variable to
Expand Down
105 changes: 105 additions & 0 deletions k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"fmt"
"os"
"strconv"
"strings"

"github.com/pkg/errors"
)

var setDefinitions SetDefinitions

// SetDefinition describes a kubernetes dispatcher set's parameters
type SetDefinition struct {
id int
namespace string
name string
port string
}

// SetDefinitions represents a set of kubernetes dispatcher set parameter definitions
type SetDefinitions struct {
list []*SetDefinition
}

// String implements flag.Value
func (s *SetDefinitions) String() string {
var list []string
for _, d := range s.list {
list = append(list, d.String())
}

return strings.Join(list, ",")
}

// Set implements flag.Value
func (s *SetDefinitions) Set(raw string) error {
d := new(SetDefinition)

if err := d.Set(raw); err != nil {
return err
}

s.list = append(s.list, d)
return nil
}

func (s *SetDefinition) String() string {
return fmt.Sprintf("%s:%s=%d:%s", s.namespace, s.name, s.id, s.port)
}

// Set configures a kubernetes-derived dispatcher set
func (s *SetDefinition) Set(raw string) (err error) {
// Handle multiple comma-delimited arguments
if strings.Contains(raw, ",") {
args := strings.Split(raw, ",")
for _, n := range args {
if err = s.Set(n); err != nil {
return err
}
}
return nil
}

var id int
ns := "default"
var name string
port := "5060"

if os.Getenv("POD_NAMESPACE") != "" {
ns = os.Getenv("POD_NAMESPACE")
}

pieces := strings.SplitN(raw, "=", 2)
if len(pieces) < 2 {
return fmt.Errorf("failed to parse %s as the form [namespace:]name=index", raw)
}

naming := strings.SplitN(pieces[0], ":", 2)
if len(naming) < 2 {
name = naming[0]
} else {
ns = naming[0]
name = naming[1]
}

idString := pieces[1]
if pieces = strings.Split(pieces[1], ":"); len(pieces) > 1 {
idString = pieces[0]
port = pieces[1]
}

id, err = strconv.Atoi(idString)
if err != nil {
return errors.Wrap(err, "failed to parse index as an integer")
}

s.id = id
s.namespace = ns
s.name = name
s.port = port

return nil
}
112 changes: 17 additions & 95 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,108 +46,14 @@ const KamailioStartupDebounceTimer = time.Minute

func init() {
flag.Var(&setDefinitions, "set", "Dispatcher sets of the form [namespace:]name=index[:port], where index is a number and port is the port number on which SIP is to be signaled to the dispatchers. May be passed multiple times for multiple sets.")
flag.Var(&staticSetDefinitions, "static", "Static dispatcher sets of the form index=host[:port][,host[:port]]..., where index is the dispatcher set number/index and port is the port number on which SIP is to be signaled to the dispatchers. Multiple hosts may be defined using a comma-separated list.")
flag.StringVar(&outputFilename, "o", "/data/kamailio/dispatcher.list", "Output file for dispatcher list")
flag.StringVar(&rpcHost, "h", "127.0.0.1", "Host for kamailio's RPC service")
flag.StringVar(&rpcPort, "p", "9998", "Port for kamailio's RPC service")
flag.StringVar(&kubeCfg, "kubecfg", "", "Location of kubecfg file (if not running inside k8s)")
flag.StringVar(&apiAddr, "api", "", "Address on which to run web API service. Example ':8080'. (defaults to not run)")
}

// SetDefinition describes a kubernetes dispatcher set's parameters
type SetDefinition struct {
id int
namespace string
name string
port string
}

// SetDefinitions represents a set of kubernetes dispatcher set parameter definitions
type SetDefinitions struct {
list []*SetDefinition
}

// String implements flag.Value
func (s *SetDefinitions) String() string {
var list []string
for _, d := range s.list {
list = append(list, d.String())
}

return strings.Join(list, ",")
}

// Set implements flag.Value
func (s *SetDefinitions) Set(raw string) error {
d := new(SetDefinition)

if err := d.Set(raw); err != nil {
return err
}

s.list = append(s.list, d)
return nil
}

var setDefinitions SetDefinitions

func (s *SetDefinition) String() string {
return fmt.Sprintf("%s:%s=%d:%s", s.namespace, s.name, s.id, s.port)
}

// Set configures a kubernetes-derived dispatcher set
func (s *SetDefinition) Set(raw string) (err error) {
// Handle multiple comma-delimited arguments
if strings.Contains(raw, ",") {
args := strings.Split(raw, ",")
for _, n := range args {
if err = s.Set(n); err != nil {
return err
}
}
return nil
}

var id int
ns := "default"
var name string
port := "5060"

if os.Getenv("POD_NAMESPACE") != "" {
ns = os.Getenv("POD_NAMESPACE")
}

pieces := strings.SplitN(raw, "=", 2)
if len(pieces) < 2 {
return fmt.Errorf("failed to parse %s as the form [namespace:]name=index", raw)
}

naming := strings.SplitN(pieces[0], ":", 2)
if len(naming) < 2 {
name = naming[0]
} else {
ns = naming[0]
name = naming[1]
}

idString := pieces[1]
if pieces = strings.Split(pieces[1], ":"); len(pieces) > 1 {
idString = pieces[0]
port = pieces[1]
}

id, err = strconv.Atoi(idString)
if err != nil {
return errors.Wrap(err, "failed to parse index as an integer")
}

s.id = id
s.namespace = ns
s.name = name
s.port = port

return nil
}

type dispatcherSets struct {
kc *k8s.Client
outputFilename string
Expand All @@ -174,6 +80,16 @@ func (s *dispatcherSets) add(ctx context.Context, args *SetDefinition) error {
return nil
}

func (s *dispatcherSets) addStatic(ctx context.Context, v *StaticSetDefinition) error {
if s.sets == nil {
s.sets = make(map[int]sets.DispatcherSet)
}

s.sets[v.id] = sets.NewStaticSet(v.id, v.Members())

return nil
}

// export dumps the output from all dispatcher sets
func (s *dispatcherSets) export() error {
f, err := os.Create(s.outputFilename)
Expand Down Expand Up @@ -328,6 +244,12 @@ func run() error {
}
}

for _, vs := range staticSetDefinitions.list {
if err = s.addStatic(ctx, vs); err != nil {
return errors.Wrapf(err, "failed to add static dispatcher set: %s", vs.String())
}
}

if err = s.update(ctx); err != nil {
return errors.Wrap(err, "failed to run initial dispatcher set update")
}
Expand Down
100 changes: 100 additions & 0 deletions static.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
)

var staticSetDefinitions StaticSetDefinitions

// StaticSetMember defines the parameters of a member of a static dispatcher set
type StaticSetMember struct {
Host string
Port string
}

func (s *StaticSetMember) String() string {
return fmt.Sprintf("%s:%s", s.Host, s.Port)
}

// StaticSetDefinition defines a static dispatcher set
type StaticSetDefinition struct {
id int
members []*StaticSetMember
}

// Set configures a static dispatcher set
func (s *StaticSetDefinition) Set(raw string) (err error) {
pieces := strings.Split(raw, "=")
if len(pieces) != 2 {
return errors.New("failed to parse static set definition")
}

s.id, err = strconv.Atoi(pieces[0])
if err != nil {
return errors.Errorf("failed to parse %s as an integer", pieces[0])
}

// Handle multiple comma-delimited arguments
hostList := strings.Split(pieces[1], ",")
for _, h := range hostList {
hostPieces := strings.Split(h, ":")
switch len(hostPieces) {
case 1:
s.members = append(s.members, &StaticSetMember{
Host: hostPieces[0],
Port: "5060",
})
case 2:
s.members = append(s.members, &StaticSetMember{
Host: hostPieces[0],
Port: hostPieces[1],
})
default:
return errors.Errorf("failed to parse static set member %s", h)
}
}

return nil
}

func (s *StaticSetDefinition) String() string {
return fmt.Sprintf("%d=%s", s.id, strings.Join(s.Members(), ","))
}

// Members returns the list of set members, formatted for direct inclusion in the dispatcher set
func (s *StaticSetDefinition) Members() (list []string) {
for _, m := range s.members {
list = append(list, m.String())
}
return
}

// StaticSetDefinitions is a list of static dispatcher sets
type StaticSetDefinitions struct {
list []*StaticSetDefinition
}

// String implements flag.Value
func (s *StaticSetDefinitions) String() string {
var list []string
for _, s := range s.list {
list = append(list, s.String())
}
return strings.Join(list, ",")
}

// Set implements flag.Value
func (s *StaticSetDefinitions) Set(raw string) error {
d := new(StaticSetDefinition)

if err := d.Set(raw); err != nil {
return err
}

s.list = append(s.list, d)
return nil
}

0 comments on commit 35068f8

Please sign in to comment.