-
Notifications
You must be signed in to change notification settings - Fork 344
/
Copy pathnamenode.go
343 lines (288 loc) · 9.08 KB
/
namenode.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package rpc
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
krb "github.com/jcmturner/gokrb5/v8/client"
"google.golang.org/protobuf/proto"
)
// Wire-protocol constants used when talking to a namenode.
const (
// rpcVersion is the version byte written in the connection header after the
// "hrpc" magic.
rpcVersion byte = 0x09
// serviceClass is the RPC service class byte written in the connection
// header.
serviceClass byte = 0x0
// noneAuthProtocol is the auth protocol byte sent when no kerberos client is
// configured.
noneAuthProtocol byte = 0x0
// saslAuthProtocol is the auth protocol byte sent when a kerberos client is
// configured.
saslAuthProtocol byte = 0xdf
// protocolClass is sent as the declaring protocol name in request headers and
// as the protocol in the connection context.
protocolClass = "org.apache.hadoop.hdfs.protocol.ClientProtocol"
// protocolClassVersion is sent as the client protocol version in request
// headers.
protocolClassVersion = 1
// handshakeCallID is the call ID used for the connection context packet sent
// during the handshake.
handshakeCallID = -3
// standbyExceptionClass is the exception name that causes Execute to mark the
// current host as failed and retry.
standbyExceptionClass = "org.apache.hadoop.ipc.StandbyException"
)
// Timing constants for failover and lease renewal.
const (
// backoffDuration is how long a host is skipped by resolveConnection after
// its most recent failure.
backoffDuration = 5 * time.Second
// leaseRenewInterval is the tick interval at which renewLeases issues a
// renewLease call.
leaseRenewInterval = 1 * time.Second
)
// NamenodeConnection represents an open connection to a namenode. Requests
// issued through Execute are serialized by reqLock, and the connection fails
// over between the hosts in hostList.
type NamenodeConnection struct {
// ClientID is the ID sent in RPC request headers.
ClientID []byte
// ClientName is "go-hdfs-" followed by the client ID; it is sent as the
// client name in renewLease requests.
ClientName string
// User is the user sent as the effective user in the connection context.
User string
// currentRequestID is incremented once per Execute call and used as the
// request ID for that call.
currentRequestID int32
// kerberosClient, when non-nil, makes the handshake use the SASL auth
// protocol and perform a kerberos handshake.
kerberosClient *krb.Client
// kerberosServicePrincipleName is copied from the connection options.
kerberosServicePrincipleName string
// kerberosRealm is the realm taken from the kerberos credentials; when
// non-empty it is appended to the user in the connection context.
kerberosRealm string
// dialFunc opens connections to namenodes. resolveConnection replaces a nil
// value with (&net.Dialer{}).DialContext.
dialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
// conn is the current connection, or nil when there is none.
conn net.Conn
// host is the host most recently selected by resolveConnection.
host *namenodeHost
// hostList holds the hosts tried, in order, when (re)connecting.
hostList []*namenodeHost
// transport writes requests to and reads responses from conn.
transport transport
// reqLock is held for the duration of Execute and while Close closes conn.
reqLock sync.Mutex
// done is closed by Close to stop the renewLeases goroutine.
done chan struct{}
}
// NamenodeConnectionOptions represents the configurable options available
// for a NamenodeConnection.
type NamenodeConnectionOptions struct {
// Addresses specifies the namenode(s) to connect to.
Addresses []string
// User specifies which HDFS user the client will act as. It is required
// unless kerberos authentication is enabled, in which case it is overridden
// by the username set in KerberosClient.
User string
// DialFunc is used to connect to the namenodes. If nil, then
// (&net.Dialer{}).DialContext is used.
DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
// KerberosClient is used to connect to kerberized HDFS clusters. If provided,
// the NamenodeConnection will always mutually authenticate when connecting
// to the namenode(s).
KerberosClient *krb.Client
// KerberosServicePrincipleName specifies the Service Principle Name
// (<SERVICE>/<FQDN>) for the namenode(s). Like in the
// dfs.namenode.kerberos.principal property of core-site.xml, the special
// string '_HOST' can be substituted for the hostname in a multi-namenode
// setup (for example: 'nn/_HOST'). It is required if
// KerberosClient is provided.
KerberosServicePrincipleName string
}
// namenodeHost tracks a single namenode address together with its most recent
// failure, which resolveConnection uses to back off from failing hosts.
type namenodeHost struct {
// address is the address passed to the dial function.
address string
// lastError is the error recorded by the most recent markFailure for this
// host.
lastError error
// lastErrorAt is when lastError was recorded; the zero value means no failure
// has been recorded.
lastErrorAt time.Time
}
// NewNamenodeConnection creates a new connection to a namenode with the given
// options and performs an initial handshake. It returns an error if no user
// can be determined or if none of the configured namenodes can be reached.
// On success a background goroutine is started that periodically renews file
// leases until Close is called.
func NewNamenodeConnection(options NamenodeConnectionOptions) (*NamenodeConnection, error) {
	// With kerberos the identity comes from the credentials; otherwise an
	// explicit user is mandatory.
	user := options.User
	var realm string
	switch {
	case options.KerberosClient != nil:
		credentials := options.KerberosClient.Credentials
		user = credentials.UserName()
		realm = credentials.Realm()
	case user == "":
		return nil, errors.New("user not specified")
	}

	// Every configured address becomes a failover candidate.
	hosts := make([]*namenodeHost, 0, len(options.Addresses))
	for _, address := range options.Addresses {
		hosts = append(hosts, &namenodeHost{address: address})
	}

	// The client ID doubles as the "globally unique" ID in the RPC headers and
	// as the basis of the "client name" used in various requests.
	clientID := newClientID()

	c := &NamenodeConnection{
		ClientID:   clientID,
		ClientName: "go-hdfs-" + string(clientID),
		User:       user,

		kerberosClient:               options.KerberosClient,
		kerberosServicePrincipleName: options.KerberosServicePrincipleName,
		kerberosRealm:                realm,

		dialFunc:  options.DialFunc,
		hostList:  hosts,
		transport: &basicTransport{clientID: clientID},
		done:      make(chan struct{}),
	}

	if err := c.resolveConnection(); err != nil {
		return nil, err
	}

	// Keep any file leases alive in the background.
	go c.renewLeases()

	return c, nil
}
// resolveConnection makes sure c.conn refers to a connection on which the
// namenode handshake has been sent. If a connection already exists it is
// reused. Otherwise each host in hostList is tried in order, skipping hosts
// that failed within the last backoffDuration, until one can be dialed and
// handshaken.
//
// It returns an error wrapping the most recent failure if no host could be
// used, or a plain "no available namenodes" error if there was no failure to
// report (for example, when the host list is empty).
func (c *NamenodeConnection) resolveConnection() error {
	if c.conn != nil {
		return nil
	}

	// If every host is skipped because of backoff, report the failure recorded
	// for the most recently used host.
	var err error
	if c.host != nil {
		err = c.host.lastError
	}

	// Fall back to the default dialer once, rather than on every iteration.
	if c.dialFunc == nil {
		c.dialFunc = (&net.Dialer{}).DialContext
	}

	for _, host := range c.hostList {
		// Skip hosts that failed too recently.
		if host.lastErrorAt.After(time.Now().Add(-backoffDuration)) {
			continue
		}

		// markFailure records the error against c.host, so it must be set
		// before dialing.
		c.host = host
		c.conn, err = c.dialFunc(context.Background(), "tcp", host.address)
		if err != nil {
			c.markFailure(err)
			continue
		}

		err = c.doNamenodeHandshake()
		if err != nil {
			c.markFailure(err)
			continue
		}

		break
	}

	if c.conn == nil {
		if err == nil {
			// No host was attempted and no earlier failure is known; avoid
			// formatting a nil error into the message.
			return errors.New("no available namenodes")
		}

		return fmt.Errorf("no available namenodes: %w", err)
	}

	return nil
}
// markFailure drops the current connection, if any, and records err and the
// current time against the current host so that resolveConnection backs off
// from it.
func (c *NamenodeConnection) markFailure(err error) {
	if conn := c.conn; conn != nil {
		conn.Close()
		c.conn = nil
	}

	failed := c.host
	failed.lastError = err
	failed.lastErrorAt = time.Now()
}
// Execute performs an rpc call: req is sent over the wire and the reply is
// unmarshaled into resp. Calls are serialized by reqLock. A failed write, or a
// standby exception in the response, marks the current host as failed and the
// call is retried on whatever connection resolveConnection provides next; any
// other read error is returned as is.
func (c *NamenodeConnection) Execute(method string, req proto.Message, resp proto.Message) error {
	c.reqLock.Lock()
	defer c.reqLock.Unlock()

	c.currentRequestID++
	id := c.currentRequestID

	for {
		if err := c.resolveConnection(); err != nil {
			return err
		}

		if err := c.transport.writeRequest(c.conn, method, id, req); err != nil {
			c.markFailure(err)
			continue
		}

		err := c.transport.readResponse(c.conn, method, id, resp)
		if err == nil {
			return nil
		}

		// Anything other than a standby exception is final.
		nerr, ok := err.(*NamenodeError)
		if !ok || nerr.exception != standbyExceptionClass {
			return err
		}

		c.markFailure(err)
	}
}
// A handshake packet:
// +-----------------------------------------------------------+
// |  Header, 4 bytes ("hrpc")                                 |
// +-----------------------------------------------------------+
// |  Version, 1 byte (default version 0x09)                   |
// +-----------------------------------------------------------+
// |  RPC service class, 1 byte (0x00)                         |
// +-----------------------------------------------------------+
// |  Auth protocol, 1 byte (Auth method None = 0x00)          |
// +-----------------------------------------------------------+
//
// If the auth protocol is something other than 'none', the authentication
// handshake happens here. Otherwise, everything can be sent as one packet.
//
// +-----------------------------------------------------------+
// |  uint32 length of the next two parts                      |
// +-----------------------------------------------------------+
// |  varint length + RpcRequestHeaderProto                    |
// +-----------------------------------------------------------+
// |  varint length + IpcConnectionContextProto                |
// +-----------------------------------------------------------+

