Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
xep0198: removed location option
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman committed May 2, 2022
1 parent f665e2f commit e88c4c3
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 161 deletions.
44 changes: 18 additions & 26 deletions pkg/c2s/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ func (a *authState) reset() {
}

type inC2S struct {
ctx context.Context
cancelFn context.CancelFunc
id stream.C2SID
cfg inCfg
tr transport.Transport
Expand Down Expand Up @@ -130,7 +128,6 @@ type inC2S struct {
}

func newInC2S(
ctx context.Context,
cfg inCfg,
tr transport.Transport,
authenticators []auth.Authenticator,
Expand Down Expand Up @@ -162,29 +159,25 @@ func newInC2S(
},
sLogger,
)
ctx, cancelFn := context.WithCancel(ctx)

// init stream
stm := &inC2S{
ctx: ctx,
cancelFn: cancelFn,
id: id,
cfg: cfg,
tr: tr,
inf: c2smodel.NewInfoMap(),
session: session,
authSt: authState{authenticators: authenticators},
hosts: hosts,
router: router,
comps: comps,
mods: mods,
resMng: resMng,
shapers: shapers,
rq: runqueue.New(id.String()),
doneCh: make(chan struct{}),
state: inConnecting,
hk: hk,
logger: sLogger,
id: id,
cfg: cfg,
tr: tr,
inf: c2smodel.NewInfoMap(),
session: session,
authSt: authState{authenticators: authenticators},
hosts: hosts,
router: router,
comps: comps,
mods: mods,
resMng: resMng,
shapers: shapers,
rq: runqueue.New(id.String()),
doneCh: make(chan struct{}),
state: inConnecting,
hk: hk,
logger: sLogger,
}
if cfg.useTLS {
stm.flags.setSecured() // stream already secured
Expand Down Expand Up @@ -1144,7 +1137,6 @@ func (s *inC2S) terminate(ctx context.Context) error {
if err != nil {
return err
}
s.cancelFn()
close(s.doneCh) // signal termination

s.setState(inTerminated)
Expand Down Expand Up @@ -1224,7 +1216,7 @@ func (s *inC2S) runHook(ctx context.Context, hookName string, inf *hook.C2SStrea
}

func (s *inC2S) requestContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(s.ctx, s.cfg.reqTimeout)
return context.WithTimeout(context.Background(), s.cfg.reqTimeout)
}

var currentID uint64
Expand Down
30 changes: 11 additions & 19 deletions pkg/c2s/in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ func TestInC2S_SendElement(t *testing.T) {
return nil
}
s := &inC2S{
ctx: context.Background(),
cancelFn: func() {},
session: sessMock,
rq: runqueue.New("in_c2s:test"),
hk: hook.NewHooks(),
session: sessMock,
rq: runqueue.New("in_c2s:test"),
hk: hook.NewHooks(),
}
// when
stanza := stravaganza.NewBuilder("auth").
Expand Down Expand Up @@ -113,16 +111,14 @@ func TestInC2S_Disconnect(t *testing.T) {
return c2sRouterMock
}
s := &inC2S{
ctx: context.Background(),
cancelFn: func() {},
state: inBinded,
session: sessMock,
tr: trMock,
router: routerMock,
resMng: rmMock,
rq: runqueue.New("in_c2s:test"),
doneCh: make(chan struct{}),
hk: hook.NewHooks(),
state: inBinded,
session: sessMock,
tr: trMock,
router: routerMock,
resMng: rmMock,
rq: runqueue.New("in_c2s:test"),
doneCh: make(chan struct{}),
hk: hook.NewHooks(),
}
// when
s.Disconnect(streamerror.E(streamerror.SystemShutdown))
Expand Down Expand Up @@ -725,8 +721,6 @@ func TestInC2S_HandleSessionElement(t *testing.T) {

userJID, _ := jid.NewWithString("ortuman@localhost", true)
stm := &inC2S{
ctx: context.Background(),
cancelFn: func() {},
cfg: inCfg{
reqTimeout: time.Minute,
maxStanzaSize: 8192,
Expand Down Expand Up @@ -825,8 +819,6 @@ func TestInC2S_HandleSessionError(t *testing.T) {
}

stm := &inC2S{
ctx: context.Background(),
cancelFn: func() {},
cfg: inCfg{
reqTimeout: time.Minute,
maxStanzaSize: 8192,
Expand Down
5 changes: 0 additions & 5 deletions pkg/c2s/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"sync/atomic"
"time"

contextutil "github.com/ortuman/jackal/pkg/util/context"

kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/ortuman/jackal/pkg/auth"
Expand Down Expand Up @@ -63,7 +61,6 @@ var resConflictMap = map[string]resourceConflict{

// SocketListener represents a C2S socket listener type.
type SocketListener struct {
ctx context.Context
cfg ListenerConfig
extAuth *auth.External
hosts *host.Hosts
Expand Down Expand Up @@ -139,7 +136,6 @@ func newSocketListener(
)
}
ln := &SocketListener{
ctx: contextutil.InjectListenerPort(context.Background(), cfg.Port),
cfg: cfg,
extAuth: extAuth,
hosts: hosts,
Expand Down Expand Up @@ -225,7 +221,6 @@ func (l *SocketListener) Stop(ctx context.Context) error {
func (l *SocketListener) handleConn(conn net.Conn) {
tr := transport.NewSocketTransport(conn, l.cfg.ConnectTimeout, l.cfg.KeepAliveTimeout)
stm, err := newInC2S(
l.ctx,
l.getInConfig(),
tr,
l.getAuthenticators(tr),
Expand Down
45 changes: 2 additions & 43 deletions pkg/cluster/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,25 @@
package instance

import (
"errors"
"net"
"os"

"github.com/google/uuid"
)

const (
envInstanceID = "JACKAL_INSTANCE_ID"
envHostName = "JACKAL_HOSTNAME"
)

var (
instID, hostIP string
instID string
)

var (
readCachedResults = true
interfaceAddresses = net.InterfaceAddrs
readCachedResults = true
)

func init() {
instID = getID()
hostIP = getHostname()
}

// ID returns local instance identifier.
Expand All @@ -49,46 +44,10 @@ func ID() string {
return getID()
}

// Hostname returns local instance host name.
func Hostname() string {
if readCachedResults {
return hostIP
}
return getHostname()
}

func getID() string {
id := os.Getenv(envInstanceID)
if len(id) == 0 {
return uuid.New().String() // if unspecified, assign UUID identifier
}
return id
}

func getHostname() string {
fqdn := os.Getenv(envHostName)
if len(fqdn) > 0 {
return fqdn
}
hn, err := getLocalHostname()
if err == nil && len(hn) > 0 {
return hn
}
return "localhost" // fallback to 'localhost' ip
}

func getLocalHostname() (string, error) {
addresses, err := interfaceAddresses()
if err != nil {
return "", err
}

for _, addr := range addresses {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", errors.New("instance: failed to get local ip")
}
58 changes: 0 additions & 58 deletions pkg/cluster/instance/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,12 @@
package instance

import (
"errors"
"net"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func init() {
interfaceAddresses = func() ([]net.Addr, error) {
return []net.Addr{&net.IPNet{
IP: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 0, 13},
Mask: []byte{255, 255, 255, 0},
}}, nil
}
}

func TestOsEnvironmentIdentifier(t *testing.T) {
// given
readCachedResults = false
Expand All @@ -57,50 +46,3 @@ func TestRandomIdentifier(t *testing.T) {
// then
require.True(t, len(id) > 0)
}

func TestFQDNHostname(t *testing.T) {
// given
_ = os.Setenv(envHostName, "xmpp1.jackal.im")
readCachedResults = false

// when
hn := Hostname()

// then
require.Equal(t, "xmpp1.jackal.im", hn)
}

func TestIPHostname(t *testing.T) {
// given
_ = os.Setenv(envHostName, "")

interfaceAddresses = func() ([]net.Addr, error) {
return []net.Addr{&net.IPNet{
IP: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 0, 13},
Mask: []byte{255, 255, 255, 0},
}}, nil
}
readCachedResults = false

// when
hn := Hostname()

// then
require.Equal(t, "192.168.0.13", hn)
}

func TestFallbackHostname(t *testing.T) {
// given
_ = os.Setenv(envHostName, "")

interfaceAddresses = func() ([]net.Addr, error) {
return nil, errors.New("foo error")
}
readCachedResults = false

// when
hn := Hostname()

// then
require.Equal(t, "localhost", hn)
}
23 changes: 22 additions & 1 deletion pkg/cluster/memberlist/kv_memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package memberlist

import (
"context"
"errors"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -194,9 +195,13 @@ func (ml *KVMemberList) getMembers(ctx context.Context) ([]clustermodel.Member,
}

func (ml *KVMemberList) getLocalMember() (*clustermodel.Member, error) {
hostIP, err := getHostIP()
if err != nil {
return nil, err
}
return &clustermodel.Member{
InstanceID: instance.ID(),
Host: instance.Hostname(),
Host: hostIP,
Port: ml.localPort,
APIVer: version.ClusterAPIVersion,
}, nil
Expand Down Expand Up @@ -276,3 +281,19 @@ func localMemberKey() string {
func isLocalMemberKey(k string) bool {
return k == localMemberKey()
}

func getHostIP() (string, error) {
addresses, err := net.InterfaceAddrs()
if err != nil {
return "", err
}

for _, addr := range addresses {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", errors.New("instance: failed to get local ip")
}
4 changes: 2 additions & 2 deletions pkg/cluster/memberlist/kv_memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestMemberList_Join(t *testing.T) {
}
kvMock.GetPrefixFunc = func(ctx context.Context, prefix string) (map[string][]byte, error) {
return map[string][]byte{
fmt.Sprintf("i://%s", instance.ID()): []byte(fmt.Sprintf("a=%s:4312 cv=v1.0.0", instance.Hostname())),
fmt.Sprintf("i://%s", instance.ID()): []byte(fmt.Sprintf("a=%s:4312 cv=v1.0.0", "10.106.0.5")),
"i://b3fd": []byte("a=192.168.0.12:1456 cv=v1.0.0"),
}, nil
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestMemberList_WatchChanges(t *testing.T) {
}
kvMock.GetPrefixFunc = func(ctx context.Context, prefix string) (map[string][]byte, error) {
return map[string][]byte{
fmt.Sprintf("i://%s", instance.ID()): []byte(fmt.Sprintf("a=%s:4312 cv=v1.0.0", instance.Hostname())),
fmt.Sprintf("i://%s", instance.ID()): []byte(fmt.Sprintf("a=%s:4312 cv=v1.0.0", "10.106.0.5")),
"i://b3fd": []byte("a=192.168.0.12:1456 cv=v1.0.0"),
}, nil
}
Expand Down
Loading

0 comments on commit e88c4c3

Please sign in to comment.