From 9384c18a325f61c130fa52bcb4803f9a77ea6a1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Fri, 24 Dec 2021 13:57:34 +0100 Subject: [PATCH] Configuration refactoring (#182) --- README.md | 2 +- cmd/jackal/config.go | 123 -------- cmd/jackal/listener.go | 130 -------- cmd/jackal/main.go | 5 +- config/docker-compose.config.yaml | 18 +- config/example.config.yaml | 78 +++-- go.mod | 2 + go.sum | 1 + pkg/c2s/config.go | 76 ++--- pkg/c2s/in.go | 48 ++- pkg/c2s/in_test.go | 20 +- pkg/c2s/router_test.go | 3 +- pkg/c2s/socket_listener.go | 155 +++++++--- pkg/c2s/socket_listener_test.go | 2 +- pkg/cluster/server/server.go | 3 +- pkg/component/xep0114/config.go | 45 +++ pkg/component/xep0114/in.go | 22 +- pkg/component/xep0114/in_test.go | 18 +- pkg/component/xep0114/socket_listener.go | 76 ++--- pkg/component/xep0114/socket_listener_test.go | 4 +- pkg/host/hosts.go | 11 +- pkg/jackal/config.go | 112 +++++++ {cmd => pkg}/jackal/http_server.go | 4 +- cmd/jackal/app.go => pkg/jackal/jackal.go | 280 ++++++++++-------- {cmd => pkg}/jackal/modules.go | 58 ++-- pkg/module/roster/roster.go | 3 +- pkg/module/xep0191/blocklist.go | 3 +- pkg/module/xep0280/carbons.go | 3 +- pkg/s2s/config.go | 45 ++- pkg/s2s/in.go | 25 +- pkg/s2s/in_test.go | 20 +- pkg/s2s/interface.go | 1 + pkg/s2s/out.go | 30 +- pkg/s2s/out_provider.go | 31 +- pkg/s2s/out_test.go | 24 +- pkg/s2s/socket_listener.go | 83 ++++-- pkg/s2s/socket_listener_test.go | 2 +- pkg/version/version.go | 2 +- 38 files changed, 858 insertions(+), 710 deletions(-) delete mode 100644 cmd/jackal/config.go delete mode 100644 cmd/jackal/listener.go create mode 100644 pkg/component/xep0114/config.go create mode 100644 pkg/jackal/config.go rename {cmd => pkg}/jackal/http_server.go (97%) rename cmd/jackal/app.go => pkg/jackal/jackal.go (60%) rename {cmd => pkg}/jackal/modules.go (59%) diff --git a/README.md b/README.md index 7c438144b..4f95643fd 100644 --- a/README.md +++ b/README.md @@ -160,7 +160,7 @@ docker run --name=jackal \ Alternatively, and with the purpose of facilitating service mounting, you can make use of `docker-compose` as follows: ```sh -docker-compose -f dockerfiles/docker-compose.yml up +make dockerimage && docker-compose -f dockerfiles/docker-compose.yml up ``` This command will spin up a `jackal` server along with its dependencies on a docker network and start listening for incoming connections on port `5222`. diff --git a/cmd/jackal/config.go b/cmd/jackal/config.go deleted file mode 100644 index f1496fc4b..000000000 --- a/cmd/jackal/config.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2020 The jackal Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "path/filepath" - "time" - - "github.com/ortuman/jackal/pkg/module/offline" - - "github.com/ortuman/jackal/pkg/module/xep0092" - - "github.com/ortuman/jackal/pkg/module/xep0198" - - "github.com/ortuman/jackal/pkg/module/xep0199" - - "github.com/ortuman/jackal/pkg/shaper" - - "github.com/kkyr/fig" - adminserver "github.com/ortuman/jackal/pkg/admin/server" - "github.com/ortuman/jackal/pkg/auth/pepper" - "github.com/ortuman/jackal/pkg/cluster/etcd" - clusterserver "github.com/ortuman/jackal/pkg/cluster/server" - "github.com/ortuman/jackal/pkg/host" - "github.com/ortuman/jackal/pkg/storage" -) - -type listenerConfig struct { - Type string `fig:"type" default:"c2s"` - BindAddr string `fig:"bind_addr"` - Port int `fig:"port" default:"5222"` - Transport string `fig:"transport" default:"socket"` - DirectTLS bool `fig:"direct_tls"` - SASL struct { - Mechanisms []string `fig:"mechanisms" default:"[scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512]"` - External struct { - Address string `fig:"address"` - IsSecure bool `fig:"is_secure"` - } `fig:"external"` - } `fig:"sasl"` - CompressionLevel string `fig:"compression_level" default:"default"` - ResourceConflict string `fig:"resource_conflict" default:"terminate_old"` - MaxStanzaSize int `fig:"max_stanza_size" default:"32768"` - Secret string `fig:"secret"` - ConnectTimeout time.Duration `fig:"conn_timeout" default:"3s"` - AuthenticateTimeout time.Duration `fig:"auth_timeout" default:"10s"` - KeepAliveTimeout time.Duration `fig:"keep_alive_timeout" default:"10m"` - RequestTimeout time.Duration `fig:"req_timeout" default:"15s"` -} - -type s2sOutConfig struct { - DialTimeout time.Duration `fig:"dial_timeout" default:"5s"` - DialbackSecret string `fig:"secret"` - ConnectTimeout time.Duration `fig:"conn_timeout" default:"3s"` - KeepAliveTimeout time.Duration `fig:"keep_alive_timeout" default:"120s"` - RequestTimeout time.Duration `fig:"req_timeout" default:"15s"` - MaxStanzaSize int `fig:"max_stanza_size" default:"131072"` -} - -type modulesConfig struct { - Enabled []string `fig:"enabled"` - - Offline offline.Config `fig:"offline"` - - // XEP-0092: Software Version - Version xep0092.Config `fig:"version"` - - // XEP-0198: Stream Management - Stream xep0198.Config `fig:"stream"` - - // XEP-0199: XMPP Ping - Ping xep0199.Config `fig:"ping"` -} - -type componentsConfig struct{} - -type serverConfig struct { - Logger struct { - Level string `fig:"level" default:"debug"` - OutputPath string `fig:"output_path"` - } `fig:"logger"` - - Cluster struct { - Etcd etcd.Config `fig:"etcd"` - Server clusterserver.Config `fig:"server"` - } `fig:"cluster"` - - HTTPPort int `fig:"http_port" default:"6060"` - - Peppers pepper.Config `fig:"peppers"` - Admin adminserver.Config `fig:"admin"` - Storage storage.Config `fig:"storage"` - Hosts []host.Config `fig:"hosts"` - Shapers []shaper.Config `fig:"shapers"` - Listeners []listenerConfig `fig:"listeners"` - S2SOut s2sOutConfig `fig:"s2s_out"` - Modules modulesConfig `fig:"modules"` - Components componentsConfig `fig:"components"` -} - -func loadConfig(configFile string) (*serverConfig, error) { - var cfg serverConfig - file := filepath.Base(configFile) - dir := filepath.Dir(configFile) - - err := fig.Load(&cfg, fig.File(file), fig.Dirs(dir)) - if err != nil { - return nil, err - } - return &cfg, nil -} diff --git a/cmd/jackal/listener.go b/cmd/jackal/listener.go deleted file mode 100644 index d94bbedcf..000000000 --- a/cmd/jackal/listener.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2020 The jackal Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "crypto/tls" - - "github.com/ortuman/jackal/pkg/auth" - "github.com/ortuman/jackal/pkg/c2s" - "github.com/ortuman/jackal/pkg/component/xep0114" - "github.com/ortuman/jackal/pkg/s2s" - "github.com/ortuman/jackal/pkg/transport/compress" -) - -const ( - c2sListener = "c2s" - s2sListener = "s2s" - componentListener = "component" -) - -var cmpLevelMap = map[string]compress.Level{ - "default": compress.DefaultCompression, - "best": compress.BestCompression, - "speed": compress.SpeedCompression, -} - -var resConflictMap = map[string]c2s.ResourceConflict{ - "override": c2s.Override, - "disallow": c2s.Disallow, - "terminate_old": c2s.TerminateOld, -} - -var lnFns = map[string]func(a *serverApp, cfg listenerConfig) startStopper{ - c2sListener: func(a *serverApp, cfg listenerConfig) startStopper { - var extAuth *auth.External - if len(cfg.SASL.External.Address) > 0 { - extAuth = auth.NewExternal( - cfg.SASL.External.Address, - cfg.SASL.External.IsSecure, - ) - } - return c2s.NewSocketListener( - cfg.BindAddr, - cfg.Port, - cfg.SASL.Mechanisms, - extAuth, - a.hosts, - a.router, - a.comps, - a.mods, - a.resMng, - a.rep, - a.peppers, - a.shapers, - a.hk, - c2s.Config{ - ConnectTimeout: cfg.ConnectTimeout, - AuthenticateTimeout: cfg.AuthenticateTimeout, - KeepAliveTimeout: cfg.KeepAliveTimeout, - RequestTimeout: cfg.RequestTimeout, - MaxStanzaSize: cfg.MaxStanzaSize, - CompressionLevel: cmpLevelMap[cfg.CompressionLevel], - ResourceConflict: resConflictMap[cfg.ResourceConflict], - UseTLS: cfg.DirectTLS, - TLSConfig: &tls.Config{ - Certificates: a.hosts.Certificates(), - MinVersion: tls.VersionTLS12, - }, - }, - ) - }, - s2sListener: func(a *serverApp, cfg listenerConfig) startStopper { - return s2s.NewSocketListener( - cfg.BindAddr, - cfg.Port, - a.hosts, - a.router, - a.comps, - a.mods, - a.s2sOutProvider, - a.s2sInHub, - a.kv, - a.shapers, - a.hk, - s2s.Config{ - ConnectTimeout: cfg.ConnectTimeout, - KeepAliveTimeout: cfg.KeepAliveTimeout, - RequestTimeout: cfg.RequestTimeout, - MaxStanzaSize: cfg.MaxStanzaSize, - DirectTLS: cfg.DirectTLS, - TLSConfig: &tls.Config{ - Certificates: a.hosts.Certificates(), - ClientAuth: tls.RequireAndVerifyClientCert, - MinVersion: tls.VersionTLS12, - }, - }, - ) - }, - componentListener: func(a *serverApp, cfg listenerConfig) startStopper { - return xep0114.NewSocketListener( - cfg.BindAddr, - cfg.Port, - a.hosts, - a.comps, - a.extCompMng, - a.router, - a.shapers, - a.hk, - xep0114.Config{ - ConnectTimeout: cfg.ConnectTimeout, - KeepAliveTimeout: cfg.KeepAliveTimeout, - RequestTimeout: cfg.RequestTimeout, - MaxStanzaSize: cfg.MaxStanzaSize, - Secret: cfg.Secret, - }, - ) - }, -} diff --git a/cmd/jackal/main.go b/cmd/jackal/main.go index 3b382318e..fb270f756 100644 --- a/cmd/jackal/main.go +++ b/cmd/jackal/main.go @@ -17,10 +17,13 @@ package main import ( "log" "os" + + "github.com/ortuman/jackal/pkg/jackal" ) func main() { - if err := run(os.Stdout, os.Args); err != nil { + j := jackal.New(os.Stdout, os.Args) + if err := j.Run(); err != nil { log.Fatal(err) } } diff --git a/config/docker-compose.config.yaml b/config/docker-compose.config.yaml index 4f6137dbb..cfcba86da 100644 --- a/config/docker-compose.config.yaml +++ b/config/docker-compose.config.yaml @@ -25,14 +25,10 @@ cluster: endpoints: - 'http://etcd:2379' -listeners: - - type: c2s - port: 5222 - req_timeout: 60s - transport: socket - sasl: - mechanisms: - - scram_sha_1 - - scram_sha_256 - - scram_sha_512 - - scram_sha3_512 +c2s: + listeners: + - port: 5222 + req_timeout: 60s + transport: socket + sasl: + mechanisms: [scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512] diff --git a/config/example.config.yaml b/config/example.config.yaml index 3d4b42e6f..df002539e 100644 --- a/config/example.config.yaml +++ b/config/example.config.yaml @@ -56,49 +56,44 @@ shapers: limit: 65536 burst: 32768 -listeners: - - type: c2s - port: 5222 - req_timeout: 60s - transport: socket - sasl: - mechanisms: [scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512] - - # Authentication gateway - # (proto: https://github.com/jackal-xmpp/jackal-proto/blob/master/jackal/proto/authenticator/v1/authenticator.proto) - external: - address: 127.0.0.1:4567 - is_secure: false - - - type: c2s - port: 5223 - direct_tls: true - req_timeout: 60s - transport: socket - sasl: - mechanisms: [scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512] - - - type: s2s - port: 5269 - req_timeout: 60s - max_stanza_size: 131072 - - - type: s2s - port: 5270 - direct_tls: true +c2s: + listeners: + - port: 5222 + req_timeout: 60s + transport: socket + sasl: + mechanisms: [scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512] + + # Authentication gateway + # (proto: https://github.com/jackal-xmpp/jackal-proto/blob/master/jackal/proto/authenticator/v1/authenticator.proto) + external: + address: 127.0.0.1:4567 + is_secure: false + + - port: 5223 + direct_tls: true + req_timeout: 60s + transport: socket + sasl: + mechanisms: [scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512] + +s2s: + listeners: + - port: 5269 + req_timeout: 60s + max_stanza_size: 131072 + + - port: 5270 + direct_tls: true + req_timeout: 60s + max_stanza_size: 131072 + + out: + dialback_secret: s3cr3tf0rd14lb4ck + dial_timeout: 5s req_timeout: 60s max_stanza_size: 131072 - - type: component - port: 5275 - secret: hqcUrfHtgE73FktcXwfrP - -s2s_out: - dialback_secret: s3cr3tf0rd14lb4ck - dial_timeout: 5s - req_timeout: 60s - max_stanza_size: 131072 - modules: # enabled: # - roster @@ -128,4 +123,7 @@ modules: # timeout_action: kill components: + listeners: + - port: 5275 + secret: hqcUrfHtgE73FktcXwfrP diff --git a/go.mod b/go.mod index 9e5b53259..630b2dd27 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,8 @@ require ( sigs.k8s.io/yaml v1.2.0 // indirect ) +require golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect diff --git a/go.sum b/go.sum index f73e5bb3e..4d9cc1c54 100644 --- a/go.sum +++ b/go.sum @@ -458,6 +458,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/c2s/config.go b/pkg/c2s/config.go index 77f960430..8aff394f5 100644 --- a/pkg/c2s/config.go +++ b/pkg/c2s/config.go @@ -14,56 +14,58 @@ package c2s -import ( - "crypto/tls" - "time" +import "time" - "github.com/ortuman/jackal/pkg/transport/compress" -) +// ListenersConfig defines a set of C2S listener configurations. +type ListenersConfig []ListenerConfig -// ResourceConflict represents a resource conflict policy. -type ResourceConflict int8 +// ListenerConfig contains a C2S listener configuration. +type ListenerConfig struct { + // BindAddr defines listener incoming connections address. + BindAddr string `fig:"bind_addr"` -const ( - // Override represents 'override' resource conflict policy. - Override ResourceConflict = iota + // Port defines listener incoming connections port. + Port int `fig:"port" default:"5222"` - // Disallow represents 'disallow' resource conflict policy. - Disallow + // Transport specifies the type of transport used for incoming connections. + Transport string `fig:"transport" default:"socket"` - // TerminateOld represents 'terminate_old' resource conflict policy. - TerminateOld -) + // DirectTLS, if true, tls.Listen will be used as network listener. + DirectTLS bool `fig:"direct_tls"` -// Config defines C2S connection configuration. -type Config struct { - // ConnectTimeout defines connection timeout. - ConnectTimeout time.Duration - - // AuthenticateTimeout defines authentication timeout. - AuthenticateTimeout time.Duration + // SASL contains authentication related configuration. + SASL struct { + // Mechanisms contains enabled SASL mechanisms. + Mechanisms []string `fig:"mechanisms" default:"[scram_sha_1, scram_sha_256, scram_sha_512, scram_sha3_512]"` - // KeepAliveTimeout defines the maximum amount of time that an inactive connection - // would be considered alive. - KeepAliveTimeout time.Duration - - // RequestTimeout defines C2S stream request timeout. - RequestTimeout time.Duration - - // MaxStanzaSize is the maximum size a listener incoming stanza may have. - MaxStanzaSize int + // External contains external authenticator configuration. + External struct { + Address string `fig:"address"` + IsSecure bool `fig:"is_secure"` + } `fig:"external"` + } `fig:"sasl"` // CompressionLevel is the compression level that may be applied to the stream. // Valid values are 'default', 'best', 'speed' and 'no_compression'. - CompressionLevel compress.Level + CompressionLevel string `fig:"compression_level" default:"default"` // ResourceConflict defines the which rule should be applied in a resource conflict is detected. // Valid values are `override`, `disallow` and `terminate_old`. - ResourceConflict ResourceConflict + ResourceConflict string `fig:"resource_conflict" default:"terminate_old"` - // UseTLS, if true, tls.Listen will be used as network listener. - UseTLS bool + // MaxStanzaSize is the maximum size a listener incoming stanza may have. + MaxStanzaSize int `fig:"max_stanza_size" default:"32768"` + + // ConnectTimeout defines connection timeout. + ConnectTimeout time.Duration `fig:"conn_timeout" default:"3s"` + + // AuthenticateTimeout defines authentication timeout. + AuthenticateTimeout time.Duration `fig:"auth_timeout" default:"10s"` - // TLSConfig contains configuration to be used when TLS listener is enabled. - TLSConfig *tls.Config + // KeepAliveTimeout defines the maximum amount of time that an inactive connection + // would be considered alive. + KeepAliveTimeout time.Duration `fig:"keep_alive_timeout" default:"2m"` + + // RequestTimeout defines C2S stream request timeout. + RequestTimeout time.Duration `fig:"req_timeout" default:"15s"` } diff --git a/pkg/c2s/in.go b/pkg/c2s/in.go index f2e2ac518..561619e4b 100644 --- a/pkg/c2s/in.go +++ b/pkg/c2s/in.go @@ -69,6 +69,26 @@ var ( disconnectTimeout = time.Second * 5 ) +type resourceConflict int8 + +const ( + override resourceConflict = iota + disallow + terminateOld +) + +type inCfg struct { + connectTimeout time.Duration + authenticateTimeout time.Duration + keepAliveTimeout time.Duration + reqTimeout time.Duration + maxStanzaSize int + compressionLevel compress.Level + resConflict resourceConflict + useTLS bool + tlsConfig *tls.Config +} + type authState struct { authenticators []auth.Authenticator active auth.Authenticator @@ -83,7 +103,7 @@ func (a *authState) reset() { type inC2S struct { id stream.C2SID - cfg Config + cfg inCfg tr transport.Transport authSt authState hosts hosts @@ -108,6 +128,7 @@ type inC2S struct { } func newInC2S( + cfg inCfg, tr transport.Transport, authenticators []auth.Authenticator, hosts *host.Hosts, @@ -117,7 +138,6 @@ func newInC2S( resMng *ResourceManager, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, ) (*inC2S, error) { // set default rate limiter rLim := shapers.DefaultC2S().RateLimiter() @@ -132,7 +152,7 @@ func newInC2S( tr, hosts, xmppsession.Config{ - MaxStanzaSize: cfg.MaxStanzaSize, + MaxStanzaSize: cfg.maxStanzaSize, }, ) // init stream @@ -154,7 +174,7 @@ func newInC2S( state: inConnecting, hk: hk, } - if cfg.UseTLS { + if cfg.useTLS { stm.flags.setSecured() // stream already secured } return stm, nil @@ -338,11 +358,11 @@ func (s *inC2S) start() error { func (s *inC2S) readLoop() { s.restartSession() - tm := time.AfterFunc(s.cfg.ConnectTimeout, s.connTimeout) // schedule connect timeout + tm := time.AfterFunc(s.cfg.connectTimeout, s.connTimeout) // schedule connect timeout elem, sErr := s.session.Receive() tm.Stop() - authTm := time.AfterFunc(s.cfg.AuthenticateTimeout, s.connTimeout) // schedule authenticate timeout + authTm := time.AfterFunc(s.cfg.authenticateTimeout, s.connTimeout) // schedule authenticate timeout defer authTm.Stop() for { @@ -358,7 +378,7 @@ func (s *inC2S) readLoop() { s.handleSessionResult(elem, sErr) doRead: - tm := time.AfterFunc(s.cfg.KeepAliveTimeout, s.connTimeout) // schedule read timeout + tm := time.AfterFunc(s.cfg.keepAliveTimeout, s.connTimeout) // schedule read timeout elem, sErr = s.session.Receive() tm.Stop() } @@ -780,7 +800,7 @@ func (s *inC2S) authenticatedFeatures(ctx context.Context) ([]stravaganza.Elemen isSocketTr := s.tr.Type() == transport.Socket // compression feature - compressionAvailable := isSocketTr && s.cfg.CompressionLevel != compress.NoCompression + compressionAvailable := isSocketTr && s.cfg.compressionLevel != compress.NoCompression if !s.flags.isCompressed() && compressionAvailable { compressionElem := stravaganza.NewBuilder("compression"). @@ -948,7 +968,7 @@ func (s *inC2S) compress(ctx context.Context, elem stravaganza.Element) error { return err } // compress transport - s.tr.EnableCompression(s.cfg.CompressionLevel) + s.tr.EnableCompression(s.cfg.compressionLevel) s.flags.setCompressed() log.Infow("Compressed C2S stream", "id", s.id, "username", s.Username()) @@ -986,14 +1006,14 @@ func (s *inC2S) bindResource(ctx context.Context, iq *stravaganza.IQ) error { if rs.JID.Resource() != res { continue } - switch s.cfg.ResourceConflict { + switch s.cfg.resConflict { // replace by a server generated resourcepart - case Override: + case override: res = uuid.New().String() break // disconnect previously connected resource - case TerminateOld: + case terminateOld: se := streamerror.E(streamerror.PolicyViolation) se.ApplicationElement = stravaganza.NewBuilder("resource-conflict"). WithAttribute(stravaganza.Namespace, "urn:xmpp:errors"). @@ -1004,7 +1024,7 @@ func (s *inC2S) bindResource(ctx context.Context, iq *stravaganza.IQ) error { break // disallow resource binding - case Disallow: + case disallow: return s.sendElement(ctx, stanzaerror.E(stanzaerror.Conflict, iq).Element()) } break @@ -1213,7 +1233,7 @@ func (s *inC2S) runHook(ctx context.Context, hookName string, inf *hook.C2SStrea } func (s *inC2S) requestContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), s.cfg.RequestTimeout) + return context.WithTimeout(context.Background(), s.cfg.reqTimeout) } var currentID uint64 diff --git a/pkg/c2s/in_test.go b/pkg/c2s/in_test.go index ce864f91f..bd761f32f 100644 --- a/pkg/c2s/in_test.go +++ b/pkg/c2s/in_test.go @@ -720,12 +720,12 @@ func TestInC2S_HandleSessionElement(t *testing.T) { userJID, _ := jid.NewWithString("ortuman@localhost", true) stm := &inC2S{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, - CompressionLevel: compress.DefaultCompression, - ResourceConflict: Disallow, + cfg: inCfg{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, + compressionLevel: compress.DefaultCompression, + resConflict: disallow, }, state: tt.state, flags: flags{flg: tt.flags}, @@ -817,10 +817,10 @@ func TestInC2S_HandleSessionError(t *testing.T) { } stm := &inC2S{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: inCfg{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, state: tt.state, rq: runqueue.New(tt.name), diff --git a/pkg/c2s/router_test.go b/pkg/c2s/router_test.go index 50769adb6..83033f21d 100644 --- a/pkg/c2s/router_test.go +++ b/pkg/c2s/router_test.go @@ -18,12 +18,11 @@ import ( "context" "testing" - c2smodel "github.com/ortuman/jackal/pkg/model/c2s" - "github.com/jackal-xmpp/stravaganza/v2" "github.com/jackal-xmpp/stravaganza/v2/jid" "github.com/ortuman/jackal/pkg/cluster/instance" "github.com/ortuman/jackal/pkg/hook" + c2smodel "github.com/ortuman/jackal/pkg/model/c2s" "github.com/ortuman/jackal/pkg/router" "github.com/stretchr/testify/suite" ) diff --git a/pkg/c2s/socket_listener.go b/pkg/c2s/socket_listener.go index 76a86c52e..291567745 100644 --- a/pkg/c2s/socket_listener.go +++ b/pkg/c2s/socket_listener.go @@ -34,6 +34,7 @@ import ( "github.com/ortuman/jackal/pkg/shaper" "github.com/ortuman/jackal/pkg/storage/repository" "github.com/ortuman/jackal/pkg/transport" + "github.com/ortuman/jackal/pkg/transport/compress" ) const ( @@ -45,33 +46,73 @@ const ( scramSHA3512Mechanism = "scram_sha3_512" ) +var cmpLevelMap = map[string]compress.Level{ + "default": compress.DefaultCompression, + "best": compress.BestCompression, + "speed": compress.SpeedCompression, +} + +var resConflictMap = map[string]resourceConflict{ + "override": override, + "disallow": disallow, + "terminate_old": terminateOld, +} + // SocketListener represents a C2S socket listener type. type SocketListener struct { - addr string - cfg Config - saslMechanisms []string - extAuth *auth.External - hosts *host.Hosts - router router.Router - comps *component.Components - mods *module.Modules - resMng *ResourceManager - rep repository.Repository - peppers *pepper.Keys - shapers shaper.Shapers - hk *hook.Hooks - connHandlerFn func(conn net.Conn) + cfg ListenerConfig + extAuth *auth.External + hosts *host.Hosts + router router.Router + comps *component.Components + mods *module.Modules + resMng *ResourceManager + rep repository.Repository + peppers *pepper.Keys + shapers shaper.Shapers + hk *hook.Hooks + + tlsCfg *tls.Config + connHandlerFn func(conn net.Conn) ln net.Listener active uint32 } -// NewSocketListener returns a new C2S socket listener. -func NewSocketListener( - bindAddr string, - port int, - saslMechanisms []string, - extAuth *auth.External, +// NewListeners creates and initializes a set of C2S listeners based of cfg configuration. +func NewListeners( + cfg ListenersConfig, + hosts *host.Hosts, + router router.Router, + comps *component.Components, + mods *module.Modules, + resMng *ResourceManager, + rep repository.Repository, + peppers *pepper.Keys, + shapers shaper.Shapers, + hk *hook.Hooks, +) []*SocketListener { + var listeners []*SocketListener + for _, lnCfg := range cfg { + ln := newSocketListener( + lnCfg, + hosts, + router, + comps, + mods, + resMng, + rep, + peppers, + shapers, + hk, + ) + listeners = append(listeners, ln) + } + return listeners +} + +func newSocketListener( + cfg ListenerConfig, hosts *host.Hosts, router router.Router, comps *component.Components, @@ -81,28 +122,32 @@ func NewSocketListener( peppers *pepper.Keys, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, ) *SocketListener { + var extAuth *auth.External + if len(cfg.SASL.External.Address) > 0 { + extAuth = auth.NewExternal( + cfg.SASL.External.Address, + cfg.SASL.External.IsSecure, + ) + } ln := &SocketListener{ - addr: getAddress(bindAddr, port), - saslMechanisms: saslMechanisms, - extAuth: extAuth, - cfg: cfg, - hosts: hosts, - router: router, - comps: comps, - mods: mods, - resMng: resMng, - rep: rep, - peppers: peppers, - shapers: shapers, - hk: hk, + cfg: cfg, + extAuth: extAuth, + hosts: hosts, + router: router, + comps: comps, + mods: mods, + resMng: resMng, + rep: rep, + peppers: peppers, + shapers: shapers, + hk: hk, } ln.connHandlerFn = ln.handleConn return ln } -// Start starts listening on the TCP network address bindAddr to handle incoming C2S connections. +// Start starts listening on a TCP network address to handle incoming C2S connections. func (l *SocketListener) Start(ctx context.Context) error { if l.extAuth != nil { // dial external authenticator @@ -116,12 +161,16 @@ func (l *SocketListener) Start(ctx context.Context) error { lc := net.ListenConfig{ KeepAlive: listenKeepAlive, } - ln, err = lc.Listen(ctx, "tcp", l.addr) + ln, err = lc.Listen(ctx, "tcp", l.getAddress()) if err != nil { return err } - if l.cfg.UseTLS { - ln = tls.NewListener(ln, l.cfg.TLSConfig) + if l.cfg.DirectTLS { + l.tlsCfg = &tls.Config{ + Certificates: l.hosts.Certificates(), + MinVersion: tls.VersionTLS12, + } + ln = tls.NewListener(ln, l.tlsCfg) } l.ln = ln l.active = 1 @@ -133,7 +182,7 @@ func (l *SocketListener) Start(ctx context.Context) error { continue } log.Infow( - fmt.Sprintf("Received C2S incoming connection at %s", l.addr), + fmt.Sprintf("Received C2S incoming connection at %s", l.getAddress()), "remote_address", conn.RemoteAddr().String(), ) @@ -141,8 +190,8 @@ func (l *SocketListener) Start(ctx context.Context) error { } }() log.Infow( - fmt.Sprintf("Accepting C2S socket connections at %s", l.addr), - "direct_tls", l.cfg.UseTLS, + fmt.Sprintf("Accepting C2S socket connections at %s", l.getAddress()), + "direct_tls", l.cfg.DirectTLS, ) return nil } @@ -159,13 +208,14 @@ func (l *SocketListener) Stop(ctx context.Context) error { return err } } - log.Infof("Stopped C2S listener at %s", l.addr) + log.Infof("Stopped C2S listener at %s", l.getAddress()) return nil } func (l *SocketListener) handleConn(conn net.Conn) { tr := transport.NewSocketTransport(conn) stm, err := newInC2S( + l.getInConfig(), tr, l.getAuthenticators(tr), l.hosts, @@ -175,7 +225,6 @@ func (l *SocketListener) handleConn(conn net.Conn) { l.resMng, l.shapers, l.hk, - l.cfg, ) if err != nil { log.Warnf("Failed to initialize C2S stream: %v", err) @@ -193,7 +242,7 @@ func (l *SocketListener) getAuthenticators(tr transport.Transport) []auth.Authen if l.extAuth != nil { res = append(res, l.extAuth) } - for _, mechanism := range l.saslMechanisms { + for _, mechanism := range l.cfg.SASL.Mechanisms { switch mechanism { case scramSHA1Mechanism: res = append(res, auth.NewScram(tr, auth.ScramSHA1, false, l.rep, l.peppers)) @@ -217,6 +266,20 @@ func (l *SocketListener) getAuthenticators(tr transport.Transport) []auth.Authen return res } -func getAddress(bindAddr string, port int) string { - return bindAddr + ":" + strconv.Itoa(port) +func (l *SocketListener) getInConfig() inCfg { + return inCfg{ + connectTimeout: l.cfg.ConnectTimeout, + authenticateTimeout: l.cfg.AuthenticateTimeout, + keepAliveTimeout: l.cfg.KeepAliveTimeout, + reqTimeout: l.cfg.RequestTimeout, + maxStanzaSize: l.cfg.MaxStanzaSize, + compressionLevel: cmpLevelMap[l.cfg.CompressionLevel], + resConflict: resConflictMap[l.cfg.ResourceConflict], + useTLS: l.cfg.DirectTLS, + tlsConfig: l.tlsCfg, + } +} + +func (l *SocketListener) getAddress() string { + return l.cfg.BindAddr + ":" + strconv.Itoa(l.cfg.Port) } diff --git a/pkg/c2s/socket_listener_test.go b/pkg/c2s/socket_listener_test.go index 491c149f6..47dc4c26c 100644 --- a/pkg/c2s/socket_listener_test.go +++ b/pkg/c2s/socket_listener_test.go @@ -29,7 +29,7 @@ func TestSocketListener_Listen(t *testing.T) { var handledConn uint32 s := &SocketListener{ - addr: ":51124", + cfg: ListenerConfig{BindAddr: "", Port: 51124}, connHandlerFn: func(_ net.Conn) { atomic.StoreUint32(&handledConn, 1) }, diff --git a/pkg/cluster/server/server.go b/pkg/cluster/server/server.go index e98f1fea5..bb62bc47a 100644 --- a/pkg/cluster/server/server.go +++ b/pkg/cluster/server/server.go @@ -20,8 +20,9 @@ import ( "strconv" "sync/atomic" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/ortuman/jackal/pkg/c2s" + + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" clusterpb "github.com/ortuman/jackal/pkg/cluster/pb" "github.com/ortuman/jackal/pkg/component" "github.com/ortuman/jackal/pkg/log" diff --git a/pkg/component/xep0114/config.go b/pkg/component/xep0114/config.go new file mode 100644 index 000000000..b7ad7c0cb --- /dev/null +++ b/pkg/component/xep0114/config.go @@ -0,0 +1,45 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xep0114 + +import "time" + +// ListenersConfig defines a set of component listener configurations. +type ListenersConfig []ListenerConfig + +// ListenerConfig defines component connection configuration. +type ListenerConfig struct { + // BindAddr defines listener incoming connections address. + BindAddr string `fig:"bind_addr"` + + // Port defines listener incoming connections port. + Port int `fig:"port" default:"5275"` + + // ConnectTimeout defines connection timeout. + ConnectTimeout time.Duration + + // KeepAliveTimeout defines the maximum amount of time that an inactive connection + // would be considered alive. + KeepAliveTimeout time.Duration + + // RequestTimeout defines component stream request timeout. + RequestTimeout time.Duration + + // MaxStanzaSize is the maximum size a listener incoming stanza may have. + MaxStanzaSize int + + // Secret is the external component's shared secret. + Secret string +} diff --git a/pkg/component/xep0114/in.go b/pkg/component/xep0114/in.go index 82da5cade..a89f88bf8 100644 --- a/pkg/component/xep0114/in.go +++ b/pkg/component/xep0114/in.go @@ -56,9 +56,17 @@ const ( var disconnectTimeout = time.Second * 5 +type inConfig struct { + connectTimeout time.Duration + keepAliveTimeout time.Duration + reqTimeout time.Duration + maxStanzaSize int + secret string +} + type inComponent struct { id inComponentID - cfg Config + cfg inConfig tr transport.Transport shapers shaper.Shapers session session @@ -88,7 +96,7 @@ func newInComponent( router router.Router, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, + cfg inConfig, ) (*inComponent, error) { // set default rate limiter rLim := shapers.DefaultS2S().RateLimiter() @@ -104,7 +112,7 @@ func newInComponent( tr, hosts, xmppsession.Config{ - MaxStanzaSize: cfg.MaxStanzaSize, + MaxStanzaSize: cfg.maxStanzaSize, }, ) // init stream @@ -173,7 +181,7 @@ func (s *inComponent) done() <-chan struct{} { func (s *inComponent) readLoop() { s.restartSession() - tm := time.AfterFunc(s.cfg.ConnectTimeout, s.connTimeout) // schedule connect timeout + tm := time.AfterFunc(s.cfg.connectTimeout, s.connTimeout) // schedule connect timeout elem, sErr := s.session.Receive() tm.Stop() @@ -187,7 +195,7 @@ func (s *inComponent) readLoop() { s.handleSessionResult(elem, sErr) doRead: - tm := time.AfterFunc(s.cfg.KeepAliveTimeout, s.connTimeout) // schedule read timeout + tm := time.AfterFunc(s.cfg.keepAliveTimeout, s.connTimeout) // schedule read timeout elem, sErr = s.session.Receive() tm.Stop() } @@ -282,7 +290,7 @@ func (s *inComponent) handleHandshaking(ctx context.Context, elem stravaganza.El } // compute handshake h := sha1.New() - h.Write([]byte(s.session.StreamID() + s.cfg.Secret)) + h.Write([]byte(s.session.StreamID() + s.cfg.secret)) hs := hex.EncodeToString(h.Sum(nil)) if elem.Text() != hs { @@ -453,7 +461,7 @@ func (s *inComponent) runHook(ctx context.Context, hookName string, inf *hook.Ex } func (s *inComponent) requestContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), s.cfg.RequestTimeout) + return context.WithTimeout(context.Background(), s.cfg.reqTimeout) } var currentID uint64 diff --git a/pkg/component/xep0114/in_test.go b/pkg/component/xep0114/in_test.go index 8eb1df773..98926c636 100644 --- a/pkg/component/xep0114/in_test.go +++ b/pkg/component/xep0114/in_test.go @@ -246,11 +246,11 @@ func TestInComponent_HandleSessionElement(t *testing.T) { } stm := &inComponent{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, - Secret: "a-secret-1", + cfg: inConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, + secret: "a-secret-1", }, state: uint32(tt.state), rq: runqueue.New(tt.name), @@ -342,10 +342,10 @@ func TestInComponent_HandleSessionError(t *testing.T) { } stm := &inComponent{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: inConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, state: uint32(tt.state), rq: runqueue.New(tt.name), diff --git a/pkg/component/xep0114/socket_listener.go b/pkg/component/xep0114/socket_listener.go index eb3247c11..da0555c26 100644 --- a/pkg/component/xep0114/socket_listener.go +++ b/pkg/component/xep0114/socket_listener.go @@ -36,29 +36,9 @@ const ( listenKeepAlive = time.Second * 15 ) -// Config defines component connection configuration. -type Config struct { - // ConnectTimeout defines connection timeout. - ConnectTimeout time.Duration - - // KeepAliveTimeout defines the maximum amount of time that an inactive connection - // would be considered alive. - KeepAliveTimeout time.Duration - - // RequestTimeout defines component stream request timeout. - RequestTimeout time.Duration - - // MaxStanzaSize is the maximum size a listener incoming stanza may have. - MaxStanzaSize int - - // Secret is the external component shared secret. - Secret string -} - // SocketListener represents a component socket listener type. type SocketListener struct { - addr string - cfg Config + cfg ListenerConfig hosts *host.Hosts comps *component.Components router router.Router @@ -72,20 +52,42 @@ type SocketListener struct { active uint32 } -// NewSocketListener returns a new external component socket listener. -func NewSocketListener( - bindAddr string, - port int, +// NewListeners creates and initializes a set of component listeners based of cfg configuration. +func NewListeners( + cfg ListenersConfig, + hosts *host.Hosts, + comps *component.Components, + extCompMng *extcomponentmanager.Manager, + router router.Router, + shapers shaper.Shapers, + hk *hook.Hooks, +) []*SocketListener { + var listeners []*SocketListener + for _, lnCfg := range cfg { + ln := newSocketListener( + lnCfg, + hosts, + comps, + extCompMng, + router, + shapers, + hk, + ) + listeners = append(listeners, ln) + } + return listeners +} + +func newSocketListener( + cfg ListenerConfig, hosts *host.Hosts, comps *component.Components, extCompMng *extcomponentmanager.Manager, router router.Router, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, ) *SocketListener { ln := &SocketListener{ - addr: getAddress(bindAddr, port), hosts: hosts, comps: comps, router: router, @@ -106,7 +108,7 @@ func (l *SocketListener) Start(ctx context.Context) error { lc := net.ListenConfig{ KeepAlive: listenKeepAlive, } - ln, err := lc.Listen(ctx, "tcp", l.addr) + ln, err := lc.Listen(ctx, "tcp", l.getAddress()) if err != nil { return err } @@ -120,13 +122,13 @@ func (l *SocketListener) Start(ctx context.Context) error { continue } log.Infow( - fmt.Sprintf("Received component incoming connection at %s", l.addr), + fmt.Sprintf("Received component incoming connection at %s", l.getAddress()), "remote_address", conn.RemoteAddr().String(), ) go l.connHandlerFn(conn) } }() - log.Infof("Accepting external component connections at %s", l.addr) + log.Infof("Accepting external component connections at %s", l.getAddress()) return nil } @@ -138,7 +140,7 @@ func (l *SocketListener) Stop(ctx context.Context) error { } l.stmHub.stop(ctx) - log.Infof("Stopped external component listener at %s", l.addr) + log.Infof("Stopped external component listener at %s", l.getAddress()) return nil } @@ -153,7 +155,13 @@ func (l *SocketListener) handleConn(conn net.Conn) { l.router, l.shapers, l.hk, - l.cfg, + inConfig{ + connectTimeout: l.cfg.ConnectTimeout, + keepAliveTimeout: l.cfg.KeepAliveTimeout, + reqTimeout: l.cfg.RequestTimeout, + maxStanzaSize: l.cfg.MaxStanzaSize, + secret: l.cfg.Secret, + }, ) if err != nil { log.Warnf("Failed to initialize component stream: %v", err) @@ -166,6 +174,6 @@ func (l *SocketListener) handleConn(conn net.Conn) { } } -func getAddress(bindAddr string, port int) string { - return bindAddr + ":" + strconv.Itoa(port) +func (l *SocketListener) getAddress() string { + return l.cfg.BindAddr + ":" + strconv.Itoa(l.cfg.Port) } diff --git a/pkg/component/xep0114/socket_listener_test.go b/pkg/component/xep0114/socket_listener_test.go index f0e124c99..a6344dd87 100644 --- a/pkg/component/xep0114/socket_listener_test.go +++ b/pkg/component/xep0114/socket_listener_test.go @@ -29,7 +29,7 @@ func TestSocketListener_Listen(t *testing.T) { var handledConn uint32 s := &SocketListener{ - addr: ":51123", + cfg: ListenerConfig{Port: 51126}, stmHub: newInHub(), connHandlerFn: func(_ net.Conn) { atomic.StoreUint32(&handledConn, 1) @@ -40,7 +40,7 @@ func TestSocketListener_Listen(t *testing.T) { err := s.Start(context.Background()) require.Nil(t, err) - _, err = net.Dial("tcp", ":51123") + _, err = net.Dial("tcp", ":51126") require.Nil(t, err) time.Sleep(time.Second) // wait to accept diff --git a/pkg/host/hosts.go b/pkg/host/hosts.go index 00ead6a39..e3000f628 100644 --- a/pkg/host/hosts.go +++ b/pkg/host/hosts.go @@ -31,6 +31,9 @@ type Hosts struct { hosts map[string]tls.Certificate } +// Configs contains a set of host configurations. +type Configs []Config + // Config contains host configuration parameters. type Config struct { Domain string `fig:"domain"` @@ -40,12 +43,12 @@ type Config struct { } `fig:"tls"` } -// NewHost creates and initializes a Hosts instance. -func NewHost(configs []Config) (*Hosts, error) { +// NewHosts creates and initializes a Hosts instance. +func NewHosts(cfg Configs) (*Hosts, error) { hs := &Hosts{ hosts: make(map[string]tls.Certificate), } - if len(configs) == 0 { + if len(cfg) == 0 { cer, err := tlsutil.LoadCertificate("", "", defaultDomain) if err != nil { return nil, err @@ -53,7 +56,7 @@ func NewHost(configs []Config) (*Hosts, error) { hs.RegisterDefaultHost(defaultDomain, cer) return hs, nil } - for i, config := range configs { + for i, config := range cfg { cer, err := tlsutil.LoadCertificate(config.TLS.PrivateKeyFile, config.TLS.CertFile, config.Domain) if err != nil { return nil, err diff --git a/pkg/jackal/config.go b/pkg/jackal/config.go new file mode 100644 index 000000000..efccda662 --- /dev/null +++ b/pkg/jackal/config.go @@ -0,0 +1,112 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jackal + +import ( + "path/filepath" + + "github.com/kkyr/fig" + adminserver "github.com/ortuman/jackal/pkg/admin/server" + "github.com/ortuman/jackal/pkg/auth/pepper" + "github.com/ortuman/jackal/pkg/c2s" + "github.com/ortuman/jackal/pkg/cluster/etcd" + clusterserver "github.com/ortuman/jackal/pkg/cluster/server" + "github.com/ortuman/jackal/pkg/component/xep0114" + "github.com/ortuman/jackal/pkg/host" + "github.com/ortuman/jackal/pkg/module/offline" + "github.com/ortuman/jackal/pkg/module/xep0092" + "github.com/ortuman/jackal/pkg/module/xep0198" + "github.com/ortuman/jackal/pkg/module/xep0199" + "github.com/ortuman/jackal/pkg/s2s" + "github.com/ortuman/jackal/pkg/shaper" + "github.com/ortuman/jackal/pkg/storage" +) + +// LoggerConfig defines logger configuration. +type LoggerConfig struct { + Level string `fig:"level" default:"debug"` + OutputPath string `fig:"output_path"` +} + +// ClusterConfig defines cluster configuration. +type ClusterConfig struct { + Etcd etcd.Config `fig:"etcd"` + Server clusterserver.Config `fig:"server"` +} + +// C2SConfig defines C2S subsystem configuration. +type C2SConfig struct { + Listeners c2s.ListenersConfig `fig:"listeners"` +} + +// S2SConfig defines S2S subsystem configuration. +type S2SConfig struct { + Listeners s2s.ListenersConfig `fig:"listeners"` + Out s2s.OutConfig `fig:"out"` +} + +// ComponentsConfig defines application components configuration. +type ComponentsConfig struct { + Listeners xep0114.ListenersConfig `fig:"listeners"` +} + +// ModulesConfig defines application modules configuration. +type ModulesConfig struct { + // Enabled specifies total set of enabled modules + Enabled []string `fig:"enabled"` + + // Offline: offline storage + Offline offline.Config `fig:"offline"` + + // XEP-0092: Software Version + Version xep0092.Config `fig:"version"` + + // XEP-0198: Stream Management + Stream xep0198.Config `fig:"stream"` + + // XEP-0199: XMPP Ping + Ping xep0199.Config `fig:"ping"` +} + +// Config defines jackal application configuration. +type Config struct { + Logger LoggerConfig `fig:"logger"` + Cluster ClusterConfig `fig:"cluster"` + + HTTPPort int `fig:"http_port" default:"6060"` + + Peppers pepper.Config `fig:"peppers"` + Admin adminserver.Config `fig:"admin"` + Storage storage.Config `fig:"storage"` + Hosts host.Configs `fig:"hosts"` + Shapers []shaper.Config `fig:"shapers"` + + C2S C2SConfig `fig:"c2s"` + S2S S2SConfig `fig:"s2s"` + Components ComponentsConfig `fig:"components"` + Modules ModulesConfig `fig:"modules"` +} + +func loadConfig(configFile string) (*Config, error) { + var cfg Config + file := filepath.Base(configFile) + dir := filepath.Dir(configFile) + + err := fig.Load(&cfg, fig.File(file), fig.Dirs(dir)) + if err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/cmd/jackal/http_server.go b/pkg/jackal/http_server.go similarity index 97% rename from cmd/jackal/http_server.go rename to pkg/jackal/http_server.go index a82d43cc8..2727c2c9c 100644 --- a/cmd/jackal/http_server.go +++ b/pkg/jackal/http_server.go @@ -1,4 +1,4 @@ -// Copyright 2020 The jackal Authors +// Copyright 2021 The jackal Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package jackal import ( "context" diff --git a/cmd/jackal/app.go b/pkg/jackal/jackal.go similarity index 60% rename from cmd/jackal/app.go rename to pkg/jackal/jackal.go index 794ecd44b..9fe3c5e13 100644 --- a/cmd/jackal/app.go +++ b/pkg/jackal/jackal.go @@ -1,4 +1,4 @@ -// Copyright 2020 The jackal Authors +// Copyright 2021 The jackal Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package jackal import ( "context" @@ -28,29 +28,31 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" adminserver "github.com/ortuman/jackal/pkg/admin/server" + "github.com/ortuman/jackal/pkg/cluster/etcd" + clusterserver "github.com/ortuman/jackal/pkg/cluster/server" + "github.com/ortuman/jackal/pkg/component/xep0114" + "github.com/ortuman/jackal/pkg/log" + "github.com/ortuman/jackal/pkg/log/zap" + "github.com/ortuman/jackal/pkg/storage" + "github.com/ortuman/jackal/pkg/util/crashreporter" + "github.com/ortuman/jackal/pkg/version" + "github.com/ortuman/jackal/pkg/auth/pepper" "github.com/ortuman/jackal/pkg/c2s" clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager" - "github.com/ortuman/jackal/pkg/cluster/etcd" "github.com/ortuman/jackal/pkg/cluster/kv" "github.com/ortuman/jackal/pkg/cluster/locker" "github.com/ortuman/jackal/pkg/cluster/memberlist" clusterrouter "github.com/ortuman/jackal/pkg/cluster/router" - clusterserver "github.com/ortuman/jackal/pkg/cluster/server" "github.com/ortuman/jackal/pkg/component" "github.com/ortuman/jackal/pkg/component/extcomponentmanager" "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" - "github.com/ortuman/jackal/pkg/log" - "github.com/ortuman/jackal/pkg/log/zap" "github.com/ortuman/jackal/pkg/module" "github.com/ortuman/jackal/pkg/router" "github.com/ortuman/jackal/pkg/s2s" "github.com/ortuman/jackal/pkg/shaper" - "github.com/ortuman/jackal/pkg/storage" "github.com/ortuman/jackal/pkg/storage/repository" - "github.com/ortuman/jackal/pkg/util/crashreporter" - "github.com/ortuman/jackal/pkg/version" ) const ( @@ -92,7 +94,8 @@ type startStopper interface { stopper } -type serverApp struct { +// Jackal is the root data structure for Jackal. +type Jackal struct { output io.Writer args []string @@ -109,10 +112,10 @@ type serverApp struct { shapers shaper.Shapers hosts *host.Hosts clusterConnMng *clusterconnmanager.Manager + localRouter *c2s.LocalRouter clusterRouter *clusterrouter.Router s2sOutProvider *s2s.OutProvider - s2sInHub *s2s.InHub router router.Router mods *module.Modules comps *component.Components @@ -124,19 +127,24 @@ type serverApp struct { waitStopCh chan os.Signal } -func run(output io.Writer, args []string) error { - // Seed the math/rand RNG from crypto/rand. - rand.Seed(time.Now().UnixNano()) - - defer crashreporter.RecoverAndReportPanic() - - a := &serverApp{ +// New makes a new Jackal. +func New(output io.Writer, args []string) *Jackal { + return &Jackal{ output: output, args: args, waitStopCh: make(chan os.Signal, 1), } +} + +// Run starts Jackal running, and blocks until a Jackal stops. +func (j *Jackal) Run() error { + // seed the math/rand RNG from crypto/rand. + rand.Seed(time.Now().UnixNano()) + + defer crashreporter.RecoverAndReportPanic() + fs := flag.NewFlagSet("jackal", flag.ExitOnError) - fs.SetOutput(a.output) + fs.SetOutput(j.output) var configFile string var showVersion, showUsage bool @@ -146,11 +154,11 @@ func run(output io.Writer, args []string) error { fs.StringVar(&configFile, "config", "config.yaml", "Configuration file path.") fs.Usage = func() { for i := range logoStr { - _, _ = fmt.Fprintf(a.output, "%s\n", logoStr[i]) + _, _ = fmt.Fprintf(j.output, "%s\n", logoStr[i]) } - _, _ = fmt.Fprintf(a.output, "%s\n", usageStr) + _, _ = fmt.Fprintf(j.output, "%s\n", usageStr) } - _ = fs.Parse(a.args[1:]) + _ = fs.Parse(j.args[1:]) // print usage if showUsage { @@ -159,7 +167,7 @@ func run(output io.Writer, args []string) error { } // print version if showVersion { - _, _ = fmt.Fprintf(a.output, "jackal version: %v\n", version.Version) + _, _ = fmt.Fprintf(j.output, "jackal version: %v\n", version.Version) return nil } // if present, override config file url with env var @@ -196,108 +204,108 @@ func run(output io.Writer, args []string) error { if err != nil { return err } - a.peppers = peppers + j.peppers = peppers // init hooks - a.hk = hook.NewHooks() + j.hk = hook.NewHooks() // init etcd - a.initLocker(cfg.Cluster.Etcd) - a.initKVStore(cfg.Cluster.Etcd) + j.initLocker(cfg.Cluster.Etcd) + j.initKVStore(cfg.Cluster.Etcd) // init cluster connection manager - a.initClusterConnManager() + j.initClusterConnManager() // init repository - if err := a.initRepository(cfg.Storage); err != nil { + if err := j.initRepository(cfg.Storage); err != nil { return err } // init C2S/S2S routers - if err := a.initHosts(cfg.Hosts); err != nil { + if err := j.initHosts(cfg.Hosts); err != nil { return err } - if err := a.initShapers(cfg.Shapers); err != nil { + if err := j.initShapers(cfg.Shapers); err != nil { return err } - a.initS2S(cfg.S2SOut) - a.initRouters() + j.initS2SOut(cfg.S2S.Out) + j.initRouters() // init components & modules - a.initComponents(cfg.Components) + j.initComponents() - if err := a.initModules(cfg.Modules); err != nil { + if err := j.initModules(cfg.Modules); err != nil { return err } // init HTTP server - a.registerStartStopper(newHTTPServer(cfg.HTTPPort)) + j.registerStartStopper(newHTTPServer(cfg.HTTPPort)) // init admin server - a.initAdminServer(cfg.Admin) + j.initAdminServer(cfg.Admin) // init cluster server - a.initClusterServer(cfg.Cluster.Server) + j.initClusterServer(cfg.Cluster.Server) // init memberlist - a.initMemberList(cfg.Cluster.Server.Port) + j.initMemberList(cfg.Cluster.Server.Port) // init C2S/S2S listeners - if err := a.initListeners(cfg.Listeners); err != nil { + if err := j.initListeners(cfg.C2S.Listeners, cfg.S2S.Listeners, cfg.Components.Listeners); err != nil { return err } - if err := a.bootstrap(); err != nil { + if err := j.bootstrap(); err != nil { return err } // ...wait for stop signal to shut down - sig := a.waitForStopSignal() + sig := j.waitForStopSignal() log.Infof("Received %s signal... shutting down...", sig.String()) - return a.shutdown() + return j.shutdown() } -func (a *serverApp) initLocker(cfg etcd.Config) { - a.locker = etcd.NewLocker(cfg) - a.registerStartStopper(a.locker) +func (j *Jackal) initLocker(cfg etcd.Config) { + j.locker = etcd.NewLocker(cfg) + j.registerStartStopper(j.locker) } -func (a *serverApp) initKVStore(cfg etcd.Config) { +func (j *Jackal) initKVStore(cfg etcd.Config) { etcdKV := etcd.NewKV(cfg) - a.kv = kv.NewMeasured(etcdKV) - a.registerStartStopper(a.kv) + j.kv = kv.NewMeasured(etcdKV) + j.registerStartStopper(j.kv) } -func (a *serverApp) initClusterConnManager() { - a.clusterConnMng = clusterconnmanager.NewManager(a.hk) - a.registerStartStopper(a.clusterConnMng) +func (j *Jackal) initClusterConnManager() { + j.clusterConnMng = clusterconnmanager.NewManager(j.hk) + j.registerStartStopper(j.clusterConnMng) } -func (a *serverApp) initRepository(cfg storage.Config) error { +func (j *Jackal) initRepository(cfg storage.Config) error { rep, err := storage.New(cfg) if err != nil { return err } - a.rep = rep - a.registerStartStopper(a.rep) + j.rep = rep + j.registerStartStopper(j.rep) return nil } -func (a *serverApp) initHosts(configs []host.Config) error { - h, err := host.NewHost(configs) +func (j *Jackal) initHosts(configs host.Configs) error { + h, err := host.NewHosts(configs) if err != nil { return err } - a.hosts = h + j.hosts = h return nil } -func (a *serverApp) initShapers(configs []shaper.Config) error { - a.shapers = make(shaper.Shapers, 0) +func (j *Jackal) initShapers(configs []shaper.Config) error { + j.shapers = make(shaper.Shapers, 0) for _, cfg := range configs { shp, err := shaper.New(cfg) if err != nil { return err } - a.shapers = append(a.shapers, shp) + j.shapers = append(j.shapers, shp) log.Infow(fmt.Sprintf("Registered '%s' shaper configuration", cfg.Name), "name", cfg.Name, @@ -308,66 +316,104 @@ func (a *serverApp) initShapers(configs []shaper.Config) error { return nil } -func (a *serverApp) initMemberList(clusterPort int) { - a.memberList = memberlist.New(a.kv, clusterPort, a.hk) - a.registerStartStopper(a.memberList) +func (j *Jackal) initMemberList(clusterPort int) { + j.memberList = memberlist.New(j.kv, clusterPort, j.hk) + j.registerStartStopper(j.memberList) return } -func (a *serverApp) initListeners(configs []listenerConfig) error { - for _, cfg := range configs { - lnFn, ok := lnFns[cfg.Type] - if !ok { - return fmt.Errorf("main: unrecognized listener type: %s", cfg.Type) +func (j *Jackal) initListeners( + c2sListenersCfg c2s.ListenersConfig, + s2sListenersCfg s2s.ListenersConfig, + cmpListenersCfg xep0114.ListenersConfig, +) error { + // c2s listeners + c2sListeners := c2s.NewListeners( + c2sListenersCfg, + j.hosts, + j.router, + j.comps, + j.mods, + j.resMng, + j.rep, + j.peppers, + j.shapers, + j.hk, + ) + for _, ln := range c2sListeners { + j.registerStartStopper(ln) + } + + // s2s listeners + if len(s2sListenersCfg) > 0 { + s2sInHub := s2s.NewInHub() + j.registerStartStopper(s2sInHub) + + s2sListeners := s2s.NewListeners( + s2sListenersCfg, + j.hosts, + j.router, + j.comps, + j.mods, + j.s2sOutProvider, + s2sInHub, + j.kv, + j.shapers, + j.hk, + ) + for _, ln := range s2sListeners { + j.registerStartStopper(ln) } - ln := lnFn(a, cfg) - a.registerStartStopper(ln) + } + + // external component listeners + cmpListeners := xep0114.NewListeners( + cmpListenersCfg, + j.hosts, + j.comps, + j.extCompMng, + j.router, + j.shapers, + j.hk, + ) + for _, ln := range cmpListeners { + j.registerStartStopper(ln) } return nil } -func (a *serverApp) initS2S(cfg s2sOutConfig) { - a.s2sOutProvider = s2s.NewOutProvider(a.hosts, a.kv, a.shapers, a.hk, s2s.Config{ - DialTimeout: cfg.DialTimeout, - DialbackSecret: cfg.DialbackSecret, - ConnectTimeout: cfg.ConnectTimeout, - KeepAliveTimeout: cfg.KeepAliveTimeout, - RequestTimeout: cfg.RequestTimeout, - MaxStanzaSize: cfg.MaxStanzaSize, - }) - a.s2sInHub = s2s.NewInHub() - - a.registerStartStopper(a.s2sOutProvider) - a.registerStartStopper(a.s2sInHub) +func (j *Jackal) initS2SOut(cfg s2s.OutConfig) { + j.s2sOutProvider = s2s.NewOutProvider(cfg, j.hosts, j.kv, j.shapers, j.hk) + j.registerStartStopper(j.s2sOutProvider) } -func (a *serverApp) initRouters() { +func (j *Jackal) initRouters() { // init shared resource hub - a.resMng = c2s.NewResourceManager(a.kv) + j.resMng = c2s.NewResourceManager(j.kv) // init C2S router - a.localRouter = c2s.NewLocalRouter(a.hosts) - a.clusterRouter = clusterrouter.New(a.clusterConnMng) + j.localRouter = c2s.NewLocalRouter(j.hosts) + j.clusterRouter = clusterrouter.New(j.clusterConnMng) - c2sRouter := c2s.NewRouter(a.localRouter, a.clusterRouter, a.resMng, a.rep, a.hk) - s2sRouter := s2s.NewRouter(a.s2sOutProvider) + c2sRouter := c2s.NewRouter(j.localRouter, j.clusterRouter, j.resMng, j.rep, j.hk) + s2sRouter := s2s.NewRouter(j.s2sOutProvider) // init global router - a.router = router.New(a.hosts, c2sRouter, s2sRouter) + j.router = router.New(j.hosts, c2sRouter, s2sRouter) - a.registerStartStopper(a.router) + j.registerStartStopper(j.router) return } -func (a *serverApp) initComponents(_ componentsConfig) { - a.comps = component.NewComponents(nil, a.hk) - a.extCompMng = extcomponentmanager.New(a.kv, a.clusterConnMng, a.comps) +func (j *Jackal) initComponents() { + j.comps = component.NewComponents(nil, j.hk) + j.extCompMng = extcomponentmanager.New(j.kv, j.clusterConnMng, j.comps) - a.registerStartStopper(a.comps) - a.registerStartStopper(a.extCompMng) + j.registerStartStopper(j.comps) + j.registerStartStopper(j.extCompMng) } -func (a *serverApp) initModules(cfg modulesConfig) error { +func (j *Jackal) initModules(cfg ModulesConfig) error { var mods []module.Module // enabled modules @@ -380,33 +426,33 @@ func (a *serverApp) initModules(cfg modulesConfig) error { if !ok { return fmt.Errorf("main: unrecognized module name: %s", mName) } - mods = append(mods, fn(a, cfg)) + mods = append(mods, fn(j, &cfg)) } - a.mods = module.NewModules(mods, a.hosts, a.router, a.hk) - a.registerStartStopper(a.mods) + j.mods = module.NewModules(mods, j.hosts, j.router, j.hk) + j.registerStartStopper(j.mods) return nil } -func (a *serverApp) initAdminServer(cfg adminserver.Config) { - adminSrv := adminserver.New(cfg, a.rep, a.peppers, a.hk) - a.registerStartStopper(adminSrv) +func (j *Jackal) initAdminServer(cfg adminserver.Config) { + adminSrv := adminserver.New(cfg, j.rep, j.peppers, j.hk) + j.registerStartStopper(adminSrv) } -func (a *serverApp) initClusterServer(cfg clusterserver.Config) { - clusterSrv := clusterserver.New(cfg, a.localRouter, a.comps) - a.registerStartStopper(clusterSrv) +func (j *Jackal) initClusterServer(cfg clusterserver.Config) { + clusterSrv := clusterserver.New(cfg, j.localRouter, j.comps) + j.registerStartStopper(clusterSrv) return } -func (a *serverApp) registerStartStopper(ss startStopper) { +func (j *Jackal) registerStartStopper(ss startStopper) { if ss == nil { return } - a.starters = append(a.starters, ss) - a.stoppers = append([]stopper{ss}, a.stoppers...) + j.starters = append(j.starters, ss) + j.stoppers = append([]stopper{ss}, j.stoppers...) } -func (a *serverApp) bootstrap() error { +func (j *Jackal) bootstrap() error { // spin up all service subsystems ctx, cancel := context.WithTimeout(context.Background(), defaultBootstrapTimeout) defer cancel() @@ -414,7 +460,7 @@ func (a *serverApp) bootstrap() error { errCh := make(chan error, 1) go func() { // invoke all registered starters... - for _, s := range a.starters { + for _, s := range j.starters { if err := s.Start(ctx); err != nil { errCh <- err return @@ -430,7 +476,7 @@ func (a *serverApp) bootstrap() error { } } -func (a *serverApp) shutdown() error { +func (j *Jackal) shutdown() error { // wait until shutdown has been completed ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) defer cancel() @@ -438,7 +484,7 @@ func (a *serverApp) shutdown() error { errCh := make(chan error, 1) go func() { // invoke all registered stoppers... - for _, st := range a.stoppers { + for _, st := range j.stoppers { if err := st.Stop(ctx); err != nil { errCh <- err return @@ -455,9 +501,9 @@ func (a *serverApp) shutdown() error { } } -func (a *serverApp) waitForStopSignal() os.Signal { - signal.Notify(a.waitStopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - return <-a.waitStopCh +func (j *Jackal) waitForStopSignal() os.Signal { + signal.Notify(j.waitStopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + return <-j.waitStopCh } func setRLimit() error { diff --git a/cmd/jackal/modules.go b/pkg/jackal/modules.go similarity index 59% rename from cmd/jackal/modules.go rename to pkg/jackal/modules.go index 3e02fd46f..10c9eb46f 100644 --- a/cmd/jackal/modules.go +++ b/pkg/jackal/modules.go @@ -1,4 +1,4 @@ -// Copyright 2020 The jackal Authors +// Copyright 2021 The jackal Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package jackal import ( "github.com/ortuman/jackal/pkg/module" @@ -46,70 +46,70 @@ var defaultModules = []string{ xep0280.ModuleName, } -var modFns = map[string]func(a *serverApp, cfg modulesConfig) module.Module{ +var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{ // Roster // (https://xmpp.org/rfcs/rfc6121.html#roster) - roster.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return roster.New(a.router, a.hosts, a.resMng, a.rep, a.hk) + roster.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return roster.New(j.router, j.hosts, j.resMng, j.rep, j.hk) }, // Offline // (https://xmpp.org/extensions/xep-0160.html) - offline.ModuleName: func(a *serverApp, cfg modulesConfig) module.Module { - return offline.New(cfg.Offline, a.router, a.hosts, a.resMng, a.rep, a.locker, a.hk) + offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { + return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.locker, j.hk) }, // XEP-0012: Last Activity // (https://xmpp.org/extensions/xep-0012.html) - xep0012.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0012.New(a.router, a.hosts, a.resMng, a.rep, a.hk) + xep0012.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0012.New(j.router, j.hosts, j.resMng, j.rep, j.hk) }, // XEP-0030: Service Discovery // (https://xmpp.org/extensions/xep-0030.html) - xep0030.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0030.New(a.router, a.comps, a.rep, a.resMng, a.hk) + xep0030.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0030.New(j.router, j.comps, j.rep, j.resMng, j.hk) }, // XEP-0049: Private XML Storage // (https://xmpp.org/extensions/xep-0049.html) - xep0049.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0049.New(a.router, a.rep, a.hk) + xep0049.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0049.New(j.router, j.rep, j.hk) }, // XEP-0054: vcard-temp // (https://xmpp.org/extensions/xep-0054.html) - xep0054.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0054.New(a.router, a.rep, a.hk) + xep0054.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0054.New(j.router, j.rep, j.hk) }, // XEP-0092: Software Version // (https://xmpp.org/extensions/xep-0092.html) - xep0092.ModuleName: func(a *serverApp, cfg modulesConfig) module.Module { - return xep0092.New(cfg.Version, a.router) + xep0092.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { + return xep0092.New(cfg.Version, j.router) }, // XEP-0115: Entity Capabilities // (https://xmpp.org/extensions/xep-0115.html) - xep0115.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0115.New(a.router, a.rep, a.hk) + xep0115.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0115.New(j.router, j.rep, j.hk) }, // XEP-0191: Blocking Command // (https://xmpp.org/extensions/xep-0191.html) - xep0191.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0191.New(a.router, a.hosts, a.resMng, a.rep, a.hk) + xep0191.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0191.New(j.router, j.hosts, j.resMng, j.rep, j.hk) }, // XEP-0198: Stream Management // (https://xmpp.org/extensions/xep-0198.html) - xep0198.ModuleName: func(a *serverApp, cfg modulesConfig) module.Module { - return xep0198.New(cfg.Stream, a.router, a.hosts, a.resMng, a.hk) + xep0198.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { + return xep0198.New(cfg.Stream, j.router, j.hosts, j.resMng, j.hk) }, // XEP-0199: XMPP Ping // (https://xmpp.org/extensions/xep-0199.html) - xep0199.ModuleName: func(a *serverApp, cfg modulesConfig) module.Module { - return xep0199.New(cfg.Ping, a.router, a.hk) + xep0199.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { + return xep0199.New(cfg.Ping, j.router, j.hk) }, // XEP-0202: Entity Time // (https://xmpp.org/extensions/xep-0202.html) - xep0202.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0202.New(a.router) + xep0202.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0202.New(j.router) }, // XEP-0280: Message Carbons // (https://xmpp.org/extensions/xep-0280.html) - xep0280.ModuleName: func(a *serverApp, _ modulesConfig) module.Module { - return xep0280.New(a.router, a.hosts, a.resMng, a.hk) + xep0280.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { + return xep0280.New(j.router, j.hosts, j.resMng, j.hk) }, } diff --git a/pkg/module/roster/roster.go b/pkg/module/roster/roster.go index f374bc6ad..1c5536744 100644 --- a/pkg/module/roster/roster.go +++ b/pkg/module/roster/roster.go @@ -20,13 +20,14 @@ import ( "fmt" "strconv" + "github.com/ortuman/jackal/pkg/c2s" + "github.com/ortuman/jackal/pkg/router/stream" "github.com/google/uuid" "github.com/jackal-xmpp/stravaganza/v2" stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza" "github.com/jackal-xmpp/stravaganza/v2/jid" - "github.com/ortuman/jackal/pkg/c2s" "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/log" diff --git a/pkg/module/xep0191/blocklist.go b/pkg/module/xep0191/blocklist.go index b1f95c369..5aafedf24 100644 --- a/pkg/module/xep0191/blocklist.go +++ b/pkg/module/xep0191/blocklist.go @@ -18,8 +18,6 @@ import ( "context" "fmt" - c2smodel "github.com/ortuman/jackal/pkg/model/c2s" - "github.com/google/uuid" "github.com/jackal-xmpp/stravaganza/v2" stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza" @@ -29,6 +27,7 @@ import ( "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/log" blocklistmodel "github.com/ortuman/jackal/pkg/model/blocklist" + c2smodel "github.com/ortuman/jackal/pkg/model/c2s" rostermodel "github.com/ortuman/jackal/pkg/model/roster" "github.com/ortuman/jackal/pkg/router" "github.com/ortuman/jackal/pkg/storage/repository" diff --git a/pkg/module/xep0280/carbons.go b/pkg/module/xep0280/carbons.go index 4b6ca6b15..5c91b1b53 100644 --- a/pkg/module/xep0280/carbons.go +++ b/pkg/module/xep0280/carbons.go @@ -18,8 +18,6 @@ import ( "context" "fmt" - c2smodel "github.com/ortuman/jackal/pkg/model/c2s" - "github.com/jackal-xmpp/stravaganza/v2" stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza" "github.com/jackal-xmpp/stravaganza/v2/jid" @@ -27,6 +25,7 @@ import ( "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/log" + c2smodel "github.com/ortuman/jackal/pkg/model/c2s" "github.com/ortuman/jackal/pkg/router" xmpputil "github.com/ortuman/jackal/pkg/util/xmpp" ) diff --git a/pkg/s2s/config.go b/pkg/s2s/config.go index 787fa2fc5..1213bfd0e 100644 --- a/pkg/s2s/config.go +++ b/pkg/s2s/config.go @@ -15,33 +15,50 @@ package s2s import ( - "crypto/tls" "time" ) -// Config defines S2S connection configuration. -type Config struct { - // DialTimeout defines S2S out dialer timeout. - DialTimeout time.Duration +// ListenersConfig defines a set of S2S listener configurations. +type ListenersConfig []ListenerConfig - // DialbackSecret defines S2S dialback secret key. - DialbackSecret string +// ListenerConfig defines S2S listener configuration. +type ListenerConfig struct { + // BindAddr defines listener incoming connections address. + BindAddr string `fig:"bind_addr"` + + // Port defines listener incoming connections port. + Port int `fig:"port" default:"5269"` // ConnectTimeout defines connection timeout. - ConnectTimeout time.Duration + ConnectTimeout time.Duration `fig:"conn_timeout" default:"3s"` // KeepAliveTimeout defines stream read timeout. - KeepAliveTimeout time.Duration + KeepAliveTimeout time.Duration `fig:"keep_alive_timeout" default:"10m"` // RequestTimeout defines S2S stream request timeout. - RequestTimeout time.Duration + RequestTimeout time.Duration `fig:"req_timeout" default:"15s"` // MaxStanzaSize is the maximum size a listener incoming stanza may have. - MaxStanzaSize int + MaxStanzaSize int `fig:"max_stanza_size" default:"131072"` // DirectTLS, if true, tls.Listen will be used as network listener. - DirectTLS bool + DirectTLS bool `fig:"direct_tls"` +} - // TLSConfig contains configuration to be used when TLS listener is enabled. - TLSConfig *tls.Config +// OutConfig defines S2S out configuration. +type OutConfig struct { + // DialbackSecret defines S2S dialback secret key. + DialbackSecret string `fig:"dialback_secret"` + + // DialTimeout defines S2S out dialer timeout. + DialTimeout time.Duration `fig:"dial_timeout" default:"5s"` + + // KeepAliveTimeout defines stream read timeout. + KeepAliveTimeout time.Duration `fig:"keep_alive_timeout" default:"10m"` + + // RequestTimeout defines S2S stream request timeout. + RequestTimeout time.Duration `fig:"req_timeout" default:"15s"` + + // MaxStanzaSize is the maximum size a listener incoming stanza may have. + MaxStanzaSize int `fig:"max_stanza_size" default:"131072"` } diff --git a/pkg/s2s/in.go b/pkg/s2s/in.go index 12d78c5f6..1d1e8bd50 100644 --- a/pkg/s2s/in.go +++ b/pkg/s2s/in.go @@ -51,9 +51,18 @@ const ( var inDisconnectTimeout = time.Second * 5 +type inConfig struct { + connectTimeout time.Duration + keepAliveTimeout time.Duration + reqTimeout time.Duration + maxStanzaSize int + directTLS bool + tlsConfig *tls.Config +} + type inS2S struct { id stream.S2SInID - cfg Config + cfg inConfig tr transport.Transport session session hosts hosts @@ -89,7 +98,7 @@ func newInS2S( kv kv.KV, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, + cfg inConfig, ) (*inS2S, error) { // set default rate limiter rLim := shapers.DefaultS2S().RateLimiter() @@ -104,7 +113,7 @@ func newInS2S( tr, hosts, xmppsession.Config{ - MaxStanzaSize: cfg.MaxStanzaSize, + MaxStanzaSize: cfg.maxStanzaSize, }, ) // init stream @@ -126,7 +135,7 @@ func newInS2S( doneCh: make(chan struct{}), state: inConnecting, } - if cfg.DirectTLS { + if cfg.directTLS { stm.flags.setSecured() // stream already secured } return stm, nil @@ -174,7 +183,7 @@ func (s *inS2S) start() error { func (s *inS2S) readLoop() { s.restartSession() - tm := time.AfterFunc(s.cfg.ConnectTimeout, s.connTimeout) // schedule connect timeout + tm := time.AfterFunc(s.cfg.connectTimeout, s.connTimeout) // schedule connect timeout elem, sErr := s.session.Receive() tm.Stop() @@ -188,7 +197,7 @@ func (s *inS2S) readLoop() { s.handleSessionResult(elem, sErr) doRead: - tm := time.AfterFunc(s.cfg.KeepAliveTimeout, s.connTimeout) // schedule read timeout + tm := time.AfterFunc(s.cfg.keepAliveTimeout, s.connTimeout) // schedule read timeout elem, sErr = s.session.Receive() tm.Stop() } @@ -662,7 +671,7 @@ func (s *inS2S) verifyDialbackKey(ctx context.Context, elem stravaganza.Element) return err } expectedKey := dbKey( - s.cfg.DialbackSecret, + s.outProvider.DialbackSecret(), sender, target, streamID, @@ -832,7 +841,7 @@ func (s *inS2S) runHook(ctx context.Context, hookName string, inf *hook.S2SStrea } func (s *inS2S) requestContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), s.cfg.RequestTimeout) + return context.WithTimeout(context.Background(), s.cfg.reqTimeout) } var currentID uint64 diff --git a/pkg/s2s/in_test.go b/pkg/s2s/in_test.go index bce706d8d..3149f3f6b 100644 --- a/pkg/s2s/in_test.go +++ b/pkg/s2s/in_test.go @@ -403,16 +403,18 @@ func TestInS2S_HandleSessionElement(t *testing.T) { } // Out provider mock outProviderMock := &outProviderMock{} + outProviderMock.DialbackSecretFunc = func() string { + return "adialbacksecret" + } outProviderMock.GetDialbackFunc = func(ctx context.Context, sender string, target string, params DialbackParams) (stream.S2SDialback, error) { return dbStreamMock, nil } stm := &inS2S{ - cfg: Config{ - DialbackSecret: "adialbacksecret", - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: inConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, state: tt.state, flags: flags{fs: tt.flags}, @@ -500,10 +502,10 @@ func TestInS2S_HandleSessionError(t *testing.T) { } stm := &inS2S{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: inConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, state: tt.state, rq: runqueue.New(tt.name), diff --git a/pkg/s2s/interface.go b/pkg/s2s/interface.go index d72e0aeae..318670c26 100644 --- a/pkg/s2s/interface.go +++ b/pkg/s2s/interface.go @@ -83,6 +83,7 @@ type modules interface { //go:generate moq -out outprovider.mock_test.go . outProvider type outProvider interface { + DialbackSecret() string GetOut(ctx context.Context, sender, target string) (stream.S2SOut, error) GetDialback(ctx context.Context, sender, target string, params DialbackParams) (stream.S2SDialback, error) } diff --git a/pkg/s2s/out.go b/pkg/s2s/out.go index fb733d2b7..afe5be7da 100644 --- a/pkg/s2s/out.go +++ b/pkg/s2s/out.go @@ -85,11 +85,19 @@ type DialbackParams struct { Key string } +type outConfig struct { + dbSecret string + dialTimeout time.Duration + keepAliveTimeout time.Duration + reqTimeout time.Duration + maxStanzaSize int +} + type outS2S struct { typ outType + cfg outConfig sender string target string - cfg Config tr transport.Transport kv kv.KV session session @@ -114,11 +122,11 @@ func newOutS2S( target string, tlsCfg *tls.Config, hosts *host.Hosts, - cfg Config, kv kv.KV, shapers shaper.Shapers, hk *hook.Hooks, onClose func(s *outS2S), + cfg outConfig, ) *outS2S { stm := &outS2S{ typ: defaultType, @@ -131,7 +139,7 @@ func newOutS2S( kv: kv, shapers: shapers, hk: hk, - dialer: newDialer(cfg.DialTimeout, tlsCfg), + dialer: newDialer(cfg.dialTimeout, tlsCfg), } stm.rq = runqueue.New(stm.ID().String()) return stm @@ -142,9 +150,9 @@ func newDialbackS2S( target string, tlsCfg *tls.Config, hosts *host.Hosts, - cfg Config, - dbParams DialbackParams, shapers shaper.Shapers, + cfg outConfig, + dbParams DialbackParams, ) *outS2S { stm := &outS2S{ typ: dialbackType, @@ -154,7 +162,7 @@ func newDialbackS2S( tlsCfg: tlsCfg, cfg: cfg, dbParams: dbParams, - dialer: newDialer(cfg.DialTimeout, tlsCfg), + dialer: newDialer(cfg.dialTimeout, tlsCfg), dbResCh: make(chan stream.DialbackResult, 1), shapers: shapers, } @@ -216,7 +224,7 @@ func (s *outS2S) dial(ctx context.Context) error { s.tr, s.hosts, xmppsession.Config{ - MaxStanzaSize: s.cfg.MaxStanzaSize, + MaxStanzaSize: s.cfg.maxStanzaSize, IsOut: true, }, ) @@ -258,7 +266,7 @@ func (s *outS2S) start() error { } func (s *outS2S) readLoop() { - tm := time.AfterFunc(s.cfg.KeepAliveTimeout, s.connTimeout) + tm := time.AfterFunc(s.cfg.keepAliveTimeout, s.connTimeout) elem, sErr := s.session.Receive() tm.Stop() @@ -273,7 +281,7 @@ func (s *outS2S) readLoop() { doRead: if s.getState() != outAuthenticated { - tm = time.AfterFunc(s.cfg.KeepAliveTimeout, s.connTimeout) // schedule read timeout + tm = time.AfterFunc(s.cfg.keepAliveTimeout, s.connTimeout) // schedule read timeout } elem, sErr = s.session.Receive() if tm != nil { @@ -389,7 +397,7 @@ func (s *outS2S) handleConnected(ctx context.Context, elem stravaganza.Element) WithAttribute(stravaganza.To, s.target). WithText( dbKey( - s.cfg.DialbackSecret, + s.cfg.dbSecret, s.target, s.sender, streamID, @@ -604,7 +612,7 @@ func (s *outS2S) runHook(ctx context.Context, hookName string, inf *hook.S2SStre } func (s *outS2S) requestContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), s.cfg.RequestTimeout) + return context.WithTimeout(context.Background(), s.cfg.reqTimeout) } func hasExternalAuthMechanism(streamFeatures stravaganza.Element) bool { diff --git a/pkg/s2s/out_provider.go b/pkg/s2s/out_provider.go index 2f16484fb..c23bd02f5 100644 --- a/pkg/s2s/out_provider.go +++ b/pkg/s2s/out_provider.go @@ -32,8 +32,8 @@ import ( // OutProvider is an outgoing S2S stream provider. type OutProvider struct { + cfg OutConfig hosts *host.Hosts - cfg Config kv kv.KV shapers shaper.Shapers hk *hook.Hooks @@ -48,18 +48,18 @@ type OutProvider struct { // NewOutProvider creates and initializes a new OutProvider instance. func NewOutProvider( + cfg OutConfig, hosts *host.Hosts, kv kv.KV, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, ) *OutProvider { op := &OutProvider{ + cfg: cfg, hosts: hosts, shapers: shapers, kv: kv, hk: hk, - cfg: cfg, outStreams: make(map[string]s2sOut), doneCh: make(chan chan struct{}), } @@ -68,6 +68,11 @@ func NewOutProvider( return op } +// DialbackSecret returns dialback secret value. +func (p *OutProvider) DialbackSecret() string { + return p.cfg.DialbackSecret +} + // GetOut returns associated outgoing S2S stream given a sender-target pair domain. func (p *OutProvider) GetOut(ctx context.Context, sender, target string) (stream.S2SOut, error) { domainPair := getDomainPair(sender, target) @@ -189,23 +194,35 @@ func (p *OutProvider) newOutS2S(sender, target string) s2sOut { target, p.tlsConfig(target), p.hosts, - p.cfg, p.kv, p.shapers, p.hk, p.unregister, + outConfig{ + dbSecret: p.cfg.DialbackSecret, + dialTimeout: p.cfg.DialTimeout, + keepAliveTimeout: p.cfg.KeepAliveTimeout, + reqTimeout: p.cfg.RequestTimeout, + maxStanzaSize: p.cfg.MaxStanzaSize, + }, ) } -func (p *OutProvider) newDialbackS2S(sender, target string, dbParam DialbackParams) s2sDialback { +func (p *OutProvider) newDialbackS2S(sender, target string, dbParams DialbackParams) s2sDialback { return newDialbackS2S( sender, target, p.tlsConfig(target), p.hosts, - p.cfg, - dbParam, p.shapers, + outConfig{ + dbSecret: p.cfg.DialbackSecret, + dialTimeout: p.cfg.DialTimeout, + keepAliveTimeout: p.cfg.KeepAliveTimeout, + reqTimeout: p.cfg.RequestTimeout, + maxStanzaSize: p.cfg.MaxStanzaSize, + }, + dbParams, ) } diff --git a/pkg/s2s/out_test.go b/pkg/s2s/out_test.go index cb1921074..0da246067 100644 --- a/pkg/s2s/out_test.go +++ b/pkg/s2s/out_test.go @@ -296,10 +296,10 @@ func TestOutS2S_HandleSessionElement(t *testing.T) { stm := &outS2S{ sender: "jackal.im", target: "jabber.org", - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: outConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, typ: defaultType, state: tt.state, @@ -397,10 +397,10 @@ func TestDialbackS2S_HandleSessionElement(t *testing.T) { trMock.CloseFunc = func() error { return nil } stm := &outS2S{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: outConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, typ: dialbackType, dbParams: DialbackParams{ @@ -484,10 +484,10 @@ func TestOutS2S_HandleSessionError(t *testing.T) { } stm := &outS2S{ - cfg: Config{ - KeepAliveTimeout: time.Minute, - RequestTimeout: time.Minute, - MaxStanzaSize: 8192, + cfg: outConfig{ + keepAliveTimeout: time.Minute, + reqTimeout: time.Minute, + maxStanzaSize: 8192, }, typ: defaultType, state: tt.state, diff --git a/pkg/s2s/socket_listener.go b/pkg/s2s/socket_listener.go index 58ce83d74..7358bcb76 100644 --- a/pkg/s2s/socket_listener.go +++ b/pkg/s2s/socket_listener.go @@ -40,27 +40,25 @@ const ( // SocketListener represents a S2S socket listener type. type SocketListener struct { - addr string + cfg ListenerConfig hosts *host.Hosts router router.Router comps *component.Components mods *module.Modules outProvider *OutProvider - inHub *InHub + inHUB *InHub kv kv.KV shapers shaper.Shapers hk *hook.Hooks - cfg Config connHandlerFn func(conn net.Conn) ln net.Listener active uint32 } -// NewSocketListener returns a new S2S socket listener. -func NewSocketListener( - bindAddr string, - port int, +// NewListeners creates and initializes a set of S2S listeners based of cfg configuration. +func NewListeners( + cfg ListenersConfig, hosts *host.Hosts, router router.Router, comps *component.Components, @@ -70,19 +68,47 @@ func NewSocketListener( kv kv.KV, shapers shaper.Shapers, hk *hook.Hooks, - cfg Config, +) []*SocketListener { + var listeners []*SocketListener + for _, lnCfg := range cfg { + ln := newSocketListener( + lnCfg, + hosts, + router, + comps, + mods, + outProvider, + kv, + inHub, + shapers, + hk, + ) + listeners = append(listeners, ln) + } + return listeners +} + +func newSocketListener( + cfg ListenerConfig, + hosts *host.Hosts, + router router.Router, + comps *component.Components, + mods *module.Modules, + outProvider *OutProvider, + kv kv.KV, + hub *InHub, + shapers shaper.Shapers, + hk *hook.Hooks, ) *SocketListener { - addr := getAddress(bindAddr, port) ln := &SocketListener{ - addr: addr, cfg: cfg, hosts: hosts, router: router, comps: comps, mods: mods, outProvider: outProvider, - inHub: inHub, kv: kv, + inHUB: hub, shapers: shapers, hk: hk, } @@ -98,12 +124,12 @@ func (l *SocketListener) Start(ctx context.Context) error { lc := net.ListenConfig{ KeepAlive: listenKeepAlive, } - ln, err = lc.Listen(ctx, "tcp", l.addr) + ln, err = lc.Listen(ctx, "tcp", l.getAddress()) if err != nil { return err } if l.cfg.DirectTLS { - ln = tls.NewListener(ln, l.cfg.TLSConfig) + ln = tls.NewListener(ln, l.getTLSConfig()) } l.ln = ln l.active = 1 @@ -115,7 +141,7 @@ func (l *SocketListener) Start(ctx context.Context) error { continue } log.Infow( - fmt.Sprintf("Received S2S incoming connection at %s", l.addr), + fmt.Sprintf("Received S2S incoming connection at %s", l.getAddress()), "remote_address", conn.RemoteAddr().String(), ) @@ -123,19 +149,19 @@ func (l *SocketListener) Start(ctx context.Context) error { } }() log.Infow( - fmt.Sprintf("Accepting S2S socket connections at %s", l.addr), + fmt.Sprintf("Accepting S2S socket connections at %s", l.getAddress()), "direct_tls", l.cfg.DirectTLS, ) return nil } // Stop stops handling incoming S2S connections and closes underlying TCP listener. -func (l *SocketListener) Stop(_ context.Context) error { +func (l *SocketListener) Stop(ctx context.Context) error { atomic.StoreUint32(&l.active, 0) if err := l.ln.Close(); err != nil { return err } - log.Infof("Stopped S2S listener at %s", l.addr) + log.Infof("Stopped S2S listener at %s", l.getAddress()) return nil } @@ -148,11 +174,18 @@ func (l *SocketListener) handleConn(conn net.Conn) { l.comps, l.mods, l.outProvider, - l.inHub, + l.inHUB, l.kv, l.shapers, l.hk, - l.cfg, + inConfig{ + connectTimeout: l.cfg.ConnectTimeout, + keepAliveTimeout: l.cfg.KeepAliveTimeout, + reqTimeout: l.cfg.RequestTimeout, + maxStanzaSize: l.cfg.MaxStanzaSize, + directTLS: l.cfg.DirectTLS, + tlsConfig: l.getTLSConfig(), + }, ) if err != nil { log.Warnf("Failed to initialize incoming S2S stream: %v", err) @@ -165,6 +198,14 @@ func (l *SocketListener) handleConn(conn net.Conn) { } } -func getAddress(bindAddr string, port int) string { - return bindAddr + ":" + strconv.Itoa(port) +func (l *SocketListener) getTLSConfig() *tls.Config { + return &tls.Config{ + Certificates: l.hosts.Certificates(), + ClientAuth: tls.RequireAndVerifyClientCert, + MinVersion: tls.VersionTLS12, + } +} + +func (l *SocketListener) getAddress() string { + return l.cfg.BindAddr + ":" + strconv.Itoa(l.cfg.Port) } diff --git a/pkg/s2s/socket_listener_test.go b/pkg/s2s/socket_listener_test.go index a07e8eb4f..462c01877 100644 --- a/pkg/s2s/socket_listener_test.go +++ b/pkg/s2s/socket_listener_test.go @@ -29,7 +29,7 @@ func TestSocketListener_Listen(t *testing.T) { var handledConn uint32 s := &SocketListener{ - addr: ":51125", + cfg: ListenerConfig{Port: 51125}, connHandlerFn: func(_ net.Conn) { atomic.StoreUint32(&handledConn, 1) }, diff --git a/pkg/version/version.go b/pkg/version/version.go index 108b59f70..fb2066cb1 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -19,7 +19,7 @@ import ( ) // Version represents application version. -var Version = NewVersion(0, 54, 1) +var Version = NewVersion(0, 55, 0) // APIVersion represents admin API version. var APIVersion = NewVersion(1, 0, 0)