Skip to content

Commit

Permalink
feat: register on lookupd with seperate broadcast ports accepted as f…
Browse files Browse the repository at this point in the history
…lags
  • Loading branch information
shyam-king committed Nov 13, 2020
1 parent 0a05a01 commit fa10dc1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 6 deletions.
2 changes: 2 additions & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
authHTTPAddresses := app.StringArray{}
flagSet.Var(&authHTTPAddresses, "auth-http-address", "<addr>:<port> to query auth server (may be given multiple times)")
flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
flagSet.Int("broadcast-tcp-port", opts.BroadcastTCPPort, "TCP port that will be registered with lookupd (defaults to the TCP port that this nsqd is listening to)")
flagSet.Int("broadcast-http-port", opts.BroadcastHTTPPort, "HTTP port that will be registered with lookupd (defaults to the HTTP port that this nsqd is listening to)")
lookupdTCPAddrs := app.StringArray{}
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
Expand Down
4 changes: 2 additions & 2 deletions nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
return func(lp *lookupPeer) {
ci := make(map[string]interface{})
ci["version"] = version.Binary
ci["tcp_port"] = n.RealTCPAddr().Port
ci["http_port"] = n.RealHTTPAddr().Port
ci["tcp_port"] = n.getOpts().BroadcastTCPPort
ci["http_port"] = n.getOpts().BroadcastHTTPPort
ci["hostname"] = hostname
ci["broadcast_address"] = n.getOpts().BroadcastAddress

Expand Down
5 changes: 5 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type NSQD struct {
func New(opts *Options) (*NSQD, error) {
var err error

err = applyDefaultOptions(opts)
if err != nil {
return nil, err
}

dataPath := opts.DataPath
if opts.DataPath == "" {
cwd, _ := os.Getwd()
Expand Down
39 changes: 35 additions & 4 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package nsqd
import (
"crypto/md5"
"crypto/tls"
"fmt"
"hash/crc32"
"io"
"log"
"net"
"os"
"strconv"
"time"

"github.com/nsqio/nsq/internal/lg"
Expand All @@ -23,6 +26,8 @@ type Options struct {
HTTPAddress string `flag:"http-address"`
HTTPSAddress string `flag:"https-address"`
BroadcastAddress string `flag:"broadcast-address"`
BroadcastTCPPort int `flag:"broadcast-tcp-port"`
BroadcastHTTPPort int `flag:"broadcast-http-port"`
NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"`
HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
Expand Down Expand Up @@ -98,10 +103,12 @@ func NewOptions() *Options {
LogPrefix: "[nsqd] ",
LogLevel: lg.INFO,

TCPAddress: "0.0.0.0:4150",
HTTPAddress: "0.0.0.0:4151",
HTTPSAddress: "0.0.0.0:4152",
BroadcastAddress: hostname,
TCPAddress: "0.0.0.0:4150",
HTTPAddress: "0.0.0.0:4151",
HTTPSAddress: "0.0.0.0:4152",
BroadcastAddress: hostname,
BroadcastTCPPort: 0,
BroadcastHTTPPort: 0,

NSQLookupdTCPAddresses: make([]string, 0),
AuthHTTPAddresses: make([]string, 0),
Expand Down Expand Up @@ -149,3 +156,27 @@ func NewOptions() *Options {
TLSMinVersion: tls.VersionTLS10,
}
}

func applyDefaultOptions(opts *Options) error {
if opts.BroadcastHTTPPort == 0 {
_, portStr, err := net.SplitHostPort(opts.HTTPAddress)
if err != nil {
return fmt.Errorf("failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)
}

portInt, _ := strconv.ParseInt(portStr, 10, 32)
opts.BroadcastHTTPPort = int(portInt)
}

if opts.BroadcastTCPPort == 0 {
_, portStr, err := net.SplitHostPort(opts.TCPAddress)
if err != nil {
return fmt.Errorf("failed to parse TCP address (%s) - %s", opts.TCPAddress, err)
}

portInt, _ := strconv.ParseInt(portStr, 10, 32)
opts.BroadcastTCPPort = int(portInt)
}

return nil
}

0 comments on commit fa10dc1

Please sign in to comment.