Skip to content

Commit

Permalink
[Enhancement](multi-catalog) Set hdfs native client logger to glog an…
Browse files Browse the repository at this point in the history
…d redirect jvm stdout/stderr logger to jni.log. (#41632)

Backport #39540.

Co-authored-by: Mingyu Chen <[email protected]>
  • Loading branch information
kaka11chen and morningman authored Oct 13, 2024
1 parent c3740ba commit 0513e41
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 4 deletions.
61 changes: 61 additions & 0 deletions be/src/io/hdfs_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,68 @@

namespace doris {

#ifdef USE_HADOOP_HDFS
void err_log_message(const char* fmt, ...) {
va_list args;
va_start(args, fmt);

// First, call vsnprintf to get the required buffer size
int size = vsnprintf(nullptr, 0, fmt, args) + 1; // +1 for '\0'
if (size <= 0) {
LOG(ERROR) << "Error formatting log message, invalid size";
va_end(args);
return;
}

va_end(args);
va_start(args, fmt); // Reinitialize va_list

// Allocate a buffer and format the string into it
std::vector<char> buffer(size);
vsnprintf(buffer.data(), size, fmt, args);

va_end(args);

// Use glog to log the message
LOG(ERROR) << buffer.data();
}

void va_err_log_message(const char* fmt, va_list ap) {
va_list args_copy;
va_copy(args_copy, ap);

// Call vsnprintf to get the required buffer size
int size = vsnprintf(nullptr, 0, fmt, args_copy) + 1; // +1 for '\0'
va_end(args_copy); // Release the copied va_list

if (size <= 0) {
LOG(ERROR) << "Error formatting log message, invalid size";
return;
}

// Reinitialize va_list for the second vsnprintf call
va_copy(args_copy, ap);

// Allocate a buffer and format the string into it
std::vector<char> buffer(size);
vsnprintf(buffer.data(), size, fmt, args_copy);

va_end(args_copy);

// Use glog to log the message
LOG(ERROR) << buffer.data();
}

struct hdfsLogger logger = {.errLogMessage = err_log_message,
.vaErrLogMessage = va_err_log_message};
#endif // #ifdef USE_HADOOP_HDFS

Status HDFSCommonBuilder::init_hdfs_builder() {
#ifdef USE_HADOOP_HDFS
static std::once_flag flag;
std::call_once(flag, []() { hdfsSetLogger(&logger); });
#endif // #ifdef USE_HADOOP_HDFS

hdfs_builder = hdfsNewBuilder();
if (hdfs_builder == nullptr) {
LOG(INFO) << "failed to init HDFSCommonBuilder, please check check be/conf/hdfs-site.xml";
Expand Down
4 changes: 2 additions & 2 deletions bin/start_be.sh
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ fi

for var in http_proxy HTTP_PROXY https_proxy HTTPS_PROXY; do
if [[ -n ${!var} ]]; then
log "env '${var}' = '${!var}', need unset it using 'unset ${var}'"
echo "env '${var}' = '${!var}', need unset it using 'unset ${var}'"
exit 1
fi
done
Expand Down Expand Up @@ -354,7 +354,7 @@ set_tcmalloc_heap_limit() {
fi

if [[ "${mem_limit_mb}" -gt "${total_mem_mb}" ]]; then
log "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}"
echo "mem_limit is larger than the total memory of the server. ${mem_limit_mb} > ${total_mem_mb}"
return 1
fi
export TCMALLOC_HEAP_LIMIT_MB=${mem_limit_mb}
Expand Down
4 changes: 2 additions & 2 deletions bin/start_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ java_version="$(
)"
if [[ "${java_version}" -eq 17 ]]; then
if [[ -z "${JAVA_OPTS_FOR_JDK_17}" ]]; then
log "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf"
echo "JAVA_OPTS_FOR_JDK_17 is not set in fe.conf"
exit 1
fi
final_java_opt="${JAVA_OPTS_FOR_JDK_17}"
else
log "ERROR: The jdk_version is ${java_version}, must be 17."
echo "ERROR: The jdk_version is ${java_version}, must be 17."
exit 1
fi
log "Using Java version ${java_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package org.apache.doris.common.classloader;

import org.apache.doris.common.jni.utils.ExpiringMap;
import org.apache.doris.common.jni.utils.Log4jOutputStream;
import org.apache.doris.common.jni.utils.UdfClassCache;

import com.google.common.collect.Streams;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -54,6 +57,7 @@ public class ScannerLoader {
* Load all classes from $DORIS_HOME/lib/java_extensions/*
*/
public void loadAllScannerJars() {
redirectStdStreamsToLog4j();
String basePath = System.getenv("DORIS_HOME");
File library = new File(basePath, "/lib/java_extensions/");
// TODO: add thread pool to load each scanner
Expand All @@ -66,6 +70,16 @@ public void loadAllScannerJars() {
});
}

private void redirectStdStreamsToLog4j() {
Logger outLogger = Logger.getLogger("stdout");
PrintStream logPrintStream = new PrintStream(new Log4jOutputStream(outLogger, Level.INFO));
System.setOut(logPrintStream);

Logger errLogger = Logger.getLogger("stderr");
PrintStream errorPrintStream = new PrintStream(new Log4jOutputStream(errLogger, Level.ERROR));
System.setErr(errorPrintStream);
}

public static UdfClassCache getUdfClassLoader(String functionSignature) {
return udfLoadedClasses.get(functionSignature);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.jni.utils;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.OutputStream;

public class Log4jOutputStream extends OutputStream {
private final Logger logger;
private final StringBuilder buffer = new StringBuilder();
private final Level level;

public Log4jOutputStream(Logger logger, Level level) {
this.logger = logger;
this.level = level;
}

@Override
public void write(int b) {
if (b == '\n') {
logger.log(level, buffer.toString());
buffer.setLength(0);
} else {
buffer.append((char) b);
}
}
}

0 comments on commit 0513e41

Please sign in to comment.