Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-9708
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Aug 24, 2022
2 parents c970950 + 4890ba5 commit 7a2d1e2
Show file tree
Hide file tree
Showing 54 changed files with 1,248 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ default int maxReadSizeForVectorReads() {
* As a result of the call, each range will have FileRange.setData(CompletableFuture)
* called with a future that when complete will have a ByteBuffer with the
* data from the file's range.
* <p>
* The position returned by getPos() after readVectored() is undefined.
* </p>
* <p>
* If a file is changed while the readVectored() operation is in progress, the output is
* undefined. Some ranges may have old data, some may have new and some may have both.
* </p>
* <p>
* While a readVectored() operation is in progress, normal read api calls may block.
* </p>
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @throws IOException any IOE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface AlignmentContext {
void updateResponseState(RpcResponseHeaderProto.Builder header);

/**
* This is the intended client method call to implement to recieve state info
* This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext) throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
factory, null)), false);
}

protected static class Invoker implements RpcInvocationHandler {
Expand All @@ -147,9 +147,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}

/**
Expand All @@ -158,14 +157,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
this.alignmentContext = alignmentContext;
}

private RequestHeaderProto constructRpcRequestHeader(Method method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public <T> ProtocolProxy<T> getProxy(
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext) throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
Expand Down Expand Up @@ -133,7 +133,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[]{protocol}, new Invoker(protocol, connId, conf,
factory)), false);
factory, null)), false);
}

protected static class Invoker implements RpcInvocationHandler {
Expand All @@ -154,9 +154,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}

/**
Expand All @@ -166,14 +165,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
this.alignmentContext = alignmentContext;
}

private RequestHeaderProto constructRpcRequestHeader(Method method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,32 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(protocol, clientVersion, connId, conf,
factory, null);
}

/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server.
*
* @param <T> Generics Type T
* @param protocol protocol class
* @param clientVersion client's version
* @param connId client connection identifier
* @param conf configuration
* @param factory socket factory
* @param alignmentContext StateID alignment context
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(
protocol, clientVersion, connId, conf, factory);
protocol, clientVersion, connId, conf, factory, alignmentContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
<T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ public static class Call implements Schedulable,
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestampNanos; // time the call was received
private final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1107,6 +1107,10 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

public long getTimestampNanos() {
return timestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1188,7 +1192,7 @@ public Void run() throws Exception {

try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,18 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException {
return getProxy(protocol, clientVersion, connId.getAddress(),
connId.getTicket(), conf, factory, connId.getRpcTimeout(),
connId.getRetryPolicy(), null, null);
connId.getRetryPolicy(), null, alignmentContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,10 @@ String getPassword(Configuration conf, String alias, String defaultPass) {
*/
@Override
public synchronized void destroy() {
if (trustManager != null) {
if (fileMonitoringTimer != null) {
fileMonitoringTimer.cancel();
}
if (trustManager != null) {
trustManager = null;
keyManagers = null;
trustManagers = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
// Alignment context info for use with routers.
// The client should not interpret these bytes, but only forward bytes
// received from RpcResponseHeaderProto.routerFederatedState.
optional bytes routerFederatedState = 9;
}


Expand Down Expand Up @@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
// Alignment context info for use with routers.
// The client should not interpret these bytes, but only
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
optional bytes routerFederatedState = 10;
}

message RpcSaslProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,13 @@ Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
allocating buffers such that even direct buffers are garbage collected when
they are no longer referenced.

The position returned by `getPos()` after `readVectored()` is undefined.

If a file is changed while the `readVectored()` operation is in progress, the output is
undefined. Some ranges may have old data, some may have new, and some may have both.

While a `readVectored()` operation is in progress, normal read api calls may block.

Note: Don't use direct buffers for reading from ChecksumFileSystem as that may
lead to memory fragmentation explained in HADOOP-18296.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public <T> ProtocolProxy<T> getProxy(

@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ protected static TestRpcService getClient(ConnectionId connId,
0,
connId,
clientConf,
NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
NetUtils.getDefaultSocketFactory(clientConf),
null).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;

/**
* This exception is thrown when can not get any mount point for the input path.
* RBF cannot forward any requests for the path.
*/
public class NoLocationException extends IOException {

private static final long serialVersionUID = 1L;

public NoLocationException(String path, Class<?> t) {
super("Cannot find locations for " + path + " in " + t.getSimpleName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ public ActiveNamenodeResolver getNamenodeResolver() {
/**
* Get the state store interface for the router heartbeats.
*
* @return FederationRouterStateStore state store API handle.
* @return RouterStore state store API handle.
*/
public RouterStore getRouterStateManager() {
if (this.routerStateManager == null && this.stateStore != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,19 +935,22 @@ public BatchedDirectoryListing getBatchedListing(String[] srcs,
public HdfsFileStatus getFileInfo(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());

HdfsFileStatus ret = null;
// If it's a directory, we check in all locations
if (rpcServer.isPathAll(src)) {
ret = getFileInfoAll(locations, method);
} else {
// Check for file information sequentially
ret = rpcClient.invokeSequential(
locations, method, HdfsFileStatus.class, null);
IOException noLocationException = null;
try {
final List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false, false);
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());

// If it's a directory, we check in all locations
if (rpcServer.isPathAll(src)) {
ret = getFileInfoAll(locations, method);
} else {
// Check for file information sequentially
ret = rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
}
} catch (NoLocationException | RouterResolveException e) {
noLocationException = e;
}

// If there is no real path, check mount points
Expand All @@ -966,6 +969,12 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
}
}

// Can't find mount point for path and the path didn't contain any sub monit points,
// throw the NoLocationException to client.
if (ret == null && noLocationException != null) {
throw noLocationException;
}

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1731,8 +1731,7 @@ protected List<RemoteLocation> getLocationsForPath(String path,
final PathLocation location =
this.subclusterResolver.getDestinationForPath(path);
if (location == null) {
throw new IOException("Cannot find locations for " + path + " in " +
this.subclusterResolver.getClass().getSimpleName());
throw new NoLocationException(path, this.subclusterResolver.getClass());
}

// We may block some write operations
Expand Down
Loading

0 comments on commit 7a2d1e2

Please sign in to comment.