Skip to content

Commit

Permalink
Added configuration possibility to specify both, an external (public)…
Browse files Browse the repository at this point in the history
… and an internal (listen) IP address for each pluto node. This should allow for cloud deployments where Elastic IPs are not available to locally bind to but to communicate with other nodes.
  • Loading branch information
numbleroot committed Jan 10, 2017
1 parent ce17eb7 commit a176bda
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 49 deletions.
8 changes: 4 additions & 4 deletions comm/receiver-sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func TestSenderReceiver(t *testing.T) {
}

// Listen on defined worker-1 socket for TLS connections.
socketN1, err := tls.Listen("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].IP, testEnv.Config.Workers[n1].SyncPort), internalTLSConfigN1)
socketN1, err := tls.Listen("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].ListenIP, testEnv.Config.Workers[n1].SyncPort), internalTLSConfigN1)
if err != nil {
t.Fatalf("[comm_test.TestSenderReceiver] Expected TLS listen for worker-1 not to fail but received: %s\n", err.Error())
}

// Listen on defined storage socket for TLS connections.
socketN2, err := tls.Listen("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Storage.IP, testEnv.Config.Storage.SyncPort), internalTLSConfigN2)
socketN2, err := tls.Listen("tcp", fmt.Sprintf("%s:%s", testEnv.Config.Storage.ListenIP, testEnv.Config.Storage.SyncPort), internalTLSConfigN2)
if err != nil {
t.Fatalf("[comm_test.TestSenderReceiver] Expected TLS listen for storage not to fail but received: %s\n", err.Error())
}
Expand Down Expand Up @@ -114,11 +114,11 @@ func TestSenderReceiver(t *testing.T) {

// Create map of connections for worker-1.
connsN1 := make(map[string]string)
connsN1[n2] = fmt.Sprintf("%s:%s", testEnv.Config.Storage.IP, testEnv.Config.Storage.SyncPort)
connsN1[n2] = fmt.Sprintf("%s:%s", testEnv.Config.Storage.PublicIP, testEnv.Config.Storage.SyncPort)

// Create map of connections for storage.
connsN2 := make(map[string]string)
connsN2[n1] = fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].IP, testEnv.Config.Workers[n1].SyncPort)
connsN2[n1] = fmt.Sprintf("%s:%s", testEnv.Config.Workers[n1].PublicIP, testEnv.Config.Workers[n1].SyncPort)

// Initialize sending interface for worker-1.
chan1, err := comm.InitSender(n1, "../test-comm-sending-worker-1.log", internalTLSConfigN1, testEnv.Config.IntlConnTimeout, testEnv.Config.IntlConnRetry, chanIncN1, chanUpdN1, n1DownSender, connsN1)
Expand Down
19 changes: 14 additions & 5 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ Greeting = "Pluto ready."
HierarchySeparator = "."

[Distributor]
IP = "127.0.0.1"
# PublicIP denotes the Internet-facing IP address
# this pluto node is supposed to be reachable via.
PublicIP = "127.0.0.1"
# ListenIP in turn is used by this pluto process to
# bind locally to and listen for incoming requests.
ListenIP = "127.0.0.1"
Port = "993"
AuthAdapter = "AuthPostgres"

Expand Down Expand Up @@ -51,7 +56,8 @@ AuthAdapter = "AuthPostgres"
[Workers]

[Workers.worker-1]
IP = "127.0.0.1"
PublicIP = "127.0.0.1"
ListenIP = "127.0.0.1"
# MailPort specifies the port for connections
# concerning the IMAP protocol.
MailPort = "20001"
Expand All @@ -76,7 +82,8 @@ AuthAdapter = "AuthPostgres"
KeyLoc = "/very/complicated/and/long/path/to/your/internal-worker-1-key.pem"

[Workers.worker-2]
IP = "127.0.0.1"
PublicIP = "127.0.0.1"
ListenIP = "127.0.0.1"
MailPort = "20002"
SyncPort = "30002"
UserStart = 11
Expand All @@ -89,7 +96,8 @@ AuthAdapter = "AuthPostgres"
KeyLoc = "/very/complicated/and/long/path/to/your/internal-worker-2-key.pem"

[Workers.worker-3]
IP = "127.0.0.1"
PublicIP = "127.0.0.1"
ListenIP = "127.0.0.1"
MailPort = "20003"
SyncPort = "30003"
UserStart = 21
Expand All @@ -103,7 +111,8 @@ AuthAdapter = "AuthPostgres"


[Storage]
IP = "127.0.0.1"
PublicIP = "127.0.0.1"
ListenIP = "127.0.0.1"
MailPort = "21000"
SyncPort = "31000"
MaildirRoot = "/for/example/some/very/unique/path/Maildir/"
Expand Down
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type IMAP struct {
// the first entry point of a pluto setup, the
// IMAP request authenticator and distributor.
type Distributor struct {
IP string
PublicIP string
ListenIP string
Port string
AuthAdapter string
PublicTLS TLS
Expand All @@ -46,7 +47,8 @@ type Distributor struct {
// Worker contains the connection and user sharding
// information for an individual IMAP worker node.
type Worker struct {
IP string
PublicIP string
ListenIP string
MailPort string
SyncPort string
UserStart int
Expand All @@ -59,7 +61,8 @@ type Worker struct {
// Storage configures the global database node
// storing all user data in a very safe manner.
type Storage struct {
IP string
PublicIP string
ListenIP string
MailPort string
SyncPort string
MaildirRoot string
Expand Down
6 changes: 3 additions & 3 deletions crypto/generate_pki.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func main() {
log.Println("[crypto.GeneratePKI] === Done generating root certificate ===\n")

// Generate distributor's internal key and signed certificate.
err = CreateNodeCert(*pathPrefix, "internal-distributor", *rsaBits, notBefore, notAfter, []string{config.Distributor.IP}, rootCert, rootKey)
err = CreateNodeCert(*pathPrefix, "internal-distributor", *rsaBits, notBefore, notAfter, []string{config.Distributor.PublicIP}, rootCert, rootKey)
if err != nil {
log.Fatal(err)
}
Expand All @@ -241,15 +241,15 @@ func main() {

// For each worker node, generate an internal key pair
// and a signed certificate.
err = CreateNodeCert(*pathPrefix, fmt.Sprintf("internal-%s", name), *rsaBits, notBefore, notAfter, []string{worker.IP}, rootCert, rootKey)
err = CreateNodeCert(*pathPrefix, fmt.Sprintf("internal-%s", name), *rsaBits, notBefore, notAfter, []string{worker.PublicIP}, rootCert, rootKey)
if err != nil {
log.Fatal(err)
}
}

// Generate the storage's internal key pair
// and signed certificate.
err = CreateNodeCert(*pathPrefix, "internal-storage", *rsaBits, notBefore, notAfter, []string{config.Storage.IP}, rootCert, rootKey)
err = CreateNodeCert(*pathPrefix, "internal-storage", *rsaBits, notBefore, notAfter, []string{config.Storage.PublicIP}, rootCert, rootKey)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion imap/distributor-commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (distr *Distributor) Login(c *Connection, req *Request) bool {
distr.lock.RLock()

// Store worker connection information.
workerIP := distr.Config.Workers[respWorker].IP
workerIP := distr.Config.Workers[respWorker].PublicIP
workerPort := distr.Config.Workers[respWorker].MailPort

distr.lock.RUnlock()
Expand Down
8 changes: 4 additions & 4 deletions imap/distributor-commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
func TestCapability(t *testing.T) {

// Connect to IMAP distributor.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestCapability] Error during connection attempt to IMAP distributor: %s\n", err.Error())
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestLogout(t *testing.T) {
var answer string

// Connect to IMAP distributor.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestLogout] Error during connection attempt to IMAP distributor: %s\n", err.Error())
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestLogout(t *testing.T) {
func TestStartTLS(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestStartTLS] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestStartTLS(t *testing.T) {
func TestLogin(t *testing.T) {

// Connect to IMAP distributor.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestLogin] Error during connection attempt to IMAP distributor: %s\n", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion imap/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func InitDistributor(config *config.Config) (*Distributor, error) {
}

// Start to listen for incoming public connections on defined IP and port.
distr.Socket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Distributor.IP, config.Distributor.Port), publicTLSConfig)
distr.Socket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Distributor.ListenIP, config.Distributor.Port), publicTLSConfig)
if err != nil {
return nil, fmt.Errorf("[imap.InitDistributor] Listening for public TLS connections failed with: %s\n", err.Error())
}
Expand Down
32 changes: 16 additions & 16 deletions imap/imap-node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ var proxiedExpungeTests = []struct {
func TestSelect(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestSelect] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestSelect(t *testing.T) {
func TestCreate(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestCreate] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestDelete] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -504,7 +504,7 @@ func TestDelete(t *testing.T) {
func TestList(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestList] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -569,7 +569,7 @@ func TestList(t *testing.T) {
func TestAppend(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestAppend] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -644,7 +644,7 @@ func TestAppend(t *testing.T) {
func TestStore(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestStore] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -672,7 +672,7 @@ func TestStore(t *testing.T) {
// the used connection.
c.Terminate()

conn, err = tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err = tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestStore] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -750,7 +750,7 @@ func TestStore(t *testing.T) {
func TestExpunge(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestExpunge] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -825,7 +825,7 @@ func TestExpunge(t *testing.T) {
func TestProxiedSelect(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedSelect] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestProxiedSelect(t *testing.T) {
func TestProxiedCreate(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedCreate] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -955,7 +955,7 @@ func TestProxiedCreate(t *testing.T) {
func TestProxiedDelete(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedDelete] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -1020,7 +1020,7 @@ func TestProxiedDelete(t *testing.T) {
func TestProxiedList(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedList] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func TestProxiedList(t *testing.T) {
func TestProxiedAppend(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedAppend] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func TestProxiedAppend(t *testing.T) {
func TestProxiedStore(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedStore] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func TestProxiedStore(t *testing.T) {
// the used connection.
c.Terminate()

conn, err = tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err = tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedStore] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down Expand Up @@ -1266,7 +1266,7 @@ func TestProxiedStore(t *testing.T) {
func TestProxiedExpunge(t *testing.T) {

// Connect to IMAP server.
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.IP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
conn, err := tls.Dial("tcp", (testEnv.Config.Distributor.PublicIP + ":" + testEnv.Config.Distributor.Port), testEnv.TLSConfig)
if err != nil {
t.Fatalf("[imap.TestProxiedExpunge] Error during connection attempt to IMAP server: %s\n", err.Error())
}
Expand Down
6 changes: 3 additions & 3 deletions imap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func InitStorage(config *config.Config) (*Storage, error) {
}

// Start to listen for incoming internal connections on defined IP and sync port.
storage.SyncSocket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Storage.IP, config.Storage.SyncPort), internalTLSConfig)
storage.SyncSocket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Storage.ListenIP, config.Storage.SyncPort), internalTLSConfig)
if err != nil {
return nil, fmt.Errorf("[imap.InitStorage] Listening for internal sync TLS connections failed with: %s\n", err.Error())
}

log.Printf("[imap.InitStorage] Listening for incoming sync requests on %s.\n", storage.SyncSocket.Addr())

// Start to listen for incoming internal connections on defined IP and mail port.
storage.MailSocket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Storage.IP, config.Storage.MailPort), internalTLSConfig)
storage.MailSocket, err = tls.Listen("tcp", fmt.Sprintf("%s:%s", config.Storage.ListenIP, config.Storage.MailPort), internalTLSConfig)
if err != nil {
return nil, fmt.Errorf("[imap.InitStorage] Listening for internal IMAP TLS connections failed with: %s\n", err.Error())
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func InitStorage(config *config.Config) (*Storage, error) {

// Create subnet to distribute CRDT changes in.
curCRDTSubnet := make(map[string]string)
curCRDTSubnet[workerName] = fmt.Sprintf("%s:%s", workerNode.IP, workerNode.SyncPort)
curCRDTSubnet[workerName] = fmt.Sprintf("%s:%s", workerNode.PublicIP, workerNode.SyncPort)

// Init sending part of CRDT communication and send messages in background.
storage.SyncSendChans[workerName], err = comm.InitSender("storage", sendCRDTLog, internalTLSConfig, config.IntlConnTimeout, config.IntlConnRetry, chanIncVClockWorker, chanUpdVClockWorker, downSender, curCRDTSubnet)
Expand Down
Loading

0 comments on commit a176bda

Please sign in to comment.