Add support for data wire encryption
This implements the Hadoop version of DIGEST-MD5 SASL data protection, loosely
based on RFC 2831. Authentication and encryption using rc4 are supported, but
3des is not.

Fixes #145, closes #236.
Matthew Topol authored and colinmarc committed Jul 17, 2020
1 parent 15f6da0 commit 8ef8936
Showing 29 changed files with 1,354 additions and 293 deletions.
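From a user's perspective nothing changes: when the namenode's server defaults report that data transfer encryption is required, the client now negotiates SASL on every datanode connection automatically. A minimal usage sketch — not part of this commit, assuming the standard v2 client API and a HADOOP_CONF_DIR pointing at a cluster with dfs.encrypt.data.transfer enabled:

package main

import (
	"log"

	"github.com/colinmarc/hdfs/v2"
	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func main() {
	// Load core-site.xml and hdfs-site.xml from HADOOP_CONF_DIR.
	conf, err := hadoopconf.LoadFromEnvironment()
	if err != nil {
		log.Fatal(err)
	}

	// There is no encryption-specific option to set: the client checks the
	// namenode's server defaults and wraps datanode connections as needed.
	options := hdfs.ClientOptionsFromConf(conf)
	// For a kerberized cluster, options.KerberosClient must be set as well.

	client, err := hdfs.NewClient(options)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	data, err := client.ReadFile("/_test/foo.txt")
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("read %d bytes; block traffic was protected in flight", len(data))
}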
4 changes: 4 additions & 0 deletions .travis.yml
@@ -9,6 +9,10 @@ env:
   - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication
   - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity
   - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy
+  - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=authentication
+  - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=integrity
+  - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=privacy
+  - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy DATA_TRANSFER_PROTECTION=privacy AES=true
 before_install:
   - export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default.
 install:
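These four new matrix entries exercise each data transfer protection level on top of RPC privacy. Presumably (the CI setup scripts aren't shown in this diff) they map onto the standard Hadoop properties in hdfs-site.xml: DATA_TRANSFER_PROTECTION corresponds to dfs.data.transfer.protection (authentication, integrity, or privacy), and AES=true to dfs.encrypt.data.transfer.cipher.suites=AES/CTR/NoPadding.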
50 changes: 48 additions & 2 deletions client.go
@@ -10,19 +10,26 @@ import (
 	"os/user"
 	"strings"
 
+	krb "gopkg.in/jcmturner/gokrb5.v7/client"
+
 	"github.com/colinmarc/hdfs/v2/hadoopconf"
 	hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
 	hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
 	"github.com/colinmarc/hdfs/v2/internal/rpc"
-	krb "gopkg.in/jcmturner/gokrb5.v7/client"
+	"github.com/colinmarc/hdfs/v2/internal/transfer"
 )
 
+type dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
+
 // Client represents a connection to an HDFS cluster. A Client will
 // automatically maintain leases for any open files, preventing other clients
 // from modifying them, until Close is called.
 type Client struct {
 	namenode *rpc.NamenodeConnection
-	defaults *hdfs.FsServerDefaultsProto
 	options  ClientOptions
 
+	defaults      *hdfs.FsServerDefaultsProto
+	encryptionKey *hdfs.DataEncryptionKeyProto
 }
 
 // ClientOptions represents the configurable options for a client.
@@ -260,6 +267,45 @@ func (c *Client) fetchDefaults() (*hdfs.FsServerDefaultsProto, error) {
 	return c.defaults, nil
 }
 
+func (c *Client) fetchDataEncryptionKey() (*hdfs.DataEncryptionKeyProto, error) {
+	if c.encryptionKey != nil {
+		return c.encryptionKey, nil
+	}
+
+	req := &hdfs.GetDataEncryptionKeyRequestProto{}
+	resp := &hdfs.GetDataEncryptionKeyResponseProto{}
+
+	err := c.namenode.Execute("getDataEncryptionKey", req, resp)
+	if err != nil {
+		return nil, err
+	}
+
+	c.encryptionKey = resp.GetDataEncryptionKey()
+	return c.encryptionKey, nil
+}
+
+func (c *Client) wrapDatanodeDial(dc dialContext, token *hadoop.TokenProto) (dialContext, error) {
+	defaults, err := c.fetchDefaults()
+	if err != nil {
+		return nil, err
+	}
+
+	if defaults.GetEncryptDataTransfer() {
+		key, err := c.fetchDataEncryptionKey()
+		if err != nil {
+			return nil, err
+		}
+
+		return (&transfer.SaslDialer{
+			DialFunc: dc,
+			Key:      key,
+			Token:    token,
+		}).DialContext, nil
+	}
+
+	return dc, nil
+}
+
 // Close terminates all underlying socket connections to remote server.
 func (c *Client) Close() error {
 	return c.namenode.Close()
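wrapDatanodeDial is the heart of the change: everything that dials a datanode already goes through a DialFunc, so SASL negotiation can be slotted in without touching the reader and writer internals. A toy sketch of the decorator pattern in use here — illustrative only, with the actual handshake (implemented by transfer.SaslDialer, not shown in this excerpt) reduced to a comment:

package main

import (
	"context"
	"log"
	"net"
)

// dialContext matches the function type introduced in client.go above.
type dialContext func(ctx context.Context, network, addr string) (net.Conn, error)

// wrappingDialer mimics the shape of transfer.SaslDialer: it decorates an
// inner dial function and returns the connection only after performing
// extra per-connection setup.
type wrappingDialer struct {
	DialFunc dialContext
}

func (d *wrappingDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
	dial := d.DialFunc
	if dial == nil {
		dial = (&net.Dialer{}).DialContext
	}

	conn, err := dial(ctx, network, addr)
	if err != nil {
		return nil, err
	}

	// The real transfer.SaslDialer would run the DIGEST-MD5 handshake here,
	// using the data encryption key and block token, and hand back a
	// net.Conn that signs or encrypts all subsequent traffic.
	return conn, nil
}

func main() {
	d := &wrappingDialer{}

	// The wrapped method satisfies dialContext, so it can be passed anywhere
	// a plain dial function is expected, exactly as wrapDatanodeDial does.
	var dial dialContext = d.DialContext

	conn, err := dial(context.Background(), "tcp", "example.com:80")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	log.Println("connected via wrapped dialer")
}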
4 changes: 4 additions & 0 deletions cmd/hdfs/test/helper.bash
@@ -4,6 +4,10 @@ export HADOOP_FS=${HADOOP_FS-"hadoop fs"}
 export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.."
 export HDFS="$ROOT_TEST_DIR/hdfs"
 
+# jdk11 is missing some APIs that the older jars here rely on
+# so point at openjdk8 for now
+export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
+
 # stolen from https://github.com/sstephenson/rbenv/blob/master/test/test_helper.bash
 
 flunk() {
28 changes: 21 additions & 7 deletions file_reader.go
@@ -9,7 +9,7 @@ import (
 	"time"
 
 	hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
-	"github.com/colinmarc/hdfs/v2/internal/rpc"
+	"github.com/colinmarc/hdfs/v2/internal/transfer"
 	"github.com/golang/protobuf/proto"
 )

@@ -22,7 +22,7 @@ type FileReader struct {
 	info os.FileInfo
 
 	blocks      []*hdfs.LocatedBlockProto
-	blockReader *rpc.BlockReader
+	blockReader *transfer.BlockReader
 	deadline    time.Time
 	offset      int64

@@ -97,14 +97,21 @@ func (f *FileReader) Checksum() ([]byte, error) {
 	paddedLength := 32
 	totalLength := 0
 	checksum := md5.New()
+
 	for _, block := range f.blocks {
-		cr := &rpc.ChecksumReader{
+		d, err := f.client.wrapDatanodeDial(f.client.options.DatanodeDialFunc,
+			block.GetBlockToken())
+		if err != nil {
+			return nil, err
+		}
+
+		cr := &transfer.ChecksumReader{
 			Block:               block,
 			UseDatanodeHostname: f.client.options.UseDatanodeHostname,
-			DialFunc:            f.client.options.DatanodeDialFunc,
+			DialFunc:            d,
 		}
 
-		err := cr.SetDeadline(f.deadline)
+		err = cr.SetDeadline(f.deadline)
 		if err != nil {
 			return nil, err
 		}
@@ -400,12 +407,19 @@ func (f *FileReader) getNewBlockReader() error {
 		end := start + block.GetB().GetNumBytes()
 
 		if start <= off && off < end {
-			f.blockReader = &rpc.BlockReader{
+			dialFunc, err := f.client.wrapDatanodeDial(
+				f.client.options.DatanodeDialFunc,
+				block.GetBlockToken())
+			if err != nil {
+				return err
+			}
+
+			f.blockReader = &transfer.BlockReader{
 				ClientName:          f.client.namenode.ClientName,
 				Block:               block,
 				Offset:              int64(off - start),
 				UseDatanodeHostname: f.client.options.UseDatanodeHostname,
-				DialFunc:            f.client.options.DatanodeDialFunc,
+				DialFunc:            dialFunc,
 			}
 
 			return f.SetDeadline(f.deadline)
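(One consequence of per-block access tokens: the dial function is re-wrapped for every block in both Checksum and getNewBlockReader, since transfer.SaslDialer carries the block's own token alongside the cluster-wide data encryption key.)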
8 changes: 4 additions & 4 deletions file_reader_test.go
@@ -164,7 +164,7 @@ func TestFileReadAt(t *testing.T) {
 	off := 0
 	for off < len(buf) {
 		n, err := file.ReadAt(buf[off:], int64(testStrOff+off))
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		assert.True(t, n > 0)
 		off += n
 	}
@@ -175,7 +175,7 @@
 	off = 0
 	for off < len(buf) {
 		n, err := file.ReadAt(buf[off:], int64(testStr2Off+off))
-		assert.NoError(t, err)
+		require.NoError(t, err)
 		assert.True(t, n > 0)
 		off += n
 	}
@@ -362,11 +362,11 @@ func TestFileReadDeadline(t *testing.T) {
 	file, err := client.Open("/_test/foo.txt")
 	require.NoError(t, err)
 
-	file.SetDeadline(time.Now().Add(100 * time.Millisecond))
+	file.SetDeadline(time.Now().Add(200 * time.Millisecond))
 	_, err = file.Read([]byte{0, 0})
 	assert.NoError(t, err)
 
-	time.Sleep(100 * time.Millisecond)
+	time.Sleep(200 * time.Millisecond)
 	_, err = file.Read([]byte{0, 0})
 	assert.NotNil(t, err)
 }
30 changes: 22 additions & 8 deletions file_writer.go
@@ -6,7 +6,7 @@ import (
 	"time"
 
 	hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
-	"github.com/colinmarc/hdfs/v2/internal/rpc"
+	"github.com/colinmarc/hdfs/v2/internal/transfer"
 	"github.com/golang/protobuf/proto"
 )

@@ -19,7 +19,7 @@ type FileWriter struct {
 	replication int
 	blockSize   int64
 
-	blockWriter *rpc.BlockWriter
+	blockWriter *transfer.BlockWriter
 	deadline    time.Time
 	closed      bool
 }
@@ -112,14 +112,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
 		return f, nil
 	}
 
-	f.blockWriter = &rpc.BlockWriter{
+	dialFunc, err := f.client.wrapDatanodeDial(
+		f.client.options.DatanodeDialFunc,
+		block.GetBlockToken())
+	if err != nil {
+		return nil, err
+	}
+
+	f.blockWriter = &transfer.BlockWriter{
 		ClientName:          f.client.namenode.ClientName,
 		Block:               block,
 		BlockSize:           f.blockSize,
 		Offset:              int64(block.B.GetNumBytes()),
 		Append:              true,
 		UseDatanodeHostname: f.client.options.UseDatanodeHostname,
-		DialFunc:            f.client.options.DatanodeDialFunc,
+		DialFunc:            dialFunc,
 	}
 
 	err = f.blockWriter.SetDeadline(f.deadline)
@@ -176,7 +183,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
 	for off < len(b) {
 		n, err := f.blockWriter.Write(b[off:])
 		off += n
-		if err == rpc.ErrEndOfBlock {
+		if err == transfer.ErrEndOfBlock {
 			err = f.startNewBlock()
 		}
 
@@ -262,12 +269,19 @@ func (f *FileWriter) startNewBlock() error {
 		return &os.PathError{"create", f.name, interpretException(err)}
 	}
 
-	f.blockWriter = &rpc.BlockWriter{
+	block := addBlockResp.GetBlock()
+	dialFunc, err := f.client.wrapDatanodeDial(
+		f.client.options.DatanodeDialFunc, block.GetBlockToken())
+	if err != nil {
+		return err
+	}
+
+	f.blockWriter = &transfer.BlockWriter{
 		ClientName:          f.client.namenode.ClientName,
-		Block:               addBlockResp.GetBlock(),
+		Block:               block,
 		BlockSize:           f.blockSize,
 		UseDatanodeHostname: f.client.options.UseDatanodeHostname,
-		DialFunc:            f.client.options.DatanodeDialFunc,
+		DialFunc:            dialFunc,
 	}
 
 	return f.blockWriter.SetDeadline(f.deadline)
4 changes: 4 additions & 0 deletions fixtures.sh
@@ -1,5 +1,9 @@
 set -e
 
+# jdk11 is missing some APIs that the older jars here rely on
+# so point at openjdk8 for now
+export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
+
 HADOOP_FS=${HADOOP_FS-"hadoop fs"}
 $HADOOP_FS -mkdir -p "/_test"
 $HADOOP_FS -chmod 777 "/_test"
66 changes: 0 additions & 66 deletions internal/rpc/challenge.go

This file was deleted; its DIGEST-MD5 challenge parsing moves into the new internal/sasl package, which kerberos.go imports below.

14 changes: 8 additions & 6 deletions internal/rpc/kerberos.go
@@ -6,11 +6,13 @@ import (
 	"net"
 	"regexp"
 
-	hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
 	"gopkg.in/jcmturner/gokrb5.v7/gssapi"
 	"gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
 	"gopkg.in/jcmturner/gokrb5.v7/spnego"
 	krbtypes "gopkg.in/jcmturner/gokrb5.v7/types"
+
+	hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
+	"github.com/colinmarc/hdfs/v2/internal/sasl"
 )
 
 const saslRpcCallId = -33
@@ -56,22 +58,22 @@ func (c *NamenodeConnection) doKerberosHandshake() error {
 	}
 
 	if tokenAuth != nil {
-		challenge, err := parseChallenge(tokenAuth)
+		challenge, err := sasl.ParseChallenge(tokenAuth.Challenge)
 		if err != nil {
 			return err
 		}
 
-		switch challenge.qop {
-		case qopPrivacy, qopIntegrity:
+		switch challenge.Qop {
+		case sasl.QopPrivacy, sasl.QopIntegrity:
 			// Switch to SASL RPC handler
 			c.transport = &saslTransport{
 				basicTransport: basicTransport{
 					clientID: c.ClientID,
 				},
 				sessionKey: sessionKey,
-				privacy:    challenge.qop == qopPrivacy,
+				privacy:    challenge.Qop == sasl.QopPrivacy,
 			}
-		case qopAuthentication:
+		case sasl.QopAuthentication:
 			// No special transport is required.
 		default:
 			return errors.New("unexpected QOP in challenge")
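This is where the new internal/sasl package replaces the deleted challenge.go. Per RFC 2831, a DIGEST-MD5 challenge is a comma-separated list of key=value directives whose values may be quoted and may themselves contain commas (e.g. cipher="rc4,rc4-56"). A rough sketch of what a parser like sasl.ParseChallenge could look like — the field and constant names mirror the call sites above, but the commit's actual implementation isn't shown in this excerpt:

package sasl

import (
	"fmt"
	"strings"
)

// QOP ("quality of protection") values defined by RFC 2831.
const (
	QopAuthentication = "auth"      // authentication only
	QopIntegrity      = "auth-int"  // integrity protection
	QopPrivacy        = "auth-conf" // integrity + confidentiality
)

// Challenge represents a parsed DIGEST-MD5 challenge.
type Challenge struct {
	Realm  string
	Nonce  string
	Qop    string
	Cipher []string
}

// ParseChallenge splits a DIGEST-MD5 challenge into its directives. Values
// may be quoted and may contain commas (e.g. cipher="rc4,rc4-56"), so the
// token can't naively be split on commas.
func ParseChallenge(challenge []byte) (*Challenge, error) {
	c := &Challenge{}
	for _, pair := range splitOutsideQuotes(string(challenge), ',') {
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed challenge directive: %q", pair)
		}

		key := strings.TrimSpace(kv[0])
		val := strings.Trim(strings.TrimSpace(kv[1]), `"`)
		switch key {
		case "realm":
			c.Realm = val
		case "nonce":
			c.Nonce = val
		case "qop":
			c.Qop = val
		case "cipher":
			c.Cipher = strings.Split(val, ",")
		}
	}

	return c, nil
}

// splitOutsideQuotes splits s on sep, ignoring separators inside quotes.
func splitOutsideQuotes(s string, sep rune) []string {
	var parts []string
	var cur strings.Builder
	inQuotes := false
	for _, r := range s {
		switch {
		case r == '"':
			inQuotes = !inQuotes
			cur.WriteRune(r)
		case r == sep && !inQuotes:
			parts = append(parts, cur.String())
			cur.Reset()
		default:
			cur.WriteRune(r)
		}
	}
	return append(parts, cur.String())
}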
(The remaining changed files are not shown.)
