From 847cf34fde925e8fb0193c0c133430dbd369c57c Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 4 Dec 2023 17:54:23 +0800 Subject: [PATCH 1/4] HDFS-17273. Change the way of computing some local variables duration for better debugging --- .../org/apache/hadoop/hdfs/DataStreamer.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index d92f5943fd8a2..978a6f69da9f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -539,6 +539,8 @@ boolean doWaitForRestart() { private final String[] favoredNodes; private final EnumSet addBlockFlags; + private static final long NANOSECONDS_PER_MILLISECOND = 1000000; + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, @@ -758,7 +760,7 @@ public void run() { scope = null; dataQueue.removeFirst(); ackQueue.addLast(one); - packetSendTime.put(one.getSeqno(), Time.monotonicNow()); + packetSendTime.put(one.getSeqno(), Time.monotonicNowNanos()); dataQueue.notifyAll(); } } @@ -924,7 +926,7 @@ void waitForAckedSeqno(long seqno) throws IOException { dnodes = nodes != null ? 
nodes.length : 3; } int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes); - long begin = Time.monotonicNow(); + long begin = Time.monotonicNowNanos(); try { synchronized (dataQueue) { while (!streamerClosed) { @@ -934,14 +936,14 @@ void waitForAckedSeqno(long seqno) throws IOException { } try { dataQueue.wait(1000); // when we receive an ack, we notify on - long duration = Time.monotonicNow() - begin; - if (duration > writeTimeout) { + long duration = Time.monotonicNowNanos() - begin; + if (duration > writeTimeout * NANOSECONDS_PER_MILLISECOND) { LOG.error("No ack received, took {}ms (threshold={}ms). " + "File being written: {}, block: {}, " + "Write pipeline datanodes: {}.", - duration, writeTimeout, src, block, nodes); + duration / NANOSECONDS_PER_MILLISECOND , writeTimeout, src, block, nodes); throw new InterruptedIOException("No ack received after " + - duration / 1000 + "s and a timeout of " + + duration / 1000 / NANOSECONDS_PER_MILLISECOND + "s and a timeout of " + writeTimeout / 1000 + "s"); } // dataQueue @@ -955,11 +957,11 @@ void waitForAckedSeqno(long seqno) throws IOException { } catch (ClosedChannelException cce) { LOG.debug("Closed channel exception", cce); } - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs) { + long duration = Time.monotonicNowNanos() - begin; + if (duration > dfsclientSlowLogThresholdMs * NANOSECONDS_PER_MILLISECOND) { LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). 
File being" + " written: {}, block: {}, Write pipeline datanodes: {}.", - duration, dfsclientSlowLogThresholdMs, src, block, nodes); + duration / NANOSECONDS_PER_MILLISECOND, dfsclientSlowLogThresholdMs, src, block, nodes); } } } @@ -1150,10 +1152,10 @@ public void run() { if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { Long begin = packetSendTime.get(ack.getSeqno()); if (begin != null) { - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs) { + long duration = Time.monotonicNowNanos() - begin; + if (duration > dfsclientSlowLogThresholdMs * NANOSECONDS_PER_MILLISECOND) { LOG.info("Slow ReadProcessor read fields for block " + block - + " took " + duration + "ms (threshold=" + + " took " + duration / NANOSECONDS_PER_MILLISECOND + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + ack + ", targets: " + Arrays.asList(targets)); } From 6a315328d4e208039a2c008b42e14175516bf9ab Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 4 Dec 2023 22:14:02 +0800 Subject: [PATCH 2/4] fix checkstyle --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 978a6f69da9f7..547d630c053d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -941,7 +941,7 @@ void waitForAckedSeqno(long seqno) throws IOException { LOG.error("No ack received, took {}ms (threshold={}ms).
" + "File being written: {}, block: {}, " + "Write pipeline datanodes: {}.", - duration / NANOSECONDS_PER_MILLISECOND , writeTimeout, src, block, nodes); + duration / NANOSECONDS_PER_MILLISECOND, writeTimeout, src, block, nodes); throw new InterruptedIOException("No ack received after " + duration / 1000 / NANOSECONDS_PER_MILLISECOND + "s and a timeout of " + writeTimeout / 1000 + "s"); From 75bd7b0542c95b098ad4ee6dfd2b592180024d56 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 8 Dec 2023 15:57:08 +0800 Subject: [PATCH 3/4] use lib function. --- .../org/apache/hadoop/hdfs/DataStreamer.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 547d630c053d7..8f8f394aa2e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -539,8 +539,6 @@ boolean doWaitForRestart() { private final String[] favoredNodes; private final EnumSet addBlockFlags; - private static final long NANOSECONDS_PER_MILLISECOND = 1000000; - private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, @@ -937,13 +935,13 @@ void waitForAckedSeqno(long seqno) throws IOException { try { dataQueue.wait(1000); // when we receive an ack, we notify on long duration = Time.monotonicNowNanos() - begin; - if (duration > writeTimeout * NANOSECONDS_PER_MILLISECOND) { + if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) { LOG.error("No ack received, took {}ms (threshold={}ms).
" + "File being written: {}, block: {}, " + "Write pipeline datanodes: {}.", - duration / NANOSECONDS_PER_MILLISECOND, writeTimeout, src, block, nodes); + TimeUnit.NANOSECONDS.toMillis(duration), writeTimeout, src, block, nodes); throw new InterruptedIOException("No ack received after " + - duration / 1000 / NANOSECONDS_PER_MILLISECOND + "s and a timeout of " + + TimeUnit.NANOSECONDS.toSeconds(duration) + "s and a timeout of " + writeTimeout / 1000 + "s"); } // dataQueue @@ -958,10 +956,10 @@ void waitForAckedSeqno(long seqno) throws IOException { LOG.debug("Closed channel exception", cce); } long duration = Time.monotonicNowNanos() - begin; - if (duration > dfsclientSlowLogThresholdMs * NANOSECONDS_PER_MILLISECOND) { + if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) { LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being" + " written: {}, block: {}, Write pipeline datanodes: {}.", - duration / NANOSECONDS_PER_MILLISECOND, dfsclientSlowLogThresholdMs, src, block, nodes); + TimeUnit.NANOSECONDS.toMillis(duration), dfsclientSlowLogThresholdMs, src, block, nodes); } } } @@ -1153,9 +1151,9 @@ public void run() { Long begin = packetSendTime.get(ack.getSeqno()); if (begin != null) { long duration = Time.monotonicNowNanos() - begin; - if (duration > dfsclientSlowLogThresholdMs * NANOSECONDS_PER_MILLISECOND) { + if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) { LOG.info("Slow ReadProcessor read fields for block " + block - + " took " + duration / NANOSECONDS_PER_MILLISECOND + "ms (threshold=" + + " took " + TimeUnit.NANOSECONDS.toMillis(duration) + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + ack + ", targets: " + Arrays.asList(targets)); } From 759c5e7cc692eaa8ec16c4801e49097d80deeb36 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 8 Dec 2023 20:54:58 +0800 Subject: [PATCH 4/4] fix checkstyle --- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 3 ++- 1 
file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8f8f394aa2e1b..c7f4f00a550d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -959,7 +959,8 @@ void waitForAckedSeqno(long seqno) throws IOException { if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) { LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being" + " written: {}, block: {}, Write pipeline datanodes: {}.", - TimeUnit.NANOSECONDS.toMillis(duration), dfsclientSlowLogThresholdMs, src, block, nodes); + TimeUnit.NANOSECONDS.toMillis(duration), dfsclientSlowLogThresholdMs, + src, block, nodes); } } }