Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](auditlog) add missing audit log fields and duplicate audit log error #42262

Merged
merged 8 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,23 @@ public class InternalSchema {
AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("scan_bytes_from_local_storage", TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("scan_bytes_from_remote_storage", TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("stmt_type", TypeDef.createVarchar(48), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(
new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA
.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
Expand All @@ -151,6 +165,9 @@ public class InternalSchema {
new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(
new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
AUDIT_SCHEMA.add(
new ColumnDef("compute_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
// Keep stmt as the last column, so that the SQL string is easier to extract from fe.audit.log.
AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.trees.plans.PlanType;
Expand Down Expand Up @@ -63,4 +64,16 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
public StmtType stmtType() {
// This command always reports the CALL statement type.
return StmtType.CALL;
}

@Override
public RedirectStatus toRedirectStatus() {
    // Some CALL statements need to be redirected and some do not; decide by the called function's name.
    // Upper-case with Locale.ROOT so the comparison does not depend on the JVM default locale
    // (under a Turkish locale, "flush_audit_log" would upper-case with a dotted capital I and
    // never match the case label below).
    String funcName = unboundFunction.getName().toUpperCase(java.util.Locale.ROOT);
    switch (funcName) {
        case "FLUSH_AUDIT_LOG":
            // NOTE(review): presumably not forwarded so that the flush runs on the frontend
            // that received the statement - confirm.
            return RedirectStatus.NO_FORWARD;
        default:
            return RedirectStatus.FORWARD_WITH_SYNC;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.call;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;

import java.util.List;

/**
 * Implements {@code call flush_audit_log()}.
 * This will flush audit log immediately to the audit_log table.
 * Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log.
 */
public class CallFlushAuditLogFunc extends CallFunc {

    // The user passed to create(); run() checks this user's global ADMIN privilege.
    // Assigned once in the constructor, hence final.
    private final UserIdentity user;

    private CallFlushAuditLogFunc(UserIdentity user) {
        this.user = user;
    }

    /**
     * Creates the call function after validating its arguments.
     *
     * @param user the user on whose behalf the function will run
     * @param args the call arguments; must be empty
     * @return a new {@code CallFlushAuditLogFunc}
     * @throws AnalysisException if any argument is supplied
     */
    public static CallFunc create(UserIdentity user, List<Expression> args) {
        if (!args.isEmpty()) {
            throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter");
        }
        return new CallFlushAuditLogFunc(user);
    }

    /**
     * Checks that the user holds the global ADMIN privilege, then asks the plugin manager
     * to flush the audit log.
     *
     * @throws AnalysisException if the user does not have the ADMIN privilege
     */
    @Override
    public void run() {
        // check priv
        if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) {
            throw new AnalysisException("Only admin can flush audit log");
        }
        // flush audit log
        Env.getCurrentEnv().getPluginMgr().flushAuditLog();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public static CallFunc getFunc(ConnectContext ctx, UserIdentity user, UnboundFun
// TODO, built-in functions require a separate management
case "EXECUTE_STMT": // Call built-in functions first
return CallExecuteStmtFunc.create(user, unboundFunction.getArguments());
case "FLUSH_AUDIT_LOG":
return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments());
default:
return CallProcedure.create(ctx, originSql);
}
Expand Down
14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public enum EventType {
public String queryId = "";
@AuditField(value = "IsQuery")
public boolean isQuery = false;
@AuditField(value = "isNereids")
@AuditField(value = "IsNereids")
public boolean isNereids = false;
@AuditField(value = "feIp")
@AuditField(value = "FeIp")
public String feIp = "";
@AuditField(value = "StmtType")
public String stmtType = "";
Expand All @@ -96,22 +96,20 @@ public enum EventType {
public long shuffleSendRows = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
@AuditField(value = "peakMemoryBytes")
@AuditField(value = "PeakMemoryBytes")
public long peakMemoryBytes = -1;
@AuditField(value = "SqlDigest")
public String sqlDigest = "";
@AuditField(value = "cloudClusterName")
@AuditField(value = "ComputeGroupName")
public String cloudClusterName = "";
@AuditField(value = "TraceId")
public String traceId = "";
@AuditField(value = "WorkloadGroup")
public String workloadGroup = "";
// note: newly added fields should be always before fuzzyVariables
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
@AuditField(value = "scanBytesFromLocalStorage")
@AuditField(value = "ScanBytesFromLocalStorage")
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "scanBytesFromRemoteStorage")
@AuditField(value = "ScanBytesFromRemoteStorage")
public long scanBytesFromRemoteStorage = -1;

public long pushToAuditLogQueueTime;
Expand Down
13 changes: 11 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class PluginMgr implements Writable {
// all dynamic plugins should have unique names,
private final Set<String> dynamicPluginNames;

// Save this handler for external call
private AuditLoader auditLoader = null;

public PluginMgr() {
plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE];
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
Expand Down Expand Up @@ -113,8 +116,8 @@ private void initBuiltinPlugins() {
}

// AuditLoader: log audit log to internal table
AuditLoader auditLoaderPlugin = new AuditLoader();
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
this.auditLoader = new AuditLoader();
if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) {
LOG.warn("failed to register audit log builder");
}

Expand Down Expand Up @@ -363,6 +366,12 @@ public List<List<String>> getPluginShowInfos() {
return rows;
}

/**
 * Asks the saved AuditLoader, if there is one, to load its buffered audit log,
 * passing {@code true} as the force argument.
 */
public void flushAuditLog() {
    // No AuditLoader has been saved: nothing to flush.
    if (auditLoader == null) {
        return;
    }
    auditLoader.loadIfNecessary(true);
}

public void readFields(DataInputStream dis) throws IOException {
int size = dis.readInt();
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

Expand All @@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements AuditPlugin {

public static final String AUDIT_LOG_TABLE = "audit_log";

private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault());

private StringBuilder auditLogBuffer = new StringBuilder();
private int auditLogNum = 0;
private long lastLoadTimeAuditLog = 0;
Expand Down Expand Up @@ -90,7 +85,7 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
// GlobalVariable.audit_plugin_max_batch_bytes.
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
this.streamLoader = new AuditStreamLoader();
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
this.loadThread = new Thread(new LoadWorker(), "audit loader thread");
this.loadThread.start();

isInit = true;
Expand Down Expand Up @@ -143,6 +138,7 @@ private void assembleAudit(AuditEvent event) {
}

private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
// Fields must be appended in the same order as the columns of InternalSchema.AUDIT_SCHEMA.
logBuffer.append(event.queryId).append("\t");
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
logBuffer.append(event.clientIp).append("\t");
Expand All @@ -156,15 +152,21 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
logBuffer.append(event.returnRows).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
logBuffer.append(event.stmtId).append("\t");
logBuffer.append(event.stmtType).append("\t");
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
logBuffer.append(event.isNereids ? 1 : 0).append("\t");
logBuffer.append(event.feIp).append("\t");
logBuffer.append(event.cpuTimeMs).append("\t");
logBuffer.append(event.sqlHash).append("\t");
logBuffer.append(event.sqlDigest).append("\t");
logBuffer.append(event.peakMemoryBytes).append("\t");
logBuffer.append(event.workloadGroup).append("\t");
logBuffer.append(event.cloudClusterName).append("\t");
// already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog
String stmt = event.stmt;
if (LOG.isDebugEnabled()) {
Expand All @@ -173,10 +175,12 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
logBuffer.append(stmt).append("\n");
}

private void loadIfNecessary(AuditStreamLoader loader) {
// public for external call.
// synchronized to avoid concurrent load.
public synchronized void loadIfNecessary(boolean force) {
long currentTime = System.currentTimeMillis();

if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
|| currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) {
// begin to load
try {
Expand All @@ -189,7 +193,7 @@ private void loadIfNecessary(AuditStreamLoader loader) {
discardLogNum += auditLogNum;
return;
}
AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token);
if (LOG.isDebugEnabled()) {
LOG.debug("audit loader response: {}", response);
}
Expand All @@ -215,10 +219,8 @@ private void resetBatch(long currentTime) {
}

private class LoadWorker implements Runnable {
private AuditStreamLoader loader;

public LoadWorker(AuditStreamLoader loader) {
this.loader = loader;
public LoadWorker() {
}

public void run() {
Expand All @@ -228,7 +230,7 @@ public void run() {
if (event != null) {
assembleAudit(event);
// process all audit logs
loadIfNecessary(loader);
loadIfNecessary(false);
}
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
Expand All @@ -241,3 +243,4 @@ public void run() {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
if (af.value().equals("Time(ms)")) {
queryTime = (long) f.get(event);
}
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
sb.append("|").append(af.value()).append("=").append(f.get(event));
}

String auditLog = sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
String cluster = Config.isCloudMode() ? cloudCluster : "";

AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
// ATTN: MUST reset first. The same AuditEventBuilder instance is reused by the next query,
// so without a reset, values left over from a previous query could leak into this event.
auditEventBuilder.reset();
auditEventBuilder
.setTimestamp(ctx.getStartTime())
.setClientIp(ctx.getClientIP())
Expand Down
32 changes: 32 additions & 0 deletions regression-test/data/audit/test_audit_log_behavior.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !audit_log_schema --
query_id varchar(48) Yes true \N
time datetime(3) Yes true \N
client_ip varchar(128) Yes true \N
user varchar(128) Yes false \N NONE
catalog varchar(128) Yes false \N NONE
db varchar(128) Yes false \N NONE
state varchar(128) Yes false \N NONE
error_code int Yes false \N NONE
error_message text Yes false \N NONE
query_time bigint Yes false \N NONE
scan_bytes bigint Yes false \N NONE
scan_rows bigint Yes false \N NONE
return_rows bigint Yes false \N NONE
shuffle_send_rows bigint Yes false \N NONE
shuffle_send_bytes bigint Yes false \N NONE
scan_bytes_from_local_storage bigint Yes false \N NONE
scan_bytes_from_remote_storage bigint Yes false \N NONE
stmt_id bigint Yes false \N NONE
stmt_type varchar(48) Yes false \N NONE
is_query tinyint Yes false \N NONE
is_nereids tinyint Yes false \N NONE
frontend_ip varchar(128) Yes false \N NONE
cpu_time_ms bigint Yes false \N NONE
sql_hash varchar(128) Yes false \N NONE
sql_digest varchar(128) Yes false \N NONE
peak_memory_bytes bigint Yes false \N NONE
workload_group text Yes false \N NONE
compute_group text Yes false \N NONE
stmt text Yes false \N NONE

9 changes: 8 additions & 1 deletion regression-test/suites/audit/test_audit_log_behavior.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ suite("test_audit_log_behavior") {
try {
sql "set global enable_audit_plugin = true"
sql "set global audit_plugin_max_sql_length = 58"
sql "set global audit_plugin_max_batch_interval_sec = 1"
// sql "set global audit_plugin_max_batch_interval_sec = 1"
} catch (Exception e) {
log.warn("skip this case, because " + e.getMessage())
assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
Expand Down Expand Up @@ -71,6 +71,8 @@ suite("test_audit_log_behavior") {
]
]

qt_audit_log_schema """desc internal.__internal_schema.audit_log"""

for (def on : [true, false]) {
sql "set enable_nereids_planner=${on}"
sql "truncate table __internal_schema.audit_log"
Expand All @@ -80,6 +82,10 @@ suite("test_audit_log_behavior") {
sql tuple2[0]
}

// make sure audit event is created.
// see WorkloadRuntimeStatusMgr.getQueryNeedAudit()
Thread.sleep(6000)
sql """call flush_audit_log()"""
// check result
for (int i = 0; i < cnt; i++) {
def tuple2 = sqls.get(i)
Expand All @@ -96,6 +102,7 @@ suite("test_audit_log_behavior") {
assertEquals(res[0][0].toString(), tuple2[1].toString())
}
}
// do not turn off
sql "set global enable_audit_plugin = false"
sql "set global audit_plugin_max_sql_length = 4096"
sql "set global audit_plugin_max_batch_interval_sec = 60"
Expand Down
Loading