Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17273. Improve local variables duration of DataStreamer for better debugging. #6321

Merged
merged 4 commits into from
Dec 24, 2023
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ public void run() {
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
packetSendTime.put(one.getSeqno(), Time.monotonicNowNanos());
dataQueue.notifyAll();
}
}
Expand Down Expand Up @@ -924,7 +924,7 @@ void waitForAckedSeqno(long seqno) throws IOException {
dnodes = nodes != null ? nodes.length : 3;
}
int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
long begin = Time.monotonicNow();
long begin = Time.monotonicNowNanos();
try {
synchronized (dataQueue) {
while (!streamerClosed) {
Expand All @@ -934,14 +934,14 @@ void waitForAckedSeqno(long seqno) throws IOException {
}
try {
dataQueue.wait(1000); // when we receive an ack, we notify on
long duration = Time.monotonicNow() - begin;
if (duration > writeTimeout) {
long duration = Time.monotonicNowNanos() - begin;
hfutatzhanghb marked this conversation as resolved.
Show resolved Hide resolved
if (TimeUnit.NANOSECONDS.toMillis(duration) > writeTimeout) {
LOG.error("No ack received, took {}ms (threshold={}ms). "
+ "File being written: {}, block: {}, "
+ "Write pipeline datanodes: {}.",
duration, writeTimeout, src, block, nodes);
TimeUnit.NANOSECONDS.toMillis(duration), writeTimeout, src, block, nodes);
throw new InterruptedIOException("No ack received after " +
duration / 1000 + "s and a timeout of " +
TimeUnit.NANOSECONDS.toSeconds(duration) + "s and a timeout of " +
writeTimeout / 1000 + "s");
}
// dataQueue
Expand All @@ -955,11 +955,11 @@ void waitForAckedSeqno(long seqno) throws IOException {
} catch (ClosedChannelException cce) {
LOG.debug("Closed channel exception", cce);
}
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
long duration = Time.monotonicNowNanos() - begin;
hfutatzhanghb marked this conversation as resolved.
Show resolved Hide resolved
if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) {
LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being"
+ " written: {}, block: {}, Write pipeline datanodes: {}.",
duration, dfsclientSlowLogThresholdMs, src, block, nodes);
TimeUnit.NANOSECONDS.toMillis(duration), dfsclientSlowLogThresholdMs, src, block, nodes);
}
}
}
Expand Down Expand Up @@ -1150,10 +1150,10 @@ public void run() {
if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
Long begin = packetSendTime.get(ack.getSeqno());
if (begin != null) {
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
long duration = Time.monotonicNowNanos() - begin;
hfutatzhanghb marked this conversation as resolved.
Show resolved Hide resolved
if (TimeUnit.NANOSECONDS.toMillis(duration) > dfsclientSlowLogThresholdMs) {
LOG.info("Slow ReadProcessor read fields for block " + block
+ " took " + duration + "ms (threshold="
+ " took " + TimeUnit.NANOSECONDS.toMillis(duration) + "ms (threshold="
+ dfsclientSlowLogThresholdMs + "ms); ack: " + ack
+ ", targets: " + Arrays.asList(targets));
}
Expand Down