Merge branch 'apache:trunk' into YARN-11158-V3
slfan1989 authored Nov 15, 2022
2 parents 829d87c + a48e8c9 commit 193fe06
Showing 30 changed files with 2,112 additions and 776 deletions.
4 changes: 2 additions & 2 deletions LICENSE-binary
@@ -241,8 +241,8 @@ com.google.guava:guava:27.0-jre
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
com.microsoft.azure:azure-storage:7.0.0
com.nimbusds:nimbus-jose-jwt:9.8.1
-com.squareup.okhttp3:okhttp:4.9.3
-com.squareup.okio:okio:1.6.0
+com.squareup.okhttp3:okhttp:4.10.0
+com.squareup.okio:okio:3.2.0
com.zaxxer:HikariCP:4.0.3
commons-beanutils:commons-beanutils:1.9.3
commons-cli:commons-cli:1.2
3 changes: 3 additions & 0 deletions hadoop-client-modules/hadoop-client-runtime/pom.xml
@@ -148,6 +148,7 @@
<!-- Leave javax APIs that are stable -->
<!-- the jdk ships part of the javax.annotation namespace, so if we want to relocate this we'll have to care it out by class :( -->
<exclude>com.google.code.findbugs:jsr305</exclude>
+<exclude>io.netty:*</exclude>
<exclude>io.dropwizard.metrics:metrics-core</exclude>
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
<exclude>org.eclipse.jetty:jetty-security</exclude>
@@ -156,6 +157,8 @@
<exclude>org.bouncycastle:*</exclude>
<!-- Leave snappy that includes native methods which cannot be relocated. -->
<exclude>org.xerial.snappy:*</exclude>
+<!-- leave out kotlin classes -->
+<exclude>org.jetbrains.kotlin:*</exclude>
</excludes>
</artifactSet>
<filters>
5 changes: 5 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
@@ -383,6 +383,11 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>com.squareup.okio</groupId>
+      <artifactId>okio-jvm</artifactId>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
@@ -208,7 +208,8 @@ The following table lists the configuration property names that are deprecated i
| mapred.task.profile.params | mapreduce.task.profile.params |
| mapred.task.profile.reduces | mapreduce.task.profile.reduces |
| mapred.task.timeout | mapreduce.task.timeout |
-| mapred.tasktracker.indexcache.mb | mapreduce.tasktracker.indexcache.mb |
+| mapred.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
+| mapreduce.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
| mapred.tasktracker.map.tasks.maximum | mapreduce.tasktracker.map.tasks.maximum |
| mapred.tasktracker.memory\_calculator\_plugin | mapreduce.tasktracker.resourcecalculatorplugin |
| mapred.tasktracker.memorycalculatorplugin | mapreduce.tasktracker.resourcecalculatorplugin |
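Both legacy index-cache keys now resolve to the reduce-side shuffle setting. A minimal sketch of the effect (not part of this commit), assuming the DeprecationDelta entries added near the end of this diff have been registered, as happens when JobConf is loaded:

import org.apache.hadoop.mapred.JobConf;

public class IndexCacheKeyDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf(false);
    // Write through the deprecated TaskTracker-era name...
    conf.set("mapred.tasktracker.indexcache.mb", "20");
    // ...and read it back through the new name; prints 20.
    System.out.println(conf.getInt("mapreduce.reduce.shuffle.indexcache.mb", 10));
  }
}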
10 changes: 10 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -37,6 +37,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.squareup.okio</groupId>
+          <artifactId>okio-jvm</artifactId>
+        </exclusion>
+      </exclusions>
    </dependency>
+    <dependency>
+      <groupId>com.squareup.okio</groupId>
+      <artifactId>okio-jvm</artifactId>
+    </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
@@ -71,8 +71,7 @@ public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header)
*/
@Override
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
-    long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get());
-    header.setStateId(maxStateId);
+    header.setStateId(poolLocalStateId.get());
}

/**
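With this change, a request header carries only the state id that this pool's own clients have observed; responses still advance the shared, router-wide accumulator. A condensed, paraphrased sketch of those semantics (field names taken from the diff; the class body is illustrative, not the actual source):

import java.util.concurrent.atomic.LongAccumulator;

class PoolStateSketch {
  // Router-wide view of the namespace, advanced by every RPC response.
  private final LongAccumulator sharedGlobalStateId =
      new LongAccumulator(Math::max, Long.MIN_VALUE);
  // Only what this connection pool's own clients have actually seen.
  private final LongAccumulator poolLocalStateId =
      new LongAccumulator(Math::max, Long.MIN_VALUE);

  void receiveResponseState(long serverStateId) {
    sharedGlobalStateId.accumulate(serverStateId);
  }

  void advanceClientStateId(long clientStateId) {
    poolLocalStateId.accumulate(clientStateId);
  }

  long stateIdForRequestHeader() {
    // Before this commit: Long.max(poolLocalStateId.get(), sharedGlobalStateId.get())
    return poolLocalStateId.get();
  }
}

The new test below exercises exactly this behavior.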
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


public class TestPoolAlignmentContext {
@Test
public void testNamenodeRequestsOnlyUsePoolLocalStateID() {
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
String namespaceId = "namespace1";
routerStateIdContext.getNamespaceStateId(namespaceId).accumulate(20L);
PoolAlignmentContext poolContext1 = new PoolAlignmentContext(routerStateIdContext, namespaceId);
PoolAlignmentContext poolContext2 = new PoolAlignmentContext(routerStateIdContext, namespaceId);

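    // Both pools see the shared (response-side) state of 20, but their request
    // headers stay at Long.MIN_VALUE until a client explicitly advances them.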
assertRequestHeaderStateId(poolContext1, Long.MIN_VALUE);
assertRequestHeaderStateId(poolContext2, Long.MIN_VALUE);
Assertions.assertEquals(20L, poolContext1.getLastSeenStateId());
Assertions.assertEquals(20L, poolContext2.getLastSeenStateId());

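    // Advancing pool 1 must not leak into pool 2's request headers, nor
    // move the shared last-seen state.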
poolContext1.advanceClientStateId(30L);
assertRequestHeaderStateId(poolContext1, 30L);
assertRequestHeaderStateId(poolContext2, Long.MIN_VALUE);
Assertions.assertEquals(20L, poolContext1.getLastSeenStateId());
Assertions.assertEquals(20L, poolContext2.getLastSeenStateId());
}

private void assertRequestHeaderStateId(PoolAlignmentContext poolAlignmentContext,
Long expectedValue) {
RpcRequestHeaderProto.Builder builder = RpcRequestHeaderProto.newBuilder();
poolAlignmentContext.updateRequestState(builder);
Assertions.assertEquals(expectedValue, builder.getStateId());
}
}
@@ -520,7 +520,7 @@ public void addVolume(final StorageLocation location,

for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
-      try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+      try {
fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
@@ -23,7 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,7 +43,7 @@ class IndexCache {
public IndexCache(JobConf conf) {
this.conf = conf;
totalMemoryAllowed =
-      conf.getInt(TTConfig.TT_INDEX_CACHE, 10) * 1024 * 1024;
+      conf.getInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10) * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
}

@@ -577,6 +577,8 @@ public interface MRJobConfig {
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;

+  public static final String SHUFFLE_INDEX_CACHE = "mapreduce.reduce.shuffle.indexcache.mb";

public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";

public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
@@ -29,6 +29,12 @@
@InterfaceStability.Evolving
public interface TTConfig extends MRConfig {

+  /**
+   * @deprecated Use
+   * {@link org.apache.hadoop.mapreduce.MRJobConfig#SHUFFLE_INDEX_CACHE}
+   * instead
+   */
+  @Deprecated
public static final String TT_INDEX_CACHE =
"mapreduce.tasktracker.indexcache.mb";
public static final String TT_MAP_SLOTS =
@@ -53,14 +53,15 @@

import org.apache.hadoop.classification.VisibleForTesting;

-class Fetcher<K,V> extends Thread {
+@VisibleForTesting
+public class Fetcher<K, V> extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);

-  /** Number of ms before timing out a copy */
+  /** Number of ms before timing out a copy. */
private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

-  /** Basic/unit connection timeout (in milliseconds) */
+  /** Basic/unit connection timeout (in milliseconds). */
private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;

/* Default read timeout (in milliseconds) */
@@ -72,19 +73,21 @@ class Fetcher<K,V> extends Thread {
private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";

protected final Reporter reporter;
-  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+  @VisibleForTesting
+  public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}

-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

+  @VisibleForTesting
+  public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final JobConf jobConf;
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
-  protected final MergeManager<K,V> merger;
-  protected final ShuffleSchedulerImpl<K,V> scheduler;
+  protected final MergeManager<K, V> merger;
+  protected final ShuffleSchedulerImpl<K, V> scheduler;
protected final ShuffleClientMetrics metrics;
protected final ExceptionReporter exceptionReporter;
protected final int id;
@@ -111,7 +114,7 @@ private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private static SSLFactory sslFactory;

public Fetcher(JobConf job, TaskAttemptID reduceId,
-                 ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
+                 ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this(job, reduceId, scheduler, merger, reporter, metrics,
@@ -120,7 +123,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,

@VisibleForTesting
Fetcher(JobConf job, TaskAttemptID reduceId,
-          ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
+          ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
int id) {
@@ -315,9 +318,8 @@ protected void copyFromHost(MapHost host) throws IOException {
return;
}

-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-          + maps);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + maps);
}

// List of maps to be fetched yet
Expand Down Expand Up @@ -411,8 +413,8 @@ private void openConnectionWithRetry(URL url) throws IOException {
shouldWait = false;
} catch (IOException e) {
if (!fetchRetryEnabled) {
-          // throw exception directly if fetch's retry is not enabled
-          throw e;
+        // throw exception directly if fetch's retry is not enabled
+        throw e;
}
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
LOG.warn("Failed to connect to host: " + url + "after "
@@ -489,7 +491,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining,
boolean canRetry) throws IOException {
-    MapOutput<K,V> mapOutput = null;
+    MapOutput<K, V> mapOutput = null;
TaskAttemptID mapId = null;
long decompressedLength = -1;
long compressedLength = -1;
@@ -611,7 +613,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
// First time to retry.
long currentTime = Time.monotonicNow();
if (retryStartTime == 0) {
-        retryStartTime = currentTime;
+      retryStartTime = currentTime;
}

// Retry is not timeout, let's do retry with throwing an exception.
@@ -628,7 +630,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
}

/**
-   * Do some basic verification on the input received -- Being defensive
+   * Do some basic verification on the input received -- Being defensive.
* @param compressedLength
* @param decompressedLength
* @param forReduce
@@ -695,8 +697,7 @@ private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
* only on the last failure. Instead of connecting with a timeout of
* X, we try connecting with a timeout of x < X but multiple times.
*/
-  private void connect(URLConnection connection, int connectionTimeout)
-      throws IOException {
+  private void connect(URLConnection connection, int connectionTimeout) throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout "
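The connect helper reformatted above implements the unit-timeout strategy its comment describes: rather than one attempt with the full budget X, it makes repeated attempts with a smaller unit timeout. A self-contained sketch of the idea (a hypothetical class, not the method's exact body, which also tracks per-attempt timing):

import java.io.IOException;
import java.net.URLConnection;

final class UnitTimeoutConnect {
  private static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;

  static void connect(URLConnection connection, int connectionTimeout) throws IOException {
    if (connectionTimeout < 0) {
      throw new IOException("Invalid timeout " + connectionTimeout);
    }
    // 0 means "no timeout" to URLConnection; otherwise cap each attempt at one
    // unit so a single stalled attempt cannot consume the whole budget.
    int unit = connectionTimeout == 0
        ? 0 : Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
    long start = System.currentTimeMillis();
    while (true) {
      connection.setConnectTimeout(unit);
      try {
        connection.connect();
        return;
      } catch (IOException e) {
        // Keep retrying until the cumulative budget is spent, then rethrow.
        if (connectionTimeout > 0
            && System.currentTimeMillis() - start >= connectionTimeout) {
          throw e;
        }
      }
    }
  }
}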
@@ -879,4 +879,9 @@ public int compareTo(Path obj) {
return super.compareTo(obj);
}
}
+
+  @VisibleForTesting
+  OnDiskMerger getOnDiskMerger() {
+    return onDiskMerger;
+  }
}
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

+import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -109,4 +110,14 @@ public void run() {
}

public abstract void merge(List<T> inputs) throws IOException;
+
+  @VisibleForTesting
+  int getMergeFactor() {
+    return mergeFactor;
+  }
+
+  @VisibleForTesting
+  LinkedList<List<T>> getPendingToBeMerged() {
+    return pendingToBeMerged;
+  }
}
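These package-private accessors let tests assert on the merger's internals without reflection. A hypothetical JUnit 5 fragment (the newMergeManager() fixture is assumed, not part of Hadoop; the test sits in org.apache.hadoop.mapreduce.task.reduce so the accessors are visible):

package org.apache.hadoop.mapreduce.task.reduce;

import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestOnDiskMergerVisibility {
  @Test
  public void onDiskMergerIsInspectable() {
    MergeManagerImpl<Text, Text> manager = newMergeManager();
    // The new accessors expose merge internals to assertions.
    Assertions.assertTrue(manager.getOnDiskMerger().getMergeFactor() > 0);
    Assertions.assertTrue(manager.getOnDiskMerger().getPendingToBeMerged().isEmpty());
  }

  // Assumed fixture: construct a real MergeManagerImpl here.
  private MergeManagerImpl<Text, Text> newMergeManager() {
    throw new UnsupportedOperationException("illustrative fixture");
  }
}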
@@ -80,8 +80,6 @@ private static void addDeprecatedKeys() {
JTConfig.JT_TASKCACHE_LEVELS),
new DeprecationDelta("mapred.job.tracker.retire.jobs",
JTConfig.JT_RETIREJOBS),
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
TTConfig.TT_INDEX_CACHE),
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
TTConfig.TT_MAP_SLOTS),
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
@@ -290,6 +288,10 @@ private static void addDeprecatedKeys() {
MRJobConfig.REDUCE_LOG_LEVEL),
new DeprecationDelta("mapreduce.job.counters.limit",
MRJobConfig.COUNTERS_MAX_KEY),
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
MRJobConfig.SHUFFLE_INDEX_CACHE),
new DeprecationDelta("mapreduce.tasktracker.indexcache.mb",
MRJobConfig.SHUFFLE_INDEX_CACHE),
new DeprecationDelta("jobclient.completion.poll.interval",
Job.COMPLETION_POLL_INTERVAL_KEY),
new DeprecationDelta("jobclient.progress.monitor.poll.interval",