// doNamenodeHandshake writes the connection header to c.conn, performs the
// kerberos handshake when a kerberos client is configured, and then writes
// the connection context packet. It returns the first error encountered;
// kerberos handshake errors are wrapped so callers can still inspect the
// underlying cause.
func (c *NamenodeConnection) doNamenodeHandshake() error {
	authProtocol := noneAuthProtocol
	kerberos := false
	if c.kerberosClient != nil {
		authProtocol = saslAuthProtocol
		kerberos = true
	}

	rpcHeader := []byte{
		0x68, 0x72, 0x70, 0x63, // "hrpc"
		rpcVersion, serviceClass, authProtocol,
	}

	_, err := c.conn.Write(rpcHeader)
	if err != nil {
		return err
	}

	if kerberos {
		err = c.doKerberosHandshake()
		if err != nil {
			return fmt.Errorf("SASL handshake: %w", err)
		}
	}

	// The connection context is sent with the reserved handshake call ID.
	rrh := newRPCRequestHeader(handshakeCallID, c.ClientID)
	cc := newConnectionContext(c.User, c.kerberosRealm)
	packet, err := makeRPCPacket(rrh, cc)
	if err != nil {
		return err
	}

	_, err = c.conn.Write(packet)
	return err
}
// renewLeases issues a renewLease call for this client's name on every tick
// of leaseRenewInterval, and returns once the done channel is closed.
func (c *NamenodeConnection) renewLeases() {
	tick := time.NewTicker(leaseRenewInterval)
	defer tick.Stop()

	for {
		select {
		case <-c.done:
			return
		case <-tick.C:
		}

		// Renewal is best effort, so any error is deliberately ignored.
		c.Execute(
			"renewLease",
			&hdfs.RenewLeaseRequestProto{ClientName: proto.String(c.ClientName)},
			&hdfs.RenewLeaseResponseProto{},
		)
	}
}
// Close signals the lease renewal goroutine to stop and closes the current
// connection to the namenode, if there is one. It returns the error from
// closing that connection, or nil when there was nothing to close.
func (c *NamenodeConnection) Close() error {
	close(c.done)

	// Taking the request lock ensures no request (including a lease renewal)
	// is in flight while the connection is closed.
	c.reqLock.Lock()
	defer c.reqLock.Unlock()

	if c.conn == nil {
		return nil
	}

	return c.conn.Close()
}
// newRPCRequestHeader builds the RPC-level request header for a protocol
// buffer call with the given call ID and client ID, marked as a final packet.
func newRPCRequestHeader(id int32, clientID []byte) *hadoop.RpcRequestHeaderProto {
	header := new(hadoop.RpcRequestHeaderProto)
	header.RpcKind = hadoop.RpcKindProto_RPC_PROTOCOL_BUFFER.Enum()
	header.RpcOp = hadoop.RpcRequestHeaderProto_RPC_FINAL_PACKET.Enum()
	header.CallId = proto.Int32(id)
	header.ClientId = clientID

	return header
}
// newRequestHeader builds the method-level request header naming methodName
// on the client protocol class at protocolClassVersion.
func newRequestHeader(methodName string) *hadoop.RequestHeaderProto {
	header := new(hadoop.RequestHeaderProto)
	header.MethodName = proto.String(methodName)
	header.DeclaringClassProtocolName = proto.String(protocolClass)
	header.ClientProtocolVersion = proto.Uint64(uint64(protocolClassVersion))

	return header
}
// newConnectionContext builds the connection context sent during the
// handshake. The effective user is user, suffixed with "@" and kerberosRealm
// when a realm is given.
func newConnectionContext(user, kerberosRealm string) *hadoop.IpcConnectionContextProto {
	effectiveUser := user
	if kerberosRealm != "" {
		effectiveUser += "@" + kerberosRealm
	}

	userInfo := &hadoop.UserInformationProto{
		EffectiveUser: proto.String(effectiveUser),
	}

	return &hadoop.IpcConnectionContextProto{
		UserInfo: userInfo,
		Protocol: proto.String(protocolClass),
	}
}