Add support for wire encryption when talking to the datanodes
This implements the Hadoop version of DIGEST-MD5 SASL data protection, loosely
based on RFC 2831. Authentication and encryption using RC4 are supported, but
3DES is not.

Closes #236, fixes #145.
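
For illustration, a minimal client setup that requests wire encryption might look like the sketch below. hdfs.NewClient and the Addresses option come from the existing library API and are not part of this change; the address is a placeholder, and a Kerberized cluster would also need KerberosClient and KerberosServicePrincipleName set.

package main

import (
	"log"

	"github.com/colinmarc/hdfs/v2"
)

func main() {
	// Request full protection (authentication, integrity checks, and
	// encryption) for all datanode traffic. The address is a placeholder.
	client, err := hdfs.NewClient(hdfs.ClientOptions{
		Addresses:              []string{"namenode.example.com:8020"},
		DataTransferProtection: hdfs.DataTransferProtectionPrivacy,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Block reads and writes now negotiate DIGEST-MD5 SASL with each datanode.
}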
Matthew Topol authored and colinmarc committed Jul 17, 2020
1 parent 15f6da0 commit 1596ee1
Showing 29 changed files with 1,415 additions and 296 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
@@ -6,9 +6,10 @@ env:
- PLATFORM=hdp2
- PLATFORM=cdh5
- PLATFORM=cdh6
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication DATA_TRANSFER_PROTECTION=authentication
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity DATA_TRANSFER_PROTECTION=integrity
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=privacy
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=privacy AES=true
before_install:
- export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default.
install:
101 changes: 99 additions & 2 deletions client.go
@@ -8,21 +8,35 @@ import (
"net"
"os"
"os/user"
"sort"
"strings"

krb "gopkg.in/jcmturner/gokrb5.v7/client"

"github.com/colinmarc/hdfs/v2/hadoopconf"
hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/v2/internal/rpc"
krb "gopkg.in/jcmturner/gokrb5.v7/client"
"github.com/colinmarc/hdfs/v2/internal/transfer"
)

type dialContext func(ctx context.Context, network, addr string) (net.Conn, error)

const (
DataTransferProtectionAuthentication = "authentication"
DataTransferProtectionIntegrity = "integrity"
DataTransferProtectionPrivacy = "privacy"
)

// Client represents a connection to an HDFS cluster. A Client will
// automatically maintain leases for any open files, preventing other clients
// from modifying them, until Close is called.
type Client struct {
namenode *rpc.NamenodeConnection
defaults *hdfs.FsServerDefaultsProto
options ClientOptions

defaults *hdfs.FsServerDefaultsProto
encryptionKey *hdfs.DataEncryptionKeyProto
}

// ClientOptions represents the configurable options for a client.
@@ -68,6 +82,16 @@ type ClientOptions struct {
// multi-namenode setup (for example: 'nn/_HOST'). It is required if
// KerberosClient is provided.
KerberosServicePrincipleName string
// DataTransferProtection specifies whether authentication, data signature
// integrity checks, and wire encryption are required when communicating
// with the datanodes. A value of "authentication" implies just
// authentication, a value of "integrity" implies both authentication and
// integrity checks, and a value of "privacy" implies all three. The
// Client may negotiate a higher level of protection if the datanode
// requests it; for example, if dfs.encrypt.data.transfer is enabled in the
// datanode and namenode hdfs-site.xml, this setting is ignored and a level
// of "privacy" is used.
DataTransferProtection string
}

// ClientOptionsFromConf attempts to load any relevant configuration options
@@ -91,6 +115,10 @@ type ClientOptions struct {
// // (everything after the first '@') chopped off.
// KerberosServicePrincipleName string
//
// // Determined by dfs.data.transfer.protection or dfs.encrypt.data.transfer
// // (in the latter case, it is set to 'privacy').
// DataTransferProtection string
//
// Because of the way Kerberos can be forced by the Hadoop configuration but not
// actually configured, you should check for whether KerberosClient is set in
// the resulting ClientOptions before proceeding:
@@ -116,6 +144,28 @@ func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options.KerberosServicePrincipleName = strings.Split(conf["dfs.namenode.kerberos.principal"], "@")[0]
}

// Note that we take the highest setting, rather than allowing a range of
// alternatives. 'authentication', 'integrity', and 'privacy' are
// alphabetical for our convenience.
dataTransferProt := strings.Split(
strings.ToLower(conf["dfs.data.transfer.protection"]), ",")
sort.Strings(dataTransferProt)

for _, val := range dataTransferProt {
switch val {
case "privacy":
options.DataTransferProtection = "privacy"
case "integrity":
options.DataTransferProtection = "integrity"
case "authentication":
options.DataTransferProtection = "authentication"
}
}

if strings.ToLower(conf["dfs.encrypt.data.transfer"]) == "true" {
options.DataTransferProtection = "privacy"
}

return options
}
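
To illustrate the selection above: when the config lists several levels, the strongest one wins, since the three recognized values happen to sort alphabetically in increasing order of strength. A sketch with hypothetical values (hadoopconf.HadoopConf is the library's plain string map):

package main

import (
	"fmt"

	"github.com/colinmarc/hdfs/v2"
	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func main() {
	conf := hadoopconf.HadoopConf{
		"dfs.data.transfer.protection": "authentication,privacy",
	}

	options := hdfs.ClientOptionsFromConf(conf)
	fmt.Println(options.DataTransferProtection) // "privacy"
}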

@@ -260,6 +310,53 @@ func (c *Client) fetchDefaults() (*hdfs.FsServerDefaultsProto, error) {
return c.defaults, nil
}

func (c *Client) fetchDataEncryptionKey() (*hdfs.DataEncryptionKeyProto, error) {
if c.encryptionKey != nil {
return c.encryptionKey, nil
}

req := &hdfs.GetDataEncryptionKeyRequestProto{}
resp := &hdfs.GetDataEncryptionKeyResponseProto{}

err := c.namenode.Execute("getDataEncryptionKey", req, resp)
if err != nil {
return nil, err
}

c.encryptionKey = resp.GetDataEncryptionKey()
return c.encryptionKey, nil
}

func (c *Client) wrapDatanodeDial(dc dialContext, token *hadoop.TokenProto) (dialContext, error) {
wrap := false
if c.options.DataTransferProtection != "" {
wrap = true
} else {
defaults, err := c.fetchDefaults()
if err != nil {
return nil, err
}

wrap = defaults.GetEncryptDataTransfer()
}

if wrap {
key, err := c.fetchDataEncryptionKey()
if err != nil {
return nil, err
}

return (&transfer.SaslDialer{
DialFunc: dc,
Key: key,
Token: token,
EnforceQop: c.options.DataTransferProtection,
}).DialContext, nil
}

return dc, nil
}

// Close terminates all underlying socket connections to remote server.
func (c *Client) Close() error {
return c.namenode.Close()
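
Since wrapDatanodeDial wraps whatever dial function the caller supplies, a custom DatanodeDialFunc still gets the SASL handshake layered on top of it. A sketch under the same assumptions as above (placeholder address, illustrative timeout; a real secured cluster would also need Kerberos credentials):

package main

import (
	"log"
	"net"
	"time"

	"github.com/colinmarc/hdfs/v2"
)

func main() {
	// A custom dialer for the raw TCP connections to the datanodes.
	dialer := &net.Dialer{Timeout: 5 * time.Second}

	client, err := hdfs.NewClient(hdfs.ClientOptions{
		Addresses:              []string{"namenode.example.com:8020"},
		DatanodeDialFunc:       dialer.DialContext,
		DataTransferProtection: hdfs.DataTransferProtectionIntegrity,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The client dials datanodes with dialer.DialContext and then performs
	// the DIGEST-MD5 SASL handshake on the resulting connection.
}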
4 changes: 4 additions & 0 deletions cmd/hdfs/test/helper.bash
@@ -4,6 +4,10 @@ export HADOOP_FS=${HADOOP_FS-"hadoop fs"}
export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.."
export HDFS="$ROOT_TEST_DIR/hdfs"

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# stolen from https://github.com/sstephenson/rbenv/blob/master/test/test_helper.bash

flunk() {
28 changes: 21 additions & 7 deletions file_reader.go
@@ -9,7 +9,7 @@ import (
"time"

hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/v2/internal/rpc"
"github.com/colinmarc/hdfs/v2/internal/transfer"
"github.com/golang/protobuf/proto"
)

@@ -22,7 +22,7 @@ type FileReader struct {
info os.FileInfo

blocks []*hdfs.LocatedBlockProto
blockReader *rpc.BlockReader
blockReader *transfer.BlockReader
deadline time.Time
offset int64

@@ -97,14 +97,21 @@ func (f *FileReader) Checksum() ([]byte, error) {
paddedLength := 32
totalLength := 0
checksum := md5.New()

for _, block := range f.blocks {
cr := &rpc.ChecksumReader{
d, err := f.client.wrapDatanodeDial(f.client.options.DatanodeDialFunc,
block.GetBlockToken())
if err != nil {
return nil, err
}

cr := &transfer.ChecksumReader{
Block: block,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: d,
}

err := cr.SetDeadline(f.deadline)
err = cr.SetDeadline(f.deadline)
if err != nil {
return nil, err
}
@@ -400,12 +407,19 @@ func (f *FileReader) getNewBlockReader() error {
end := start + block.GetB().GetNumBytes()

if start <= off && off < end {
f.blockReader = &rpc.BlockReader{
dialFunc, err := f.client.wrapDatanodeDial(
f.client.options.DatanodeDialFunc,
block.GetBlockToken())
if err != nil {
return err
}

f.blockReader = &transfer.BlockReader{
ClientName: f.client.namenode.ClientName,
Block: block,
Offset: int64(off - start),
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: dialFunc,
}

return f.SetDeadline(f.deadline)
8 changes: 4 additions & 4 deletions file_reader_test.go
@@ -164,7 +164,7 @@ func TestFileReadAt(t *testing.T) {
off := 0
for off < len(buf) {
n, err := file.ReadAt(buf[off:], int64(testStrOff+off))
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, n > 0)
off += n
}
@@ -175,7 +175,7 @@
off = 0
for off < len(buf) {
n, err := file.ReadAt(buf[off:], int64(testStr2Off+off))
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, n > 0)
off += n
}
@@ -362,11 +362,11 @@ func TestFileReadDeadline(t *testing.T) {
file, err := client.Open("/_test/foo.txt")
require.NoError(t, err)

file.SetDeadline(time.Now().Add(100 * time.Millisecond))
file.SetDeadline(time.Now().Add(200 * time.Millisecond))
_, err = file.Read([]byte{0, 0})
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
_, err = file.Read([]byte{0, 0})
assert.NotNil(t, err)
}
30 changes: 22 additions & 8 deletions file_writer.go
@@ -6,7 +6,7 @@ import (
"time"

hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/v2/internal/rpc"
"github.com/colinmarc/hdfs/v2/internal/transfer"
"github.com/golang/protobuf/proto"
)

@@ -19,7 +19,7 @@ type FileWriter struct {
replication int
blockSize int64

blockWriter *rpc.BlockWriter
blockWriter *transfer.BlockWriter
deadline time.Time
closed bool
}
@@ -112,14 +112,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
return f, nil
}

f.blockWriter = &rpc.BlockWriter{
dialFunc, err := f.client.wrapDatanodeDial(
f.client.options.DatanodeDialFunc,
block.GetBlockToken())
if err != nil {
return nil, err
}

f.blockWriter = &transfer.BlockWriter{
ClientName: f.client.namenode.ClientName,
Block: block,
BlockSize: f.blockSize,
Offset: int64(block.B.GetNumBytes()),
Append: true,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: dialFunc,
}

err = f.blockWriter.SetDeadline(f.deadline)
@@ -176,7 +183,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
for off < len(b) {
n, err := f.blockWriter.Write(b[off:])
off += n
if err == rpc.ErrEndOfBlock {
if err == transfer.ErrEndOfBlock {
err = f.startNewBlock()
}

@@ -262,12 +269,19 @@ func (f *FileWriter) startNewBlock() error {
return &os.PathError{"create", f.name, interpretException(err)}
}

f.blockWriter = &rpc.BlockWriter{
block := addBlockResp.GetBlock()
dialFunc, err := f.client.wrapDatanodeDial(
f.client.options.DatanodeDialFunc, block.GetBlockToken())
if err != nil {
return err
}

f.blockWriter = &transfer.BlockWriter{
ClientName: f.client.namenode.ClientName,
Block: addBlockResp.GetBlock(),
Block: block,
BlockSize: f.blockSize,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: dialFunc,
}

return f.blockWriter.SetDeadline(f.deadline)
4 changes: 4 additions & 0 deletions fixtures.sh
@@ -1,5 +1,9 @@
set -e

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

HADOOP_FS=${HADOOP_FS-"hadoop fs"}
$HADOOP_FS -mkdir -p "/_test"
$HADOOP_FS -chmod 777 "/_test"