From f00f1affe027998247ca41e4ceda975f0e9cf61a Mon Sep 17 00:00:00 2001 From: abingcbc Date: Sat, 12 Oct 2024 11:21:37 +0800 Subject: [PATCH] feat: support to rename and delete tag --- core/common/CircularBuffer.h | 2 + core/common/Constants.cpp | 12 -- core/common/Constants.h | 13 -- core/common/ParamExtractor.h | 2 + core/common/TagConstants.h | 85 +++++++++++ core/file_server/ContainerInfo.cpp | 31 ++-- core/file_server/ContainerInfo.h | 24 +-- core/file_server/FileServer.cpp | 27 +++- core/file_server/FileServer.h | 9 ++ core/file_server/FileTagOptions.cpp | 143 ++++++++++++++++++ core/file_server/FileTagOptions.h | 60 ++++++++ .../event_handler/EventHandler.cpp | 20 ++- core/file_server/event_handler/EventHandler.h | 1 + .../event_handler/HistoryFileImporter.cpp | 1 + .../event_handler/HistoryFileImporter.h | 4 +- core/file_server/reader/FileReaderOptions.cpp | 13 -- core/file_server/reader/FileReaderOptions.h | 4 - core/file_server/reader/JsonLogFileReader.h | 5 +- core/file_server/reader/LogFileReader.cpp | 75 ++++++--- core/file_server/reader/LogFileReader.h | 26 +++- core/models/PipelineEventGroup.h | 14 +- core/monitor/LogFileProfiler.cpp | 34 +++-- core/monitor/LogFileProfiler.h | 18 ++- core/observer/network/NetworkObserver.cpp | 2 +- core/pipeline/GlobalConfig.cpp | 24 +++ core/pipeline/GlobalConfig.h | 2 + core/pipeline/Pipeline.cpp | 5 +- core/plugin/input/InputFile.cpp | 10 +- core/plugin/input/InputFile.h | 4 +- core/plugin/processor/CommonParserOptions.cpp | 9 +- core/plugin/processor/CommonParserOptions.h | 2 +- .../processor/ProcessorParseApsaraNative.cpp | 7 +- .../processor/ProcessorParseApsaraNative.h | 7 +- .../ProcessorParseDelimiterNative.cpp | 10 +- .../processor/ProcessorParseDelimiterNative.h | 2 +- .../processor/ProcessorParseJsonNative.cpp | 8 +- .../processor/ProcessorParseJsonNative.h | 7 +- .../processor/ProcessorParseRegexNative.cpp | 8 +- .../processor/ProcessorParseRegexNative.h | 3 +- .../inner/ProcessorSplitLogStringNative.cpp | 19 +-- .../inner/ProcessorSplitLogStringNative.h | 1 - ...ProcessorSplitMultilineLogStringNative.cpp | 19 +-- .../ProcessorSplitMultilineLogStringNative.h | 1 - .../processor/inner/ProcessorTagNative.cpp | 78 ++++++++-- .../processor/inner/ProcessorTagNative.h | 11 ++ core/runner/ProcessorRunner.cpp | 6 +- core/unittest/input/InputFileUnittest.cpp | 6 +- .../reader/FileReaderOptionsUnittest.cpp | 21 ++- pkg/config/global_config.go | 4 + pkg/protocol/converter/converter.go | 108 +------------ .../converter/converter_single_log_flatten.go | 4 +- pkg/protocol/converter/custom_single_log.go | 4 +- pkg/protocol/converter/otlp.go | 4 +- pluginmanager/plugin_runner_helper.go | 2 +- pluginmanager/plugin_runner_v1.go | 10 +- pluginmanager/plugin_runner_v2.go | 29 +++- pluginmanager/processor_tag_commuinty.go | 84 ++++++++++ pluginmanager/processor_tag_enterprise.go | 98 ++++++++++++ .../flusher/clickhouse/flusher_clickhouse.go | 6 +- .../elasticsearch/flusher_elasticsearch.go | 6 +- plugins/flusher/kafkav2/flusher_kafka_v2.go | 6 +- plugins/flusher/loki/flusher_loki.go | 6 +- plugins/flusher/opentelemetry/flusher_otlp.go | 2 +- plugins/flusher/pulsar/flusher_pulsar.go | 6 +- .../docker/logmeta/metric_container_info.go | 5 +- 65 files changed, 898 insertions(+), 381 deletions(-) create mode 100644 core/common/TagConstants.h create mode 100644 core/file_server/FileTagOptions.cpp create mode 100644 core/file_server/FileTagOptions.h create mode 100644 pluginmanager/processor_tag_commuinty.go create mode 100644 
pluginmanager/processor_tag_enterprise.go diff --git a/core/common/CircularBuffer.h b/core/common/CircularBuffer.h index 81611292b6..4c0c8dbf45 100644 --- a/core/common/CircularBuffer.h +++ b/core/common/CircularBuffer.h @@ -16,8 +16,10 @@ #pragma once #include + #include "MemoryBarrier.h" #include "Semaphore.h" +#include "common/TimeUtil.h" namespace logtail { diff --git a/core/common/Constants.cpp b/core/common/Constants.cpp index 4113c30c33..be4ac9af75 100644 --- a/core/common/Constants.cpp +++ b/core/common/Constants.cpp @@ -22,18 +22,6 @@ const std::string OS_NAME = "Linux"; const std::string OS_NAME = "Windows"; #endif -const std::string LOG_RESERVED_KEY_SOURCE = "__source__"; -const std::string LOG_RESERVED_KEY_TOPIC = "__topic__"; -const std::string LOG_RESERVED_KEY_USER_DEFINED_ID = "__user_defined_id__"; -const std::string LOG_RESERVED_KEY_MACHINE_UUID = "__machine_uuid__"; -const std::string LOG_RESERVED_KEY_HOSTNAME = "__hostname__"; -const std::string LOG_RESERVED_KEY_PATH = "__path__"; -const std::string LOG_RESERVED_KEY_PACKAGE_ID = "__pack_id__"; -const std::string LOG_RESERVED_KEY_TRUNCATE_INFO = "__truncate_info__"; -// const std::string LOG_RESERVED_KEY_ALIPAY_ZONE = "__alipay_zone__"; -const std::string LOG_RESERVED_KEY_INODE = "__inode__"; -const std::string LOG_RESERVED_KEY_FILE_OFFSET = "__file_offset__"; - const char* SLS_EMPTY_STR_FOR_INDEX = "\01"; // profile project diff --git a/core/common/Constants.h b/core/common/Constants.h index f94b0875a7..6c333049dd 100644 --- a/core/common/Constants.h +++ b/core/common/Constants.h @@ -22,19 +22,6 @@ namespace logtail { // OS name, Linux, Windows. extern const std::string OS_NAME; -// Resevered key in log. -extern const std::string LOG_RESERVED_KEY_SOURCE; -extern const std::string LOG_RESERVED_KEY_TOPIC; -extern const std::string LOG_RESERVED_KEY_USER_DEFINED_ID; -extern const std::string LOG_RESERVED_KEY_MACHINE_UUID; -extern const std::string LOG_RESERVED_KEY_HOSTNAME; -extern const std::string LOG_RESERVED_KEY_PATH; -extern const std::string LOG_RESERVED_KEY_PACKAGE_ID; -extern const std::string LOG_RESERVED_KEY_TRUNCATE_INFO; -// extern const std::string LOG_RESERVED_KEY_ALIPAY_ZONE; -extern const std::string LOG_RESERVED_KEY_INODE; -extern const std::string LOG_RESERVED_KEY_FILE_OFFSET; - extern const char* SLS_EMPTY_STR_FOR_INDEX; // profile project diff --git a/core/common/ParamExtractor.h b/core/common/ParamExtractor.h index 7ee050e066..82b6312e1e 100644 --- a/core/common/ParamExtractor.h +++ b/core/common/ParamExtractor.h @@ -285,4 +285,6 @@ bool IsValidList(const Json::Value& config, const std::string& key, std::string& bool IsValidMap(const Json::Value& config, const std::string& key, std::string& errorMsg); +bool IsKeyExist(const Json::Value& config, const std::string& key); + } // namespace logtail diff --git a/core/common/TagConstants.h b/core/common/TagConstants.h new file mode 100644 index 0000000000..36624f9a8b --- /dev/null +++ b/core/common/TagConstants.h @@ -0,0 +1,85 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace logtail { + +const std::string LOG_RESERVED_KEY_SOURCE = "__source__"; +const std::string LOG_RESERVED_KEY_TOPIC = "__topic__"; +const std::string LOG_RESERVED_KEY_MACHINE_UUID = "__machine_uuid__"; +const std::string LOG_RESERVED_KEY_PACKAGE_ID = "__pack_id__"; +const std::string LOG_RESERVED_KEY_TRUNCATE_INFO = "__truncate_info__"; + +const std::string DEFAULT_CONFIG_TAG_KEY_VALUE = "__default__"; + +enum TagKey { + FILE_OFFSET_KEY = 0, + FILE_INODE_TAG_KEY = 1, + FILE_PATH_TAG_KEY = 2, + K8S_NAMESPACE_TAG_KEY = 3, + K8S_POD_NAME_TAG_KEY = 4, + K8S_POD_UID_TAG_KEY = 5, + CONTAINER_NAME_TAG_KEY = 6, + CONTAINER_IP_TAG_KEY = 7, + CONTAINER_IMAGE_NAME_TAG_KEY = 8, + K8S_CONTAINER_NAME_TAG_KEY = 9, + K8S_CONTAINER_IMAGE_NAME_TAG_KEY = 10, + K8S_CONTAINER_IP_TAG_KEY = 11, + HOST_NAME = 12, + NUM_VALUES = 13, + UNKOWN = 14, +}; + +#ifdef __ENTERPRISE__ +const std::string TagDefaultKey[NUM_VALUES] = { + "__file_offset__", + "__inode__", + "__path__", + "_namespace_", + "_pod_name_", + "_pod_uid_", + "_container_name_", + "_container_ip_", + "_image_name_", + "_container_name_", + "_image_name_", + "_container_ip_", + "__hostname__", +}; +const std::string AGENT_TAG_DEFAULT_KEY = "__user_defined_id__"; +#else +const std::string TagDefaultKey[NUM_VALUES] = { + "log.file.offset", + "log.file.inode", + "log.file.path", + "k8s.namespace.name", + "k8s.pod.name", + "k8s.pod.uid", + "container.name", + "container.ip", + "container.image.name", + "k8s.container.name", + "k8s.container.image.name", + "k8s.container.ip", + "host.name", +}; +const std::string HOST_IP_DEFAULT_KEY = "host.ip"; +#endif + +} // namespace logtail \ No newline at end of file diff --git a/core/file_server/ContainerInfo.cpp b/core/file_server/ContainerInfo.cpp index 6a44668da0..d903bd5c47 100644 --- a/core/file_server/ContainerInfo.cpp +++ b/core/file_server/ContainerInfo.cpp @@ -21,7 +21,7 @@ namespace logtail { -const std::unordered_set containerNameTag = { +const std::vector containerNameTag = { "_image_name_", "_container_name_", "_pod_name_", @@ -30,6 +30,15 @@ const std::unordered_set containerNameTag = { "_container_ip_", }; +const std::vector containerNameTagKey = { + TagKey::CONTAINER_IMAGE_NAME_TAG_KEY, + TagKey::CONTAINER_NAME_TAG_KEY, + TagKey::K8S_POD_NAME_TAG_KEY, + TagKey::K8S_NAMESPACE_TAG_KEY, + TagKey::K8S_POD_UID_TAG_KEY, + TagKey::CONTAINER_IP_TAG_KEY, +}; + bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll, std::unordered_map& containerInfoMap, std::string& errorMsg) { @@ -98,7 +107,7 @@ bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& con sls_logs::LogTag tag; tag.set_key(metaDatas[i - 1].asString()); tag.set_value(metaDatas[i].asString()); - containerInfo.mMetadatas.emplace_back(tag); + containerInfo.mMetadatas->emplace_back(tag); } } } @@ -109,14 +118,7 @@ bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& con sls_logs::LogTag tag; tag.set_key(tags[i - 1].asString()); tag.set_value(tags[i].asString()); - // 不是老版本 - if (!isOldCheckpoint) { - containerInfo.mTags.emplace_back(tag); - } else if (containerNameTag.find(tags[i - 1].asString()) != containerNameTag.end()) { - containerInfo.mMetadatas.emplace_back(tag); - } else { - containerInfo.mTags.emplace_back(tag); - } + containerInfo.mTags->emplace_back(tag); } } } @@ -129,4 +131,13 @@ bool ContainerInfo::ParseByJSONObj(const Json::Value& 
params, ContainerInfo& con return true; } +TagKey ContainerInfo::GetFileTagKey(const std::string& key) { + for (size_t i = 0; i < containerNameTag.size(); ++i) { + if (containerNameTag[i] == key) { + return containerNameTagKey[i]; + } + } + return TagKey::UNKOWN; +} + } // namespace logtail diff --git a/core/file_server/ContainerInfo.h b/core/file_server/ContainerInfo.h index 0f08db7999..f720408f88 100644 --- a/core/file_server/ContainerInfo.h +++ b/core/file_server/ContainerInfo.h @@ -23,7 +23,7 @@ #include #include -#include "container_manager/ConfigContainerInfoUpdateCmd.h" +#include "common/TagConstants.h" #include "protobuf/sls/sls_logs.pb.h" namespace logtail { @@ -44,12 +44,14 @@ struct ContainerInfo { std::string mLogPath; std::string mUpperDir; std::vector mMounts; // mounts of this container - std::vector mTags; // ContainerNameTag - std::vector mMetadatas; // ExternalEnvTag and ExternalK8sLabelTag + // Tags is hold by each reader and cost memory, so use shared_ptr to share the same tags. + std::shared_ptr> mTags; // ContainerNameTag, ExternalEnvTag and ExternalK8sLabelTag. + std::shared_ptr> mMetadatas; Json::Value mJson; // this obj's json, for saving to local file static bool ParseByJSONObj(const Json::Value&, ContainerInfo&, std::string&); static bool ParseAllByJSONObj(const Json::Value&, std::unordered_map&, std::string&); + static TagKey GetFileTagKey(const std::string& key); bool operator==(const ContainerInfo& rhs) const { if (mID != rhs.mID) { @@ -74,22 +76,22 @@ struct ContainerInfo { return false; } } - if (mMetadatas.size() != rhs.mMetadatas.size()) { + if (mMetadatas->size() != rhs.mMetadatas->size()) { return false; } - for (size_t idx = 0; idx < mMetadatas.size(); ++idx) { - const auto& lhsTag = mMetadatas[idx]; - const auto& rhsTag = rhs.mMetadatas[idx]; + for (size_t idx = 0; idx < mMetadatas->size(); ++idx) { + const auto& lhsTag = (*mMetadatas)[idx]; + const auto& rhsTag = (*rhs.mMetadatas)[idx]; if (lhsTag.key() != rhsTag.key() || lhsTag.value() != rhsTag.value()) { return false; } } - if (mTags.size() != rhs.mTags.size()) { + if (mTags->size() != rhs.mTags->size()) { return false; } - for (size_t idx = 0; idx < mTags.size(); ++idx) { - const auto& lhsTag = mTags[idx]; - const auto& rhsTag = rhs.mTags[idx]; + for (size_t idx = 0; idx < mTags->size(); ++idx) { + const auto& lhsTag = (*mTags)[idx]; + const auto& rhsTag = (*rhs.mTags)[idx]; if (lhsTag.key() != rhsTag.key() || lhsTag.value() != rhsTag.value()) { return false; } diff --git a/core/file_server/FileServer.cpp b/core/file_server/FileServer.cpp index 2e37967007..80f0b425b7 100644 --- a/core/file_server/FileServer.cpp +++ b/core/file_server/FileServer.cpp @@ -32,8 +32,8 @@ using namespace std; namespace logtail { FileServer::FileServer() { - WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, - {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}}); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}}); } // 启动文件服务,包括加载配置、处理检查点、注册事件等 @@ -178,6 +178,29 @@ void FileServer::RemoveMultilineConfig(const string& name) { mPipelineNameMultilineConfigsMap.erase(name); } +// 获取给定名称的Tag配置 +FileTagConfig FileServer::GetFileTagConfig(const string& name) const { + ReadLock lock(mReadWriteLock); + auto itr = mPipelineNameFileTagConfigsMap.find(name); + if (itr != mPipelineNameFileTagConfigsMap.end()) { + return itr->second; + } + return make_pair(nullptr, nullptr); +} + +// 
添加Tag配置 +void FileServer::AddFileTagConfig(const std::string& name, const FileTagOptions* opts, const PipelineContext* ctx) { + WriteLock lock(mReadWriteLock); + mPipelineNameFileTagConfigsMap[name] = make_pair(opts, ctx); +} + +// 移除给定名称的Tag配置 +void FileServer::RemoveFileTagConfig(const string& name) { + WriteLock lock(mReadWriteLock); + mPipelineNameFileTagConfigsMap.erase(name); +} + + // 保存容器信息 void FileServer::SaveContainerInfo(const string& pipeline, const shared_ptr>& info) { WriteLock lock(mReadWriteLock); diff --git a/core/file_server/FileServer.h b/core/file_server/FileServer.h index d77b6a13a1..c75960cbae 100644 --- a/core/file_server/FileServer.h +++ b/core/file_server/FileServer.h @@ -22,6 +22,7 @@ #include "common/Lock.h" #include "file_server/FileDiscoveryOptions.h" +#include "file_server/FileTagOptions.h" #include "file_server/MultilineOptions.h" #include "file_server/reader/FileReaderOptions.h" #include "monitor/LogtailMetric.h" @@ -66,6 +67,13 @@ class FileServer { void AddMultilineConfig(const std::string& name, const MultilineOptions* opts, const PipelineContext* ctx); void RemoveMultilineConfig(const std::string& name); + FileTagConfig GetFileTagConfig(const std::string& name) const; + const std::unordered_map& GetAllFileTagConfigs() const { + return mPipelineNameFileTagConfigsMap; + } + void AddFileTagConfig(const std::string& name, const FileTagOptions* opts, const PipelineContext* ctx); + void RemoveFileTagConfig(const std::string& name); + void SaveContainerInfo(const std::string& pipeline, const std::shared_ptr>& info); std::shared_ptr> GetAndRemoveContainerInfo(const std::string& pipeline); void ClearContainerInfo(); @@ -101,6 +109,7 @@ class FileServer { std::unordered_map mPipelineNameFileDiscoveryConfigsMap; std::unordered_map mPipelineNameFileReaderConfigsMap; std::unordered_map mPipelineNameMultilineConfigsMap; + std::unordered_map mPipelineNameFileTagConfigsMap; std::unordered_map>> mAllContainerInfoMap; std::unordered_map mPipelineNamePluginMetricManagersMap; // 过渡使用 diff --git a/core/file_server/FileTagOptions.cpp b/core/file_server/FileTagOptions.cpp new file mode 100644 index 0000000000..a862ee7b34 --- /dev/null +++ b/core/file_server/FileTagOptions.cpp @@ -0,0 +1,143 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "file_server/FileTagOptions.h" + +#include "common/ParamExtractor.h" + +namespace logtail { + +bool FileTagOptions::Init(const Json::Value& config, + const PipelineContext& context, + const std::string& pluginType, + bool enableContainerDiscovery) { + std::string errorMsg; + + bool appendingLogPositionMeta; + // AppendingLogPositionMeta + if (!GetOptionalBoolParam(config, "AppendingLogPositionMeta", appendingLogPositionMeta, errorMsg)) { + PARAM_WARNING_DEFAULT(context.GetLogger(), + context.GetAlarm(), + errorMsg, + appendingLogPositionMeta, + pluginType, + context.GetConfigName(), + context.GetProjectName(), + context.GetLogstoreName(), + context.GetRegion()); + } + + // Tags + const char* tagKey = "Tags"; + const Json::Value* tagConfig = config.find(tagKey, tagKey + strlen(tagKey)); + if (tagConfig) { + if (!tagConfig->isObject()) { + PARAM_WARNING_DEFAULT(context.GetLogger(), + context.GetAlarm(), + "param Tags is not of type object", + "custom", + pluginType, + context.GetConfigName(), + context.GetProjectName(), + context.GetLogstoreName(), + context.GetRegion()); + return false; + } + } + + // the priority of FileOffsetKey and FileOffsetTagKey is higher than appendingLogPositionMeta + if (config.isMember("FileOffsetKey") || tagConfig->isMember("FileOffsetTagKey")) { + parseDefaultNotAddTag(config, "FileOffsetKey", TagKey::FILE_OFFSET_KEY, context, pluginType); + parseDefaultNotAddTag(config, "FileOffsetTagKey", TagKey::FILE_OFFSET_KEY, context, pluginType); + } else if (appendingLogPositionMeta) { + mFileTags[TagKey::FILE_OFFSET_KEY] = TagDefaultKey[TagKey::FILE_OFFSET_KEY]; + mFileTags[TagKey::FILE_INODE_TAG_KEY] = TagDefaultKey[TagKey::FILE_INODE_TAG_KEY]; + } + + parseDefaultAddTag(config, "FilePathTagKey", TagKey::FILE_PATH_TAG_KEY, context, pluginType); + + if (enableContainerDiscovery) { + parseDefaultAddTag(config, "K8sNamespaceTagKey", TagKey::K8S_NAMESPACE_TAG_KEY, context, pluginType); + parseDefaultAddTag(config, "K8sPodNameTagKey", TagKey::K8S_POD_NAME_TAG_KEY, context, pluginType); + parseDefaultAddTag(config, "K8sPodUidTagKey", TagKey::K8S_POD_UID_TAG_KEY, context, pluginType); + parseDefaultAddTag(config, "ContainerNameTagKey", TagKey::CONTAINER_NAME_TAG_KEY, context, pluginType); + parseDefaultAddTag(config, "ContainerIpTagKey", TagKey::CONTAINER_IP_TAG_KEY, context, pluginType); + parseDefaultAddTag( + config, "ContainerImageNameTagKey", TagKey::CONTAINER_IMAGE_NAME_TAG_KEY, context, pluginType); + } + + return true; +} + +void FileTagOptions::parseDefaultAddTag(const Json::Value& config, + const std::string& keyName, + const TagKey& keyEnum, + const PipelineContext& context, + const std::string& pluginType) { + std::string errorMsg; + std::string key; + if (config.isMember(keyName)) { + if (!GetOptionalStringParam(config, keyName, key, errorMsg)) { + PARAM_WARNING_DEFAULT(context.GetLogger(), + context.GetAlarm(), + errorMsg, + "custom", + pluginType, + context.GetConfigName(), + context.GetProjectName(), + context.GetLogstoreName(), + context.GetRegion()); + } else if (!key.empty()) { + if (key == DEFAULT_CONFIG_TAG_KEY_VALUE) { + mFileTags[keyEnum] = TagDefaultKey[keyEnum]; + } else { + mFileTags[keyEnum] = key; + } + } + } else { + mFileTags[keyEnum] = TagDefaultKey[keyEnum]; + } +} + +void FileTagOptions::parseDefaultNotAddTag(const Json::Value& config, + const std::string& keyName, + const TagKey& keyEnum, + const PipelineContext& context, + const std::string& pluginType) { + std::string errorMsg; + std::string key; + if 
(config.isMember(keyName)) { + if (!GetOptionalStringParam(config, keyName, key, errorMsg)) { + PARAM_WARNING_DEFAULT(context.GetLogger(), + context.GetAlarm(), + errorMsg, + "custom", + pluginType, + context.GetConfigName(), + context.GetProjectName(), + context.GetLogstoreName(), + context.GetRegion()); + } else if (!key.empty()) { + if (key == DEFAULT_CONFIG_TAG_KEY_VALUE) { + mFileTags[keyEnum] = TagDefaultKey[keyEnum]; + } else { + mFileTags[keyEnum] = key; + } + } + } +} + +} // namespace logtail \ No newline at end of file diff --git a/core/file_server/FileTagOptions.h b/core/file_server/FileTagOptions.h new file mode 100644 index 0000000000..86a70c92fd --- /dev/null +++ b/core/file_server/FileTagOptions.h @@ -0,0 +1,60 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "common/TagConstants.h" +#include "pipeline/PipelineContext.h" + +namespace logtail { + +class FileTagOptions { +public: + bool Init(const Json::Value& config, + const PipelineContext& context, + const std::string& pluginType, + bool enableContainerDiscovery); + StringView GetFileTagKeyName(TagKey key) const { + if (mFileTags.find(key) != mFileTags.end()) { + // FileTagOption will not be deconstructed or changed before all event be sent + return StringView(mFileTags.at(key).c_str(), mFileTags.at(key).size()); + } + return StringView(); + } + + +private: + void parseDefaultAddTag(const Json::Value& config, + const std::string& keyName, + const TagKey& keyEnum, + const PipelineContext& context, + const std::string& pluginType); + void parseDefaultNotAddTag(const Json::Value& config, + const std::string& keyName, + const TagKey& keyEnum, + const PipelineContext& context, + const std::string& pluginType); + + std::unordered_map mFileTags; +}; + +using FileTagConfig = std::pair; + +} // namespace logtail diff --git a/core/file_server/event_handler/EventHandler.cpp b/core/file_server/event_handler/EventHandler.cpp index 4c9774b373..d6b6ee4b57 100644 --- a/core/file_server/event_handler/EventHandler.cpp +++ b/core/file_server/event_handler/EventHandler.cpp @@ -352,6 +352,7 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, const FileReaderConfig& readerConfig, const MultilineConfig& multilineConfig, const FileDiscoveryConfig& discoveryConfig, + const FileTagConfig& tagConfig, uint32_t exactlyonceConcurrency, bool forceBeginingFlag) { if (mNameReaderMap.find(name) == mNameReaderMap.end()) { @@ -369,6 +370,7 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, readerConfig, multilineConfig, discoveryConfig, + tagConfig, exactlyonceConcurrency, forceBeginingFlag)); if (readerPtr.get() == NULL) @@ -615,15 +617,16 @@ void ModifyHandler::Handle(const Event& event) { return; } } else if (devInodeIter == mDevInodeReaderMap.end()) { - FileDiscoveryConfig config = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName); + FileDiscoveryConfig discoverConfig = 
FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName); // double check // if event with config name, skip check - if (config.first && (!event.GetConfigName().empty() || config.first->IsMatch(path, name))) { + if (discoverConfig.first && (!event.GetConfigName().empty() || discoverConfig.first->IsMatch(path, name))) { FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName); MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName); + FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName); uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName); - LogFileReaderPtr readerPtr - = CreateLogFileReaderPtr(path, name, devInode, readerConfig, multilineConfig, config, concurrency); + LogFileReaderPtr readerPtr = CreateLogFileReaderPtr( + path, name, devInode, readerConfig, multilineConfig, discoverConfig, tagConfig, concurrency); if (readerPtr.get() == NULL) { LogFileReaderPtrArray& readerArray = mNameReaderMap[name]; // if rotate queue is full, try read array header @@ -872,13 +875,14 @@ void ModifyHandler::Handle(const Event& event) { return; } if (devInodeIter == mDevInodeReaderMap.end()) { - FileDiscoveryConfig config = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName); - if (config.first && (!event.GetConfigName().empty() || config.first->IsMatch(path, name))) { + FileDiscoveryConfig discoverConfig = FileServer::GetInstance()->GetFileDiscoveryConfig(mConfigName); + if (discoverConfig.first && (!event.GetConfigName().empty() || discoverConfig.first->IsMatch(path, name))) { FileReaderConfig readerConfig = FileServer::GetInstance()->GetFileReaderConfig(mConfigName); MultilineConfig multilineConfig = FileServer::GetInstance()->GetMultilineConfig(mConfigName); + FileTagConfig tagConfig = FileServer::GetInstance()->GetFileTagConfig(mConfigName); uint32_t concurrency = FileServer::GetInstance()->GetExactlyOnceConcurrency(mConfigName); LogFileReaderPtr readerPtr = CreateLogFileReaderPtr( - path, name, devInode, readerConfig, multilineConfig, config, concurrency, true); + path, name, devInode, readerConfig, multilineConfig, discoverConfig, tagConfig, concurrency, true); if (readerPtr.get() == NULL) { return; } @@ -1082,7 +1086,7 @@ int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* lo reader->GetLogstore(), reader->GetConvertedPath(), reader->GetHostLogPath(), - reader->GetExtraTags(), + reader->GetContainerExtraTags(), reader->GetDevInode().dev, reader->GetDevInode().inode, reader->GetFileSize(), diff --git a/core/file_server/event_handler/EventHandler.h b/core/file_server/event_handler/EventHandler.h index 6cf3655dec..dff6dc64bb 100644 --- a/core/file_server/event_handler/EventHandler.h +++ b/core/file_server/event_handler/EventHandler.h @@ -95,6 +95,7 @@ class ModifyHandler : public EventHandler { const FileReaderConfig& readerConfig, const MultilineConfig& multilineConfig, const FileDiscoveryConfig& discoveryConfig, + const FileTagConfig& tagConfig, uint32_t exactlyonceConcurrency = 0, bool forceBeginingFlag = false); diff --git a/core/file_server/event_handler/HistoryFileImporter.cpp b/core/file_server/event_handler/HistoryFileImporter.cpp index 24c8bbfbae..71b86751f7 100644 --- a/core/file_server/event_handler/HistoryFileImporter.cpp +++ b/core/file_server/event_handler/HistoryFileImporter.cpp @@ -85,6 +85,7 @@ void HistoryFileImporter::ProcessEvent(const HistoryFileEvent& event, const std: event.mReaderConfig, 
event.mMultilineConfig, event.mDiscoveryconfig, + event.mTagConfig, event.mEOConcurrency, true)); if (readerSharePtr == NULL) { diff --git a/core/file_server/event_handler/HistoryFileImporter.h b/core/file_server/event_handler/HistoryFileImporter.h index e74df48b6b..a267ccc4fb 100644 --- a/core/file_server/event_handler/HistoryFileImporter.h +++ b/core/file_server/event_handler/HistoryFileImporter.h @@ -17,8 +17,9 @@ #pragma once #include #include -#include "common/StringTools.h" + #include "common/CircularBuffer.h" +#include "common/StringTools.h" #include "common/Thread.h" #include "plugin/input/InputFile.h" @@ -32,6 +33,7 @@ struct HistoryFileEvent { FileDiscoveryConfig mDiscoveryconfig; FileReaderConfig mReaderConfig; MultilineConfig mMultilineConfig; + FileTagConfig mTagConfig; uint32_t mEOConcurrency = 0; HistoryFileEvent() : mStartPos(0) {} diff --git a/core/file_server/reader/FileReaderOptions.cpp b/core/file_server/reader/FileReaderOptions.cpp index bb199a7daa..111ca7bc78 100644 --- a/core/file_server/reader/FileReaderOptions.cpp +++ b/core/file_server/reader/FileReaderOptions.cpp @@ -182,19 +182,6 @@ bool FileReaderOptions::Init(const Json::Value& config, const PipelineContext& c ctx.GetRegion()); } - // AppendingLogPositionMeta - if (!GetOptionalBoolParam(config, "AppendingLogPositionMeta", mAppendingLogPositionMeta, errorMsg)) { - PARAM_WARNING_DEFAULT(ctx.GetLogger(), - ctx.GetAlarm(), - errorMsg, - mAppendingLogPositionMeta, - pluginType, - ctx.GetConfigName(), - ctx.GetProjectName(), - ctx.GetLogstoreName(), - ctx.GetRegion()); - } - return true; } diff --git a/core/file_server/reader/FileReaderOptions.h b/core/file_server/reader/FileReaderOptions.h index fa5f5dcd23..c1a3015ac4 100644 --- a/core/file_server/reader/FileReaderOptions.h +++ b/core/file_server/reader/FileReaderOptions.h @@ -38,10 +38,6 @@ struct FileReaderOptions { uint32_t mReadDelayAlertThresholdBytes; uint32_t mCloseUnusedReaderIntervalSec; uint32_t mRotatorQueueSize; - // This param is compound since it controls both reader option and parser option. For simplicity, we put it in - // reader option. If option controlling parser is separated from this, the separated option should be placed in - // input. 
- bool mAppendingLogPositionMeta = false; FileReaderOptions(); diff --git a/core/file_server/reader/JsonLogFileReader.h b/core/file_server/reader/JsonLogFileReader.h index 8d958c907c..0c03d3e8fe 100644 --- a/core/file_server/reader/JsonLogFileReader.h +++ b/core/file_server/reader/JsonLogFileReader.h @@ -26,8 +26,9 @@ class JsonLogFileReader : public LogFileReader { const std::string& hostLogPathFile, const DevInode& devInode, const FileReaderConfig& readerConfig, - const MultilineConfig& multilineConfig) - : LogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig) {} + const MultilineConfig& multilineConfig, + const FileTagConfig& tagConfig) + : LogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig, tagConfig) {} protected: int32_t RemoveLastIncompleteLog(char* buffer, diff --git a/core/file_server/reader/LogFileReader.cpp b/core/file_server/reader/LogFileReader.cpp index 2f849b5fcb..3b9be5f92e 100644 --- a/core/file_server/reader/LogFileReader.cpp +++ b/core/file_server/reader/LogFileReader.cpp @@ -91,13 +91,15 @@ LogFileReader* LogFileReader::CreateLogFileReader(const string& hostLogPathDir, const FileReaderConfig& readerConfig, const MultilineConfig& multilineConfig, const FileDiscoveryConfig& discoveryConfig, + const FileTagConfig& tagConfig, uint32_t exactlyonceConcurrency, bool forceFromBeginning) { LogFileReader* reader = nullptr; if (readerConfig.second->RequiringJsonReader()) { - reader = new JsonLogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig); + reader = new JsonLogFileReader( + hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig, tagConfig); } else { - reader = new LogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig); + reader = new LogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig, tagConfig); } if (reader) { @@ -116,16 +118,9 @@ LogFileReader* LogFileReader::CreateLogFileReader(const string& hostLogPathDir, ? 
discoveryConfig.first->GetWildcardPaths()[0] : discoveryConfig.first->GetBasePath(), containerPath->mRealBaseDir.size()); - reader->AddExtraTags(containerPath->mMetadatas); - reader->AddExtraTags(containerPath->mTags); + reader->SetContainerExtraTags(containerPath->mTags); } } - if (readerConfig.first->mAppendingLogPositionMeta) { - sls_logs::LogTag inodeTag; - inodeTag.set_key(LOG_RESERVED_KEY_INODE); - inodeTag.set_value(std::to_string(devInode.inode)); - reader->AddExtraTags(std::vector{inodeTag}); - } GlobalConfig::TopicType topicType = readerConfig.second->GetGlobalConfig().mTopicType; const string& topicFormat = readerConfig.second->GetGlobalConfig().mTopicFormat; @@ -181,12 +176,14 @@ LogFileReader::LogFileReader(const std::string& hostLogPathDir, const std::string& hostLogPathFile, const DevInode& devInode, const FileReaderConfig& readerConfig, - const MultilineConfig& multilineConfig) + const MultilineConfig& multilineConfig, + const FileTagConfig& tagConfig) : mHostLogPathDir(hostLogPathDir), mHostLogPathFile(hostLogPathFile), mDevInode(devInode), mReaderConfig(readerConfig), - mMultilineConfig(multilineConfig) { + mMultilineConfig(multilineConfig), + mTagConfig(tagConfig) { mHostLogPath = PathJoin(hostLogPathDir, hostLogPathFile); mLastUpdateTime = time(NULL); mLastEventTime = mLastUpdateTime; @@ -773,7 +770,7 @@ std::string LogFileReader::GetTopicName(const std::string& topicConfig, const st sls_logs::LogTag tag; tag.set_key(keys[0]); tag.set_value(values[0]); - mExtraTags.push_back(tag); + mTopicExtraTags.push_back(tag); } return values[0]; } else { @@ -786,7 +783,7 @@ std::string LogFileReader::GetTopicName(const std::string& topicConfig, const st sls_logs::LogTag tag; tag.set_key(keys[i]); tag.set_value(values[i]); - mExtraTags.push_back(tag); + mTopicExtraTags.push_back(tag); } } return res; @@ -811,7 +808,7 @@ std::string LogFileReader::GetTopicName(const std::string& topicConfig, const st sls_logs::LogTag tag; tag.set_key(string("__topic_") + ToString(i) + "__"); tag.set_value(what[i]); - mExtraTags.push_back(tag); + mTopicExtraTags.push_back(tag); } } } else { @@ -2431,7 +2428,7 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) { } void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) { - // we store source-specific info with fixed key in metadata + // we store inner info in metadata switch (mFileLogFormat) { case LogFormat::DOCKER_JSON_FILE: group.SetMetadataNoCopy(EventGroupMetaKey::LOG_FORMAT, ProcessorParseContainerLogNative::DOCKER_JSON_FILE); @@ -2449,19 +2446,47 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) { group.SetMetadata(EventGroupMetaKey::LOG_FILE_INODE, ToString(GetDevInode().inode)); } group.SetMetadata(EventGroupMetaKey::SOURCE_ID, GetSourceId()); + auto offsetKey = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_OFFSET_KEY); + if (!offsetKey.empty()) { + group.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, offsetKey); + } - // for source-specific info without fixed key, we store them in tags directly + // we store info which users can see in tags // for log, these includes: - // 1. extra topic - // 2. external k8s env/label tag - // 3. inode (this is special, currently it is in both metadata and tag, since it is not a default tag; later on, it - // should be controlled by tag processor) - const std::vector& extraTags = GetExtraTags(); - for (size_t i = 0; i < extraTags.size(); ++i) { - group.SetTag(extraTags[i].key(), extraTags[i].value()); - } + // 1. 
topic StringBuffer b = group.GetSourceBuffer()->CopyString(GetTopicName()); group.SetTagNoCopy(LOG_RESERVED_KEY_TOPIC, StringView(b.data, b.size)); + // 2. extra topic + auto topicExtraTags = GetTopicExtraTags(); + for (size_t i = 0; i < topicExtraTags.size(); ++i) { + group.SetTag(topicExtraTags[i].key(), topicExtraTags[i].value()); + } + // 3. container name tag, external k8s env/label tag + auto containerExtraTags = GetContainerExtraTags(); + for (size_t i = 0; i < containerExtraTags->size(); ++i) { + auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key()); + if (key != TagKey::UNKOWN) { // container name tag + auto keyName = mTagConfig.first->GetFileTagKeyName(key); + if (!keyName.empty()) { + StringBuffer b = group.GetSourceBuffer()->CopyString((*containerExtraTags)[i].value()); + group.SetTagNoCopy(keyName, StringView(b.data, b.size)); + } + } else { // external k8s env/label tag + group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value()); + } + } + // 4. inode + auto keyName = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_INODE_TAG_KEY); + if (!keyName.empty()) { + StringBuffer b = group.GetSourceBuffer()->CopyString(ToString(GetDevInode().inode)); + group.SetTagNoCopy(keyName, StringView(b.data, b.size)); + } + // 5. path + keyName = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_PATH_TAG_KEY); + if (!keyName.empty()) { + StringBuffer b = group.GetSourceBuffer()->CopyString(GetConvertedPath()); + group.SetTagNoCopy(keyName, StringView(b.data, b.size)); + } } PipelineEventGroup LogFileReader::GenerateEventGroup(LogFileReaderPtr reader, LogBuffer* logBuffer) { diff --git a/core/file_server/reader/LogFileReader.h b/core/file_server/reader/LogFileReader.h index e58d6e5279..16b11a40bd 100644 --- a/core/file_server/reader/LogFileReader.h +++ b/core/file_server/reader/LogFileReader.h @@ -31,16 +31,16 @@ #include "common/StringTools.h" #include "common/TimeUtil.h" #include "common/memory/SourceBuffer.h" -#include "file_server/event/Event.h" #include "file_server/FileDiscoveryOptions.h" #include "file_server/FileServer.h" #include "file_server/MultilineOptions.h" -#include "protobuf/sls/sls_logs.pb.h" +#include "file_server/event/Event.h" +#include "file_server/reader/FileReaderOptions.h" #include "logger/Logger.h" #include "models/StringView.h" #include "pipeline/queue/QueueKey.h" +#include "protobuf/sls/sls_logs.pb.h" #include "rapidjson/allocators.h" -#include "file_server/reader/FileReaderOptions.h" namespace logtail { @@ -176,6 +176,7 @@ class LogFileReader { const FileReaderConfig& readerConfig, const MultilineConfig& multilineConfig, const FileDiscoveryConfig& discoveryConfig, + const FileTagConfig& tagConfig, uint32_t exactlyonceConcurrency, bool forceFromBeginning); @@ -185,7 +186,8 @@ class LogFileReader { const std::string& hostLogPathFile, const DevInode& devInode, const FileReaderConfig& readerConfig, - const MultilineConfig& multilineConfig); + const MultilineConfig& multilineConfig, + const FileTagConfig& tagConfig); bool ReadLog(LogBuffer& logBuffer, const Event* event); time_t GetLastUpdateTime() const // actually it's the time whenever ReadLogs is called @@ -381,10 +383,14 @@ class LogFileReader { void SetDockerPath(const std::string& dockerBasePath, size_t dockerReplaceSize); - const std::vector& GetExtraTags() { return mExtraTags; } + const std::vector& GetTopicExtraTags() const { return mTopicExtraTags; } + + void SetTopicExtraTags(const std::vector& tags) { mTopicExtraTags = tags; } - void AddExtraTags(const std::vector& tags) 
{ - mExtraTags.insert(mExtraTags.end(), tags.begin(), tags.end()); + const std::shared_ptr>& GetContainerExtraTags() { return mContainerExtraTags; } + + void SetContainerExtraTags(const std::shared_ptr> tags) { + mContainerExtraTags = tags; } QueueKey GetQueueKey() const { return mReaderConfig.second->GetProcessQueueKey(); } @@ -515,7 +521,10 @@ class LogFileReader { // mDockerPath is `/home/admin/access.log` // we should use mDockerPath to extract topic and set it to __tag__:__path__ std::string mDockerPath; - std::vector mExtraTags; + + // tags + std::vector mTopicExtraTags; + std::shared_ptr> mContainerExtraTags; // int32_t mCloseUnusedInterval; // PreciseTimestampConfig mPreciseTimestampConfig; @@ -525,6 +534,7 @@ class LogFileReader { FileReaderConfig mReaderConfig; MultilineConfig mMultilineConfig; + FileTagConfig mTagConfig; // int64_t mLogGroupKey = 0; // since reader is destructed after the corresponding pipeline is removed, pipeline context used in destructor diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index a83be6e61d..51e1e39967 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -38,26 +38,16 @@ enum class EventGroupMetaKey { LOG_FILE_PATH_RESOLVED, LOG_FILE_INODE, LOG_FORMAT, + LOG_FILE_OFFSET_KEY, HAS_PART_LOG, - K8S_CLUSTER_ID, - K8S_NODE_NAME, - K8S_NODE_IP, - K8S_NAMESPACE, - K8S_POD_UID, - K8S_POD_NAME, - CONTAINER_NAME, - CONTAINER_IP, - CONTAINER_IMAGE_NAME, - CONTAINER_IMAGE_ID, - PROMETHEUS_SCRAPE_DURATION, PROMETHEUS_SCRAPE_RESPONSE_SIZE, PROMETHEUS_SAMPLES_SCRAPED, PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, PROMETHEUS_UP_STATE, - SOURCE_ID + SOURCE_ID, }; using GroupMetadata = std::map; diff --git a/core/monitor/LogFileProfiler.cpp b/core/monitor/LogFileProfiler.cpp index fd9b2ef7f3..e19ee197c4 100644 --- a/core/monitor/LogFileProfiler.cpp +++ b/core/monitor/LogFileProfiler.cpp @@ -27,8 +27,8 @@ #include "common/version.h" #include "file_server/ConfigManager.h" #include "logger/Logger.h" -#include "provider/Provider.h" #include "pipeline/queue/QueueKeyManager.h" +#include "provider/Provider.h" DEFINE_FLAG_INT32(profile_data_send_interval, "interval of send LogFile/DomainSocket profile data, seconds", 600); @@ -147,12 +147,12 @@ bool LogFileProfiler::GetProfileData(LogGroup& logGroup, LogStoreStatistic* stat contentPtr->set_key("read_avg_delay"); contentPtr->set_value(ToString(statistic->mReadCount == 0 ? 
0 : statistic->mReadDelaySum / statistic->mReadCount)); - if (!statistic->mTags.empty()) { - const std::vector& extraTags = statistic->mTags; - for (size_t i = 0; i < extraTags.size(); ++i) { + if (statistic->mTags != nullptr && !statistic->mTags->empty()) { + auto extraTags = statistic->mTags; + for (size_t i = 0; i < extraTags->size(); ++i) { contentPtr = logPtr->add_contents(); - contentPtr->set_key(extraTags[i].key()); - contentPtr->set_value(extraTags[i].value()); + contentPtr->set_key((*extraTags)[i].key()); + contentPtr->set_value((*extraTags)[i].value()); } } @@ -277,7 +277,7 @@ void LogFileProfiler::AddProfilingData(const std::string& configName, const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t readBytes, uint64_t skipBytes, uint64_t splitLines, @@ -331,7 +331,7 @@ void LogFileProfiler::AddProfilingData(const std::string& configName, category, convertedPath, hostLogPath, - empty, + make_shared>(empty), readBytes, skipBytes, splitLines, @@ -368,7 +368,7 @@ void LogFileProfiler::AddProfilingSkipBytes(const std::string& configName, const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t skipBytes) { if (!hostLogPath.empty()) { // logstore statistics @@ -385,7 +385,12 @@ void LogFileProfiler::AddProfilingSkipBytes(const std::string& configName, LogStoreStatistic* statistic = NULL; if (hostLogPath.empty()) { std::vector empty; - statistic = new LogStoreStatistic(configName, projectName, category, convertedPath, hostLogPath, empty); + statistic = new LogStoreStatistic(configName, + projectName, + category, + convertedPath, + hostLogPath, + make_shared>(empty)); } else { statistic = new LogStoreStatistic(configName, projectName, category, convertedPath, hostLogPath, tags); } @@ -400,7 +405,7 @@ void LogFileProfiler::AddProfilingReadBytes(const std::string& configName, const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t dev, uint64_t inode, uint64_t fileSize, @@ -421,7 +426,12 @@ void LogFileProfiler::AddProfilingReadBytes(const std::string& configName, LogStoreStatistic* statistic = NULL; if (hostLogPath.empty()) { std::vector empty; - statistic = new LogStoreStatistic(configName, projectName, category, convertedPath, hostLogPath, empty); + statistic = new LogStoreStatistic(configName, + projectName, + category, + convertedPath, + hostLogPath, + make_shared>(empty)); } else { statistic = new LogStoreStatistic(configName, projectName, category, convertedPath, hostLogPath, tags); } diff --git a/core/monitor/LogFileProfiler.h b/core/monitor/LogFileProfiler.h index fd29c2f185..f88d50ed79 100644 --- a/core/monitor/LogFileProfiler.h +++ b/core/monitor/LogFileProfiler.h @@ -15,12 +15,14 @@ */ #pragma once +#include #include -#include + +#include #include +#include #include -#include -#include + #include "protobuf/sls/sls_logs.pb.h" namespace sls_logs { @@ -42,7 +44,7 @@ class LogFileProfiler { const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t readBytes, uint64_t skipBytes, uint64_t splitLines, @@ -58,7 +60,7 @@ class LogFileProfiler { const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + 
const std::shared_ptr>& tags, uint64_t skipBytes); void AddProfilingReadBytes(const std::string& configName, @@ -67,7 +69,7 @@ class LogFileProfiler { const std::string& category, const std::string& convertedPath, const std::string& host, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t dev, uint64_t inode, uint64_t fileSize, @@ -97,7 +99,7 @@ class LogFileProfiler { const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, - const std::vector& tags, + const std::shared_ptr>& tags, uint64_t readBytes = 0, uint64_t skipBytes = 0, uint64_t splitLines = 0, @@ -167,7 +169,7 @@ class LogFileProfiler { std::string mCategory; std::string mConvertedPath; std::string mHostLogPath; - std::vector mTags; + std::shared_ptr> mTags; // how many bytes processed uint64_t mReadBytes; // how many bytes skiped diff --git a/core/observer/network/NetworkObserver.cpp b/core/observer/network/NetworkObserver.cpp index 003a4471c4..8f7a04d444 100644 --- a/core/observer/network/NetworkObserver.cpp +++ b/core/observer/network/NetworkObserver.cpp @@ -545,7 +545,7 @@ int NetworkObserver::OutputDirectly(std::vector& logs, const Pipe } sls_logs::LogGroup logGroup; sls_logs::LogTag* logTagPtr = logGroup.add_logtags(); - logTagPtr->set_key(LOG_RESERVED_KEY_HOSTNAME); + // logTagPtr->set_key(LOG_RESERVED_KEY_HOSTNAME); logTagPtr->set_value(LogFileProfiler::mHostname.substr(0, 99)); #ifdef __ENTERPRISE__ std::string userDefinedId = EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet(); diff --git a/core/pipeline/GlobalConfig.cpp b/core/pipeline/GlobalConfig.cpp index 0b2a5879b3..81cdc37432 100644 --- a/core/pipeline/GlobalConfig.cpp +++ b/core/pipeline/GlobalConfig.cpp @@ -151,6 +151,30 @@ bool GlobalConfig::Init(const Json::Value& config, const PipelineContext& ctx, J } } + // PipelineMetaTagKey + if (!GetOptionalMapParam(config, "PipelineMetaTagKey", mPipelineMetaTagKey, errorMsg)) { + PARAM_WARNING_IGNORE(ctx.GetLogger(), + ctx.GetAlarm(), + errorMsg, + moduleName, + ctx.GetConfigName(), + ctx.GetProjectName(), + ctx.GetLogstoreName(), + ctx.GetRegion()); + } + + // AgentEnvMetaTagKey + if (!GetOptionalMapParam(config, "AgentEnvMetaTagKey", mAgentEnvMetaTagKey, errorMsg)) { + PARAM_WARNING_IGNORE(ctx.GetLogger(), + ctx.GetAlarm(), + errorMsg, + moduleName, + ctx.GetConfigName(), + ctx.GetProjectName(), + ctx.GetLogstoreName(), + ctx.GetRegion()); + } + return true; } diff --git a/core/pipeline/GlobalConfig.h b/core/pipeline/GlobalConfig.h index 56547c8685..942e606dee 100644 --- a/core/pipeline/GlobalConfig.h +++ b/core/pipeline/GlobalConfig.h @@ -38,6 +38,8 @@ struct GlobalConfig { uint32_t mProcessPriority = 0; bool mEnableTimestampNanosecond = false; bool mUsingOldContentTag = false; + Json::Value mPipelineMetaTagKey; + Json::Value mAgentEnvMetaTagKey; }; } // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 45373db490..8c6fe1be1c 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -475,6 +475,8 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) { Json::Value& global = pipeline["global"]; global["EnableTimestampNanosecond"] = mContext.GetGlobalConfig().mEnableTimestampNanosecond; global["UsingOldContentTag"] = mContext.GetGlobalConfig().mUsingOldContentTag; + global["PipelineMetaTagKey"] = mContext.GetGlobalConfig().mPipelineMetaTagKey; + global["AgentEnvMetaTagKey"] = mContext.GetGlobalConfig().mAgentEnvMetaTagKey; } } @@ -529,8 +531,7 @@ std::string 
Pipeline::GetNowPluginID() { PluginInstance::PluginMeta Pipeline::GenNextPluginMeta(bool lastOne) { mPluginID.fetch_add(1); - return PluginInstance::PluginMeta( - std::to_string(mPluginID.load())); + return PluginInstance::PluginMeta(std::to_string(mPluginID.load())); } void Pipeline::WaitAllItemsInProcessFinished() { diff --git a/core/plugin/input/InputFile.cpp b/core/plugin/input/InputFile.cpp index c5b0039a19..5af9c53eb8 100644 --- a/core/plugin/input/InputFile.cpp +++ b/core/plugin/input/InputFile.cpp @@ -115,6 +115,11 @@ bool InputFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) } } + // Tag + if (!mFileTag.Init(config, *mContext, sName, mEnableContainerDiscovery)) { + return false; + } + // MaxCheckpointDirSearchDepth if (!GetOptionalUIntParam(config, "MaxCheckpointDirSearchDepth", mMaxCheckpointDirSearchDepth, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), @@ -181,6 +186,7 @@ bool InputFile::Start() { FileServer::GetInstance()->AddFileDiscoveryConfig(mContext->GetConfigName(), &mFileDiscovery, mContext); FileServer::GetInstance()->AddFileReaderConfig(mContext->GetConfigName(), &mFileReader, mContext); FileServer::GetInstance()->AddMultilineConfig(mContext->GetConfigName(), &mMultiline, mContext); + FileServer::GetInstance()->AddFileTagConfig(mContext->GetConfigName(), &mFileTag, mContext); FileServer::GetInstance()->AddExactlyOnceConcurrency(mContext->GetConfigName(), mExactlyOnceConcurrency); return true; } @@ -192,6 +198,7 @@ bool InputFile::Stop(bool isPipelineRemoving) { FileServer::GetInstance()->RemoveFileDiscoveryConfig(mContext->GetConfigName()); FileServer::GetInstance()->RemoveFileReaderConfig(mContext->GetConfigName()); FileServer::GetInstance()->RemoveMultilineConfig(mContext->GetConfigName()); + FileServer::GetInstance()->RemoveFileTagConfig(mContext->GetConfigName()); FileServer::GetInstance()->RemoveExactlyOnceConcurrency(mContext->GetConfigName()); FileServer::GetInstance()->RemovePluginMetricManager(mContext->GetConfigName()); return true; @@ -206,7 +213,6 @@ bool InputFile::CreateInnerProcessors() { processor = PluginRegistry::GetInstance()->CreateProcessor( ProcessorSplitLogStringNative::sName, mContext->GetPipeline().GenNextPluginMeta(false)); detail["SplitChar"] = Json::Value('\0'); - detail["AppendingLogPositionMeta"] = Json::Value(mFileReader.mAppendingLogPositionMeta); } else if (mMultiline.IsMultiline()) { processor = PluginRegistry::GetInstance()->CreateProcessor( ProcessorSplitMultilineLogStringNative::sName, mContext->GetPipeline().GenNextPluginMeta(false)); @@ -214,7 +220,6 @@ bool InputFile::CreateInnerProcessors() { detail["StartPattern"] = Json::Value(mMultiline.mStartPattern); detail["ContinuePattern"] = Json::Value(mMultiline.mContinuePattern); detail["EndPattern"] = Json::Value(mMultiline.mEndPattern); - detail["AppendingLogPositionMeta"] = Json::Value(mFileReader.mAppendingLogPositionMeta); detail["IgnoringUnmatchWarning"] = Json::Value(mMultiline.mIgnoringUnmatchWarning); if (mMultiline.mUnmatchedContentTreatment == MultilineOptions::UnmatchedContentTreatment::DISCARD) { detail["UnmatchedContentTreatment"] = Json::Value("discard"); @@ -225,7 +230,6 @@ bool InputFile::CreateInnerProcessors() { } else { processor = PluginRegistry::GetInstance()->CreateProcessor( ProcessorSplitLogStringNative::sName, mContext->GetPipeline().GenNextPluginMeta(false)); - detail["AppendingLogPositionMeta"] = Json::Value(mFileReader.mAppendingLogPositionMeta); } if (!processor->Init(detail, *mContext)) { // should not happen diff --git 
a/core/plugin/input/InputFile.h b/core/plugin/input/InputFile.h index ee8275ef7c..7abf4bde84 100644 --- a/core/plugin/input/InputFile.h +++ b/core/plugin/input/InputFile.h @@ -20,10 +20,11 @@ #include "container_manager/ContainerDiscoveryOptions.h" #include "file_server/FileDiscoveryOptions.h" +#include "file_server/FileTagOptions.h" #include "file_server/MultilineOptions.h" +#include "file_server/reader/FileReaderOptions.h" #include "monitor/PluginMetricManager.h" #include "pipeline/plugin/interface/Input.h" -#include "file_server/reader/FileReaderOptions.h" namespace logtail { @@ -47,6 +48,7 @@ class InputFile : public Input { ContainerDiscoveryOptions mContainerDiscovery; FileReaderOptions mFileReader; MultilineOptions mMultiline; + FileTagOptions mFileTag; PluginMetricManagerPtr mPluginMetricManager; IntGaugePtr mMonitorFileTotal; // others diff --git a/core/plugin/processor/CommonParserOptions.cpp b/core/plugin/processor/CommonParserOptions.cpp index dd1a75308e..a7df26b4a1 100644 --- a/core/plugin/processor/CommonParserOptions.cpp +++ b/core/plugin/processor/CommonParserOptions.cpp @@ -16,8 +16,8 @@ #include "plugin/processor/CommonParserOptions.h" -#include "common/Constants.h" #include "common/ParamExtractor.h" +#include "common/TagConstants.h" #include "plugin/processor/inner/ProcessorParseContainerLogNative.h" using namespace std; @@ -95,14 +95,17 @@ bool CommonParserOptions::ShouldAddSourceContent(bool parseSuccess) { return (((parseSuccess && mKeepingSourceWhenParseSucceed) || (!parseSuccess && mKeepingSourceWhenParseFail))); } -bool CommonParserOptions::ShouldEraseEvent(bool parseSuccess, const LogEvent& sourceEvent) { +bool CommonParserOptions::ShouldEraseEvent(bool parseSuccess, + const LogEvent& sourceEvent, + const GroupMetadata& metadata) { if (!parseSuccess && !mKeepingSourceWhenParseFail) { if (sourceEvent.Empty()) { return true; } size_t size = sourceEvent.Size(); // "__file_offset__" - if (size == 1 && (sourceEvent.cbegin()->first == LOG_RESERVED_KEY_FILE_OFFSET)) { + auto offsetKey = metadata.find(EventGroupMetaKey::LOG_FILE_OFFSET_KEY); + if (size == 1 && (offsetKey != metadata.end() && sourceEvent.cbegin()->first == offsetKey->second)) { return true; } else if (size == 2 && sourceEvent.HasContent(ProcessorParseContainerLogNative::containerTimeKey) && sourceEvent.HasContent(ProcessorParseContainerLogNative::containerSourceKey)) { diff --git a/core/plugin/processor/CommonParserOptions.h b/core/plugin/processor/CommonParserOptions.h index b76345b37a..5cd6ebfd6d 100644 --- a/core/plugin/processor/CommonParserOptions.h +++ b/core/plugin/processor/CommonParserOptions.h @@ -36,7 +36,7 @@ struct CommonParserOptions { bool Init(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginType); bool ShouldAddSourceContent(bool parseSuccess); bool ShouldAddLegacyUnmatchedRawLog(bool parseSuccess); - bool ShouldEraseEvent(bool parseSuccess, const LogEvent& sourceEvent); + bool ShouldEraseEvent(bool parseSuccess, const LogEvent& sourceEvent, const GroupMetadata& metadata); }; } // namespace logtail diff --git a/core/plugin/processor/ProcessorParseApsaraNative.cpp b/core/plugin/processor/ProcessorParseApsaraNative.cpp index 43d3854266..a8b7009838 100644 --- a/core/plugin/processor/ProcessorParseApsaraNative.cpp +++ b/core/plugin/processor/ProcessorParseApsaraNative.cpp @@ -98,7 +98,7 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) { size_t wIdx = 0; for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { - if (ProcessEvent(logPath, 
events[rIdx], cachedLogTime, timeStrCache)) { + if (ProcessEvent(logPath, events[rIdx], cachedLogTime, timeStrCache, logGroup.GetAllMetadata())) { if (wIdx != rIdx) { events[wIdx] = std::move(events[rIdx]); } @@ -120,7 +120,8 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) { bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& cachedLogTime, - StringView& timeStrCache) { + StringView& timeStrCache, + const GroupMetadata& metadata) { if (!IsSupportedEvent(e)) { mOutFailedEventsTotal->Add(1); return true; @@ -167,7 +168,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(false)) { AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, buffer, sourceEvent, false); } - if (mCommonParserOptions.ShouldEraseEvent(false, sourceEvent)) { + if (mCommonParserOptions.ShouldEraseEvent(false, sourceEvent, metadata)) { mDiscardedEventsTotal->Add(1); return false; } diff --git a/core/plugin/processor/ProcessorParseApsaraNative.h b/core/plugin/processor/ProcessorParseApsaraNative.h index 4bd77ee8e1..c954bcfa0c 100644 --- a/core/plugin/processor/ProcessorParseApsaraNative.h +++ b/core/plugin/processor/ProcessorParseApsaraNative.h @@ -41,8 +41,11 @@ class ProcessorParseApsaraNative : public Processor { bool IsSupportedEvent(const PipelineEventPtr& e) const override; private: - bool - ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache); + bool ProcessEvent(const StringView& logPath, + PipelineEventPtr& e, + LogtailTime& lastLogTime, + StringView& timeStrCache, + const GroupMetadata& metadata); void AddLog(const StringView& key, const StringView& value, LogEvent& targetEvent, bool overwritten = true); time_t ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime); diff --git a/core/plugin/processor/ProcessorParseDelimiterNative.cpp b/core/plugin/processor/ProcessorParseDelimiterNative.cpp index 4f9b44fdf8..6b5a881b76 100644 --- a/core/plugin/processor/ProcessorParseDelimiterNative.cpp +++ b/core/plugin/processor/ProcessorParseDelimiterNative.cpp @@ -195,7 +195,7 @@ void ProcessorParseDelimiterNative::Process(PipelineEventGroup& logGroup) { size_t wIdx = 0; for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { - if (ProcessEvent(logPath, events[rIdx])) { + if (ProcessEvent(logPath, events[rIdx], logGroup.GetAllMetadata())) { if (wIdx != rIdx) { events[wIdx] = std::move(events[rIdx]); } @@ -206,7 +206,9 @@ void ProcessorParseDelimiterNative::Process(PipelineEventGroup& logGroup) { return; } -bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e) { +bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, + PipelineEventPtr& e, + const GroupMetadata& metadata) { if (!IsSupportedEvent(e)) { mOutFailedEventsTotal->Add(1); return true; @@ -241,7 +243,7 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe mOutFailedEventsTotal->Add(1); return true; } - + size_t reserveSize = mOverflowedFieldsTreatment == OverflowedFieldsTreatment::EXTEND ? 
(mKeys.size() + 10) : (mKeys.size() + 1); std::vector columnValues; @@ -357,7 +359,7 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(parseSuccess)) { AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, buffer, sourceEvent, false); } - if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent)) { + if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent, metadata)) { mDiscardedEventsTotal->Add(1); return false; } diff --git a/core/plugin/processor/ProcessorParseDelimiterNative.h b/core/plugin/processor/ProcessorParseDelimiterNative.h index cebed7a582..34fc4aa328 100644 --- a/core/plugin/processor/ProcessorParseDelimiterNative.h +++ b/core/plugin/processor/ProcessorParseDelimiterNative.h @@ -62,7 +62,7 @@ class ProcessorParseDelimiterNative : public Processor { private: static const std::string s_mDiscardedFieldKey; - bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e); + bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e, const GroupMetadata& metadata); bool SplitString(const char* buffer, int32_t begIdx, int32_t endIdx, diff --git a/core/plugin/processor/ProcessorParseJsonNative.cpp b/core/plugin/processor/ProcessorParseJsonNative.cpp index 90b3d075c5..ccb197cc04 100644 --- a/core/plugin/processor/ProcessorParseJsonNative.cpp +++ b/core/plugin/processor/ProcessorParseJsonNative.cpp @@ -68,7 +68,7 @@ void ProcessorParseJsonNative::Process(PipelineEventGroup& logGroup) { size_t wIdx = 0; for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { - if (ProcessEvent(logPath, events[rIdx])) { + if (ProcessEvent(logPath, events[rIdx], logGroup.GetAllMetadata())) { if (wIdx != rIdx) { events[wIdx] = std::move(events[rIdx]); } @@ -78,7 +78,9 @@ void ProcessorParseJsonNative::Process(PipelineEventGroup& logGroup) { events.resize(wIdx); } -bool ProcessorParseJsonNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e) { +bool ProcessorParseJsonNative::ProcessEvent(const StringView& logPath, + PipelineEventPtr& e, + const GroupMetadata& metadata) { if (!IsSupportedEvent(e)) { mOutFailedEventsTotal->Add(1); return true; @@ -103,7 +105,7 @@ bool ProcessorParseJsonNative::ProcessEvent(const StringView& logPath, PipelineE if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(parseSuccess)) { AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, rawContent, sourceEvent, false); } - if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent)) { + if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent, metadata)) { mDiscardedEventsTotal->Add(1); return false; } diff --git a/core/plugin/processor/ProcessorParseJsonNative.h b/core/plugin/processor/ProcessorParseJsonNative.h index b9d1902696..4650bce6d2 100644 --- a/core/plugin/processor/ProcessorParseJsonNative.h +++ b/core/plugin/processor/ProcessorParseJsonNative.h @@ -39,9 +39,12 @@ class ProcessorParseJsonNative : public Processor { bool IsSupportedEvent(const PipelineEventPtr& e) const override; private: - bool JsonLogLineParser(LogEvent& sourceEvent, const StringView& logPath, PipelineEventPtr& e, bool& sourceKeyOverwritten); + bool JsonLogLineParser(LogEvent& sourceEvent, + const StringView& logPath, + PipelineEventPtr& e, + bool& sourceKeyOverwritten); void AddLog(const StringView& key, const StringView& value, LogEvent& targetEvent, bool overwritten = true); - bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e); + bool ProcessEvent(const StringView& logPath, 
PipelineEventPtr& e, const GroupMetadata& metadata); static std::string RapidjsonValueToString(const rapidjson::Value& value); int* mParseFailures = nullptr; diff --git a/core/plugin/processor/ProcessorParseRegexNative.cpp b/core/plugin/processor/ProcessorParseRegexNative.cpp index f04d8ad0dd..1312a980ac 100644 --- a/core/plugin/processor/ProcessorParseRegexNative.cpp +++ b/core/plugin/processor/ProcessorParseRegexNative.cpp @@ -113,7 +113,7 @@ void ProcessorParseRegexNative::Process(PipelineEventGroup& logGroup) { size_t wIdx = 0; for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { - if (ProcessEvent(logPath, events[rIdx])) { + if (ProcessEvent(logPath, events[rIdx], logGroup.GetAllMetadata())) { if (wIdx != rIdx) { events[wIdx] = std::move(events[rIdx]); } @@ -128,7 +128,9 @@ bool ProcessorParseRegexNative::IsSupportedEvent(const PipelineEventPtr& e) cons return e.Is(); } -bool ProcessorParseRegexNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e) { +bool ProcessorParseRegexNative::ProcessEvent(const StringView& logPath, + PipelineEventPtr& e, + const GroupMetadata& metadata) { if (!IsSupportedEvent(e)) { mOutFailedEventsTotal->Add(1); return true; @@ -156,7 +158,7 @@ bool ProcessorParseRegexNative::ProcessEvent(const StringView& logPath, Pipeline if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(parseSuccess)) { AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, rawContent, sourceEvent, false); } - if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent)) { + if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent, metadata)) { mDiscardedEventsTotal->Add(1); return false; } diff --git a/core/plugin/processor/ProcessorParseRegexNative.h b/core/plugin/processor/ProcessorParseRegexNative.h index c2a8fc8d88..157178d6f7 100644 --- a/core/plugin/processor/ProcessorParseRegexNative.h +++ b/core/plugin/processor/ProcessorParseRegexNative.h @@ -17,7 +17,6 @@ #pragma once #include - #include #include "models/LogEvent.h" @@ -47,7 +46,7 @@ class ProcessorParseRegexNative : public Processor { private: /// @return false if data need to be discarded - bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e); + bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e, const GroupMetadata& metadata); bool WholeLineModeParser(LogEvent& sourceEvent, const std::string& key); bool RegexLogLineParser(LogEvent& sourceEvent, const boost::regex& reg, diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp index 35716caf59..fe31914280 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp @@ -55,19 +55,6 @@ bool ProcessorSplitLogStringNative::Init(const Json::Value& config) { mSplitChar = static_cast(splitter); } - // AppendingLogPositionMeta - if (!GetOptionalBoolParam(config, "AppendingLogPositionMeta", mAppendingLogPositionMeta, errorMsg)) { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - mAppendingLogPositionMeta, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } - mSplitLines = &(GetContext().GetProcessProfile().splitLines); return true; @@ -144,9 +131,11 @@ void ProcessorSplitLogStringNative::ProcessEvent(PipelineEventGroup& logGroup, ? 
sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) : content.size() + 1; targetEvent->SetPosition(offset, length); - if (mAppendingLogPositionMeta) { + // offset tag + if (logGroup.HasMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY)) { StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); - targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + targetEvent->SetContentNoCopy(logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY), + StringView(offsetStr.data, offsetStr.size)); } // TODO: remove the following code after the flusher refactorization if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.h b/core/plugin/processor/inner/ProcessorSplitLogStringNative.h index 7b0febbd21..fba71c134c 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.h +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.h @@ -30,7 +30,6 @@ class ProcessorSplitLogStringNative : public Processor { std::string mSourceKey = DEFAULT_CONTENT_KEY; char mSplitChar = '\n'; - bool mAppendingLogPositionMeta = false; const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override; diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp index c5970b6dc2..14fe057678 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp @@ -51,19 +51,6 @@ bool ProcessorSplitMultilineLogStringNative::Init(const Json::Value& config) { return false; } - // AppendingLogPositionMeta - if (!GetOptionalBoolParam(config, "AppendingLogPositionMeta", mAppendingLogPositionMeta, errorMsg)) { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - mAppendingLogPositionMeta, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } - mMatchedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_EVENTS_TOTAL); mMatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_LINES_TOTAL); mUnmatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_UNMATCHED_LINES_TOTAL); @@ -313,9 +300,11 @@ void ProcessorSplitMultilineLogStringNative::CreateNewEvent(const StringView& co auto const length = isLastLog ? 
sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) : content.size() + 1; targetEvent->SetPosition(offset, length); - if (mAppendingLogPositionMeta) { + // offset tag + if (logGroup.HasMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY)) { StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); - targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + targetEvent->SetContentNoCopy(logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY), + StringView(offsetStr.data, offsetStr.size)); } // TODO: remove the following code after the flusher refactorization if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h index 5fea3ea02c..40ad8f893e 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h @@ -32,7 +32,6 @@ class ProcessorSplitMultilineLogStringNative : public Processor { std::string mSourceKey = DEFAULT_CONTENT_KEY; MultilineOptions mMultiline; - bool mAppendingLogPositionMeta = false; const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override; diff --git a/core/plugin/processor/inner/ProcessorTagNative.cpp b/core/plugin/processor/inner/ProcessorTagNative.cpp index e9f9926339..61b86a0458 100644 --- a/core/plugin/processor/inner/ProcessorTagNative.cpp +++ b/core/plugin/processor/inner/ProcessorTagNative.cpp @@ -21,8 +21,9 @@ #include "app_config/AppConfig.h" #include "application/Application.h" #include "common/Flags.h" -#include "protobuf/sls/sls_logs.pb.h" +#include "common/ParamExtractor.h" #include "pipeline/Pipeline.h" +#include "protobuf/sls/sls_logs.pb.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif @@ -36,23 +37,48 @@ namespace logtail { const string ProcessorTagNative::sName = "processor_tag_native"; bool ProcessorTagNative::Init(const Json::Value& config) { + string errorMsg; + // PipelineMetaTagKey + if (!GetOptionalMapParam(config, "PipelineMetaTagKey", mPipelineMetaTagKey, errorMsg)) { + PARAM_WARNING_IGNORE(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } +#ifdef __ENTERPRISE__ + // AgentEnvMetaTagKey + const std::string key = "AgentEnvMetaTagKey"; + const Json::Value* itr = config.find(key.c_str(), key.c_str() + key.length()); + if (itr) { + mEnableAgentEnvMetaTag = true; + } + if (!GetOptionalMapParam(config, "AgentEnvMetaTagKey", mAgentEnvMetaTagKey, errorMsg)) { + PARAM_WARNING_IGNORE(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } +#endif return true; } void ProcessorTagNative::Process(PipelineEventGroup& logGroup) { - // group level - StringView filePath = logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH); - if (!filePath.empty()) { - logGroup.SetTagNoCopy(LOG_RESERVED_KEY_PATH, filePath.substr(0, 511)); - } - - // process level #ifdef __ENTERPRISE__ string agentTag = EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet(); if (!agentTag.empty()) { auto sb = logGroup.GetSourceBuffer()->CopyString(agentTag); - 
logGroup.SetTagNoCopy(LOG_RESERVED_KEY_USER_DEFINED_ID, StringView(sb.data, sb.size)); + addTagIfRequired(logGroup, "AGENT_TAG", AGENT_TAG_DEFAULT_KEY, StringView(sb.data, sb.size)); } +#else + addTagIfRequired(logGroup, "HOST_IP", HOST_IP_DEFAULT_KEY, LogFileProfiler::mIpAddr); #endif if (!STRING_FLAG(ALIYUN_LOG_FILE_TAGS).empty()) { @@ -68,14 +94,23 @@ void ProcessorTagNative::Process(PipelineEventGroup& logGroup) { return; } - // process level - logGroup.SetTagNoCopy(LOG_RESERVED_KEY_HOSTNAME, LogFileProfiler::mHostname); - logGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LogFileProfiler::mIpAddr); + addTagIfRequired(logGroup, "HOST_NAME", TagDefaultKey[TagKey::HOST_NAME], LogFileProfiler::mHostname); auto sb = logGroup.GetSourceBuffer()->CopyString(Application::GetInstance()->GetUUID()); logGroup.SetTagNoCopy(LOG_RESERVED_KEY_MACHINE_UUID, StringView(sb.data, sb.size)); static const vector& sEnvTags = AppConfig::GetInstance()->GetEnvTags(); if (!sEnvTags.empty()) { for (size_t i = 0; i < sEnvTags.size(); ++i) { +#ifdef __ENTERPRISE__ + if (mEnableAgentEnvMetaTag) { + auto envTagKey = sEnvTags[i].key(); + if (mAgentEnvMetaTagKey.find(envTagKey) != mAgentEnvMetaTagKey.end()) { + if (!mAgentEnvMetaTagKey[envTagKey].empty()) { + logGroup.SetTagNoCopy(mAgentEnvMetaTagKey[envTagKey], sEnvTags[i].value()); + } + } + continue; + } +#endif logGroup.SetTagNoCopy(sEnvTags[i].key(), sEnvTags[i].value()); } } @@ -85,4 +120,23 @@ bool ProcessorTagNative::IsSupportedEvent(const PipelineEventPtr& /*e*/) const { return true; } +void ProcessorTagNative::addTagIfRequired(PipelineEventGroup& logGroup, + const std::string& configKey, + const std::string& defaultKey, + const StringView& value) const { + auto it = mPipelineMetaTagKey.find(configKey); + if (it != mPipelineMetaTagKey.end()) { + if (!it->second.empty()) { + if (it->second == DEFAULT_CONFIG_TAG_KEY_VALUE) { + logGroup.SetTagNoCopy(defaultKey, value); + } else { + logGroup.SetTagNoCopy(it->second, value); + } + } + // empty value means the tag is not added + } else { + logGroup.SetTagNoCopy(defaultKey, value); + } +} + } // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorTagNative.h b/core/plugin/processor/inner/ProcessorTagNative.h index 04676f07d8..e030e4ae7e 100644 --- a/core/plugin/processor/inner/ProcessorTagNative.h +++ b/core/plugin/processor/inner/ProcessorTagNative.h @@ -31,6 +31,17 @@ class ProcessorTagNative : public Processor { protected: bool IsSupportedEvent(const PipelineEventPtr& e) const override; +private: + void addTagIfRequired(PipelineEventGroup& logGroup, + const std::string& configKey, + const std::string& defaultKey, + const StringView& value) const; + std::unordered_map mPipelineMetaTagKey; +#ifdef __ENTERPRISE__ + bool mEnableAgentEnvMetaTag = false; + std::unordered_map mAgentEnvMetaTagKey; +#endif + #ifdef APSARA_UNIT_TEST_MAIN friend class ProcessorTagNativeUnittest; #endif diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 336428997a..89e767185d 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -93,7 +93,8 @@ void ProcessorRunner::Run(uint32_t threadNo) { // thread local metrics should be initialized in each thread WriteMetrics::GetInstance()->PrepareMetricsRecordRef( - sMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_PROCESSOR}, {"thread_no", ToString(threadNo)}}); + sMetricsRecordRef, + {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_PROCESSOR}, {"thread_no", ToString(threadNo)}}); sInGroupsCnt 
= sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUPS_TOTAL); sInEventsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENTS_TOTAL); sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_SIZE_BYTES); @@ -207,7 +208,8 @@ void ProcessorRunner::Run(uint32_t threadNo) { pipeline->GetContext().GetLogstoreName(), convertedPath, hostLogPath, - vector(), // warning: this cannot be recovered! + make_shared>( + vector()), // warning: this cannot be recovered! profile.readBytes, profile.skipBytes, profile.splitLines, diff --git a/core/unittest/input/InputFileUnittest.cpp b/core/unittest/input/InputFileUnittest.cpp index 17b334831b..43e589c4e1 100644 --- a/core/unittest/input/InputFileUnittest.cpp +++ b/core/unittest/input/InputFileUnittest.cpp @@ -21,10 +21,10 @@ #include "app_config/AppConfig.h" #include "common/JsonUtil.h" #include "file_server/FileServer.h" -#include "plugin/input/InputFile.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineContext.h" #include "pipeline/plugin/PluginRegistry.h" +#include "plugin/input/InputFile.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "plugin/processor/inner/ProcessorTagNative.h" @@ -257,7 +257,6 @@ void InputFileUnittest::TestCreateInnerProcessors() { auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\n', plugin->mSplitChar); - APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); } { @@ -292,7 +291,6 @@ void InputFileUnittest::TestCreateInnerProcessors() { APSARA_TEST_TRUE(plugin->mMultiline.mIgnoringUnmatchWarning); APSARA_TEST_EQUAL(MultilineOptions::UnmatchedContentTreatment::DISCARD, plugin->mMultiline.mUnmatchedContentTreatment); - APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); } { @@ -316,7 +314,6 @@ void InputFileUnittest::TestCreateInnerProcessors() { auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\0', plugin->mSplitChar); - APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); ctx.SetIsFirstProcessorJsonFlag(false); } @@ -344,7 +341,6 @@ void InputFileUnittest::TestCreateInnerProcessors() { auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\0', plugin->mSplitChar); - APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); ctx.SetIsFirstProcessorJsonFlag(false); } diff --git a/core/unittest/reader/FileReaderOptionsUnittest.cpp b/core/unittest/reader/FileReaderOptionsUnittest.cpp index 762370eda5..c89538b3b0 100644 --- a/core/unittest/reader/FileReaderOptionsUnittest.cpp +++ b/core/unittest/reader/FileReaderOptionsUnittest.cpp @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include + #include #include -#include - #include "common/Flags.h" #include "common/JsonUtil.h" -#include "pipeline/PipelineContext.h" #include "file_server/reader/FileReaderOptions.h" +#include "pipeline/PipelineContext.h" #include "unittest/Unittest.h" DECLARE_FLAG_INT32(default_tail_limit_kb); @@ -56,9 +56,9 @@ void FileReaderOptionsUnittest::OnSuccessfulInit() const { APSARA_TEST_EQUAL(static_cast(INT32_FLAG(default_reader_flush_timeout)), config->mFlushTimeoutSecs); APSARA_TEST_EQUAL(0U, config->mReadDelaySkipThresholdBytes); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(delay_bytes_upperlimit)), config->mReadDelayAlertThresholdBytes); - APSARA_TEST_EQUAL(static_cast(INT32_FLAG(reader_close_unused_file_time)), config->mCloseUnusedReaderIntervalSec); + APSARA_TEST_EQUAL(static_cast(INT32_FLAG(reader_close_unused_file_time)), + config->mCloseUnusedReaderIntervalSec); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(logreader_max_rotate_queue_size)), config->mRotatorQueueSize); - APSARA_TEST_FALSE(config->mAppendingLogPositionMeta); // valid optional param configStr = R"( @@ -70,8 +70,7 @@ void FileReaderOptionsUnittest::OnSuccessfulInit() const { "ReadDelaySkipThresholdBytes": 1000, "ReadDelayAlertThresholdBytes": 100, "CloseUnusedReaderIntervalSec": 10, - "RotatorQueueSize": 15, - "AppendingLogPositionMeta": true + "RotatorQueueSize": 15 } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); @@ -85,7 +84,6 @@ void FileReaderOptionsUnittest::OnSuccessfulInit() const { APSARA_TEST_EQUAL(100U, config->mReadDelayAlertThresholdBytes); APSARA_TEST_EQUAL(10U, config->mCloseUnusedReaderIntervalSec); APSARA_TEST_EQUAL(15U, config->mRotatorQueueSize); - APSARA_TEST_TRUE(config->mAppendingLogPositionMeta); // invalid optional param (except for FileEcoding) configStr = R"( @@ -97,8 +95,7 @@ void FileReaderOptionsUnittest::OnSuccessfulInit() const { "ReadDelaySkipThresholdBytes": "1000", "ReadDelayAlertThresholdBytes": "100", "CloseUnusedReaderIntervalSec": "10", - "RotatorQueueSize": "15", - "AppendingLogPositionMeta": "true" + "RotatorQueueSize": "15" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); @@ -110,9 +107,9 @@ void FileReaderOptionsUnittest::OnSuccessfulInit() const { APSARA_TEST_EQUAL(static_cast(INT32_FLAG(default_reader_flush_timeout)), config->mFlushTimeoutSecs); APSARA_TEST_EQUAL(0U, config->mReadDelaySkipThresholdBytes); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(delay_bytes_upperlimit)), config->mReadDelayAlertThresholdBytes); - APSARA_TEST_EQUAL(static_cast(INT32_FLAG(reader_close_unused_file_time)), config->mCloseUnusedReaderIntervalSec); + APSARA_TEST_EQUAL(static_cast(INT32_FLAG(reader_close_unused_file_time)), + config->mCloseUnusedReaderIntervalSec); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(logreader_max_rotate_queue_size)), config->mRotatorQueueSize); - APSARA_TEST_FALSE(config->mAppendingLogPositionMeta); // FileEncoding configStr = R"( diff --git a/pkg/config/global_config.go b/pkg/config/global_config.go index f5d3d13708..dab6f67804 100644 --- a/pkg/config/global_config.go +++ b/pkg/config/global_config.go @@ -47,6 +47,10 @@ type GlobalConfig struct { UsingOldContentTag bool EnableContainerdUpperDirDetect bool EnableSlsMetricsFormat bool + + PipelineMetaTagKey map[string]string + PipelineMetaTagKeyDefaultValue map[string]string + AgentEnvMetaTagKey map[string]string } // LoongcollectorGlobalConfig is the singleton instance of GlobalConfig. 
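The three GlobalConfig fields added above are the Go-side counterpart of the new C++ FileTagOptions/ProcessorTagNative options: PipelineMetaTagKey controls under which key (if at all) each built-in pipeline meta tag is emitted, and AgentEnvMetaTagKey does the same for agent env tags. The snippet below is a minimal, self-contained sketch of the rename/delete rule that the new ProcessorTag.addTagIfRequired helpers later in this patch implement; resolveTagKey and the sample default keys are illustrative stand-ins, not code from the patch.

```go
package main

import "fmt"

// Sentinel meaning "keep the built-in tag key"; matches defaultConfigTagKeyValue in this patch.
const defaultConfigTagKeyValue = "__default__"

// resolveTagKey mirrors the addTagIfRequired rule:
//   config key absent      -> keep the default tag key
//   value == "__default__" -> keep the default tag key
//   value == ""            -> drop the tag entirely
//   any other value        -> rename the tag
func resolveTagKey(tagKeyMap map[string]string, configKey, defaultKey string) (string, bool) {
	v, ok := tagKeyMap[configKey]
	if !ok {
		return defaultKey, true
	}
	switch v {
	case "":
		return "", false
	case defaultConfigTagKeyValue:
		return defaultKey, true
	default:
		return v, true
	}
}

func main() {
	pipelineMetaTagKey := map[string]string{
		"HOST_NAME": "node_hostname", // rename
		"HOST_IP":   "",              // delete
		// "AGENT_TAG" not configured -> its default key is kept
	}
	for _, tc := range [][2]string{{"HOST_NAME", "host.name"}, {"HOST_IP", "host.ip"}, {"AGENT_TAG", "__user_defined_id__"}} {
		if key, keep := resolveTagKey(pipelineMetaTagKey, tc[0], tc[1]); keep {
			fmt.Printf("%s emitted as %q\n", tc[0], key)
		} else {
			fmt.Printf("%s dropped\n", tc[0])
		}
	}
}
```

An empty string therefore deletes a tag and `__default__` keeps the built-in name, which is the rename-and-delete behavior this patch introduces.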
diff --git a/pkg/protocol/converter/converter.go b/pkg/protocol/converter/converter.go index 7b7e1282c7..5a428ec5eb 100644 --- a/pkg/protocol/converter/converter.go +++ b/pkg/protocol/converter/converter.go @@ -20,7 +20,6 @@ import ( "sync" "github.com/alibaba/ilogtail/pkg/config" - "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/protocol" ) @@ -49,25 +48,6 @@ const ( targetGroupMetadataPrefix = "metadata." ) -const ( - tagHostIP = "host.ip" - tagLogTopic = "log.topic" - tagLogFilePath = "log.file.path" - tagHostname = "host.name" - tagK8sNodeIP = "k8s.node.ip" - tagK8sNodeName = "k8s.node.name" - tagK8sNamespace = "k8s.namespace.name" - tagK8sPodName = "k8s.pod.name" - tagK8sPodIP = "k8s.pod.ip" - tagK8sPodUID = "k8s.pod.uid" - tagContainerName = "container.name" - tagContainerIP = "container.ip" - tagContainerImageName = "container.image.name" - tagK8sContainerName = "k8s.container.name" - tagK8sContainerIP = "k8s.container.ip" - tagK8sContainerImageName = "k8s.container.image.name" -) - // todo: make multiple pools for different size levels var byteBufPool = sync.Pool{ New: func() interface{} { @@ -76,27 +56,6 @@ var byteBufPool = sync.Pool{ }, } -var tagConversionMap = map[string]string{ - "__path__": tagLogFilePath, - "__hostname__": tagHostname, - "_node_ip_": tagK8sNodeIP, - "_node_name_": tagK8sNodeName, - "_namespace_": tagK8sNamespace, - "_pod_name_": tagK8sPodName, - "_pod_ip_": tagK8sPodIP, - "_pod_uid_": tagK8sPodUID, - "_container_name_": tagContainerName, - "_container_ip_": tagContainerIP, - "_image_name_": tagContainerImageName, -} - -// When in k8s, the following tags should be renamed to k8s-specific names. -var specialTagConversionMap = map[string]string{ - "_container_name_": tagK8sContainerName, - "_container_ip_": tagK8sContainerIP, - "_image_name_": tagK8sContainerImageName, -} - var supportedEncodingMap = map[string]map[string]bool{ ProtocolCustomSingle: { EncodingJSON: true, @@ -126,13 +85,12 @@ type Converter struct { Separator string IgnoreUnExpectedData bool OnlyContents bool - TagKeyRenameMap map[string]string ProtocolKeyRenameMap map[string]string GlobalConfig *config.GlobalConfig } func NewConverterWithSep(protocol, encoding, sep string, ignoreUnExpectedData bool, tagKeyRenameMap, protocolKeyRenameMap map[string]string, globalConfig *config.GlobalConfig) (*Converter, error) { - converter, err := NewConverter(protocol, encoding, tagKeyRenameMap, protocolKeyRenameMap, globalConfig) + converter, err := NewConverter(protocol, encoding, protocolKeyRenameMap, globalConfig) if err != nil { return nil, err } @@ -141,7 +99,7 @@ func NewConverterWithSep(protocol, encoding, sep string, ignoreUnExpectedData bo return converter, nil } -func NewConverter(protocol, encoding string, tagKeyRenameMap, protocolKeyRenameMap map[string]string, globalConfig *config.GlobalConfig) (*Converter, error) { +func NewConverter(protocol, encoding string, protocolKeyRenameMap map[string]string, globalConfig *config.GlobalConfig) (*Converter, error) { enc, ok := supportedEncodingMap[protocol] if !ok { return nil, fmt.Errorf("unsupported protocol: %s", protocol) @@ -152,7 +110,6 @@ func NewConverter(protocol, encoding string, tagKeyRenameMap, protocolKeyRenameM return &Converter{ Protocol: protocol, Encoding: encoding, - TagKeyRenameMap: tagKeyRenameMap, ProtocolKeyRenameMap: protocolKeyRenameMap, GlobalConfig: globalConfig, }, nil @@ -227,61 +184,18 @@ func TrimPrefix(str string) string { } } 
-func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic string, tagKeyRenameMap map[string]string) (map[string]string, map[string]string) { +func convertLogToMap(log *protocol.Log, logTags []*protocol.LogTag, src, topic string) (map[string]string, map[string]string) { contents, tags := make(map[string]string), make(map[string]string) for _, logContent := range log.Contents { - switch logContent.Key { - case "__log_topic__": - addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, logContent.Value) - case tagPrefix + "__user_defined_id__": - continue - default: - var tagName string - if strings.HasPrefix(logContent.Key, tagPrefix) { - tagName = logContent.Key[len(tagPrefix):] - if _, ok := specialTagConversionMap[tagName]; *flags.K8sFlag && ok { - tagName = specialTagConversionMap[tagName] - } else if _, ok := tagConversionMap[tagName]; ok { - tagName = tagConversionMap[tagName] - } - } else { - if _, ok := specialTagConversionMap[logContent.Key]; *flags.K8sFlag && ok { - tagName = specialTagConversionMap[logContent.Key] - } else if _, ok := tagConversionMap[logContent.Key]; ok { - tagName = tagConversionMap[logContent.Key] - } - } - if len(tagName) != 0 { - addTagIfRequired(tags, tagKeyRenameMap, tagName, logContent.Value) - } else { - contents[logContent.Key] = logContent.Value - } - } + contents[logContent.Key] = logContent.Value } - for _, logTag := range logTags { - if logTag.Key == "__user_defined_id__" || logTag.Key == "__pack_id__" { - continue - } - - tagName := logTag.Key - if _, ok := specialTagConversionMap[logTag.Key]; *flags.K8sFlag && ok { - tagName = specialTagConversionMap[logTag.Key] - } else if _, ok := tagConversionMap[logTag.Key]; ok { - tagName = tagConversionMap[logTag.Key] - } - addTagIfRequired(tags, tagKeyRenameMap, tagName, logTag.Value) - } - - addTagIfRequired(tags, tagKeyRenameMap, tagHostIP, src) - if topic != "" { - addTagIfRequired(tags, tagKeyRenameMap, tagLogTopic, topic) + tags[logTag.Key] = logTag.Value } - return contents, tags } -func findTargetValues(targetFields []string, contents, tags, tagKeyRenameMap map[string]string) (map[string]string, error) { +func findTargetValues(targetFields []string, contents, tags map[string]string) (map[string]string, error) { if len(targetFields) == 0 { return nil, nil } @@ -296,8 +210,6 @@ func findTargetValues(targetFields []string, contents, tags, tagKeyRenameMap map case strings.HasPrefix(field, targetTagPrefix): if value, ok := tags[field[len(targetTagPrefix):]]; ok { desiredValue[field] = value - } else if value, ok := tagKeyRenameMap[field[len(targetTagPrefix):]]; ok { - desiredValue[field] = tags[value] } default: return nil, fmt.Errorf("unsupported field: %s", field) @@ -305,11 +217,3 @@ func findTargetValues(targetFields []string, contents, tags, tagKeyRenameMap map } return desiredValue, nil } - -func addTagIfRequired(tags, tagKeyRenameMap map[string]string, key, value string) { - if newKey, ok := tagKeyRenameMap[key]; ok && len(newKey) != 0 { - tags[newKey] = value - } else if !ok { - tags[key] = value - } -} diff --git a/pkg/protocol/converter/converter_single_log_flatten.go b/pkg/protocol/converter/converter_single_log_flatten.go index 9daf6000c5..0d5ab826a7 100644 --- a/pkg/protocol/converter/converter_single_log_flatten.go +++ b/pkg/protocol/converter/converter_single_log_flatten.go @@ -23,9 +23,9 @@ import ( func (c *Converter) ConvertToSingleProtocolLogsFlatten(logGroup *protocol.LogGroup, targetFields []string) ([]map[string]interface{}, []map[string]string, error) { 
convertedLogs, desiredValues := make([]map[string]interface{}, len(logGroup.Logs)), make([]map[string]string, len(logGroup.Logs)) for i, log := range logGroup.Logs { - contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic, c.TagKeyRenameMap) + contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic) - desiredValue, err := findTargetValues(targetFields, contents, tags, c.TagKeyRenameMap) + desiredValue, err := findTargetValues(targetFields, contents, tags) if err != nil { return nil, nil, err } diff --git a/pkg/protocol/converter/custom_single_log.go b/pkg/protocol/converter/custom_single_log.go index b20df66e0e..8013ef560a 100644 --- a/pkg/protocol/converter/custom_single_log.go +++ b/pkg/protocol/converter/custom_single_log.go @@ -32,9 +32,9 @@ const ( func (c *Converter) ConvertToSingleProtocolLogs(logGroup *protocol.LogGroup, targetFields []string) ([]map[string]interface{}, []map[string]string, error) { convertedLogs, desiredValues := make([]map[string]interface{}, len(logGroup.Logs)), make([]map[string]string, len(logGroup.Logs)) for i, log := range logGroup.Logs { - contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic, c.TagKeyRenameMap) + contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic) - desiredValue, err := findTargetValues(targetFields, contents, tags, c.TagKeyRenameMap) + desiredValue, err := findTargetValues(targetFields, contents, tags) if err != nil { return nil, nil, err } diff --git a/pkg/protocol/converter/otlp.go b/pkg/protocol/converter/otlp.go index 2bef2fadf4..0e8dac8540 100644 --- a/pkg/protocol/converter/otlp.go +++ b/pkg/protocol/converter/otlp.go @@ -65,8 +65,8 @@ func (c *Converter) ConvertToOtlpResourseLogs(logGroup *protocol.LogGroup, targe for i, log := range logGroup.Logs { logRecord := scopeLog.LogRecords().AppendEmpty() - contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic, c.TagKeyRenameMap) - desiredValue, err := findTargetValues(targetFields, contents, tags, c.TagKeyRenameMap) + contents, tags := convertLogToMap(log, logGroup.LogTags, logGroup.Source, logGroup.Topic) + desiredValue, err := findTargetValues(targetFields, contents, tags) if err != nil { return rsLogs, nil, err } diff --git a/pluginmanager/plugin_runner_helper.go b/pluginmanager/plugin_runner_helper.go index be90760737..8e99f4e382 100644 --- a/pluginmanager/plugin_runner_helper.go +++ b/pluginmanager/plugin_runner_helper.go @@ -79,7 +79,7 @@ func flushOutStore[T FlushData, F FlusherWrapperInterface](lc *LogstoreConfig, s } func loadAdditionalTags(globalConfig *config.GlobalConfig) models.Tags { - tags := models.NewTagsWithKeyValues("__hostname__", util.GetHostName()) + tags := models.NewTagsWithKeyValues() for i := 0; i < len(helper.EnvTags); i += 2 { tags.Add(helper.EnvTags[i], helper.EnvTags[i+1]) } diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index 0e50eba6dd..95733cb204 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -231,6 +231,10 @@ func (p *pluginv1Runner) runProcessor() { func (p *pluginv1Runner) runProcessorInternal(cc *pipeline.AsyncControl) { defer panicRecover(p.LogstoreConfig.ConfigName) var logCtx *pipeline.LogWithContext + processorTag := ProcessorTag{ + PipelineMetaTagKey: p.LogstoreConfig.GlobalConfig.PipelineMetaTagKey, + AgentEnvMetaTagKey: p.LogstoreConfig.GlobalConfig.AgentEnvMetaTagKey, + } for { select { case 
<-cc.CancelToken(): @@ -238,6 +242,7 @@ func (p *pluginv1Runner) runProcessorInternal(cc *pipeline.AsyncControl) { return } case logCtx = <-p.LogsChan: + processorTag.ProcessV1(logCtx, p.LogstoreConfig.GlobalConfig) logs := []*protocol.Log{logCtx.Log} p.LogstoreConfig.Statistics.RawLogMetric.Add(int64(len(logs))) for _, processor := range p.ProcessorPlugins { @@ -315,17 +320,12 @@ func (p *pluginv1Runner) runFlusherInternal(cc *pipeline.AsyncControl) { } p.LogstoreConfig.Statistics.FlushLogGroupMetric.Add(int64(len(logGroups))) - // Add tags for each non-empty LogGroup, includes: default hostname tag, - // env tags and global tags in config. for _, logGroup := range logGroups { if len(logGroup.Logs) == 0 { continue } p.LogstoreConfig.Statistics.FlushLogMetric.Add(int64(len(logGroup.Logs))) logGroup.Source = util.GetIPAddress() - for key, value := range loadAdditionalTags(p.LogstoreConfig.GlobalConfig).Iterator() { - logGroup.LogTags = append(logGroup.LogTags, &protocol.LogTag{Key: key, Value: value}) - } } // Flush LogGroups to all flushers. diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index c9f9867d57..4652614b14 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -255,6 +255,10 @@ func (p *pluginv2Runner) runProcessorInternal(cc *pipeline.AsyncControl) { defer panicRecover(p.LogstoreConfig.ConfigName) pipeContext := p.ProcessPipeContext pipeChan := p.InputPipeContext.Collector().Observe() + processorTag := ProcessorTag{ + PipelineMetaTagKey: p.LogstoreConfig.GlobalConfig.PipelineMetaTagKey, + AgentEnvMetaTagKey: p.LogstoreConfig.GlobalConfig.AgentEnvMetaTagKey, + } for { select { case <-cc.CancelToken(): @@ -262,6 +266,7 @@ func (p *pluginv2Runner) runProcessorInternal(cc *pipeline.AsyncControl) { return } case group := <-pipeChan: + processorTag.ProcessV2(group, p.LogstoreConfig.GlobalConfig) p.LogstoreConfig.Statistics.RawLogMetric.Add(int64(len(group.Events))) pipeEvents := []*models.PipelineGroupEvents{group} for _, processor := range p.ProcessorPlugins { @@ -342,14 +347,11 @@ func (p *pluginv2Runner) runFlusherInternal(cc *pipeline.AsyncControl) { } p.LogstoreConfig.Statistics.FlushLogGroupMetric.Add(int64(len(data))) - // Add tags for each non-empty LogGroup, includes: default hostname tag, - // env tags and global tags in config. for _, item := range data { if len(item.Events) == 0 { continue } p.LogstoreConfig.Statistics.FlushLogMetric.Add(int64(len(item.Events))) - item.Group.GetTags().Merge(loadAdditionalTags(p.LogstoreConfig.GlobalConfig)) } // Flush LogGroups to all flushers. 
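In both runner versions the tag work now happens once at the head of the processing loop (ProcessorTag.ProcessV1/ProcessV2 on each item pulled from the channel) instead of being appended to every LogGroup again in runFlusherInternal. Below is a minimal sketch of that ordering with the real pipeline types replaced by local stand-ins; event, tagStage and the channel wiring are illustrative only.

```go
package main

import "fmt"

// event stands in for LogWithContext / PipelineGroupEvents.
type event struct {
	body string
	tags map[string]string
}

// tagStage stands in for ProcessorTag: applied exactly once per event,
// before any user-configured processors run.
type tagStage struct{ pipelineMetaTagKey map[string]string }

func (t *tagStage) process(e *event) {
	// Simplified: emit the host name under its default key when it is not reconfigured.
	if _, configured := t.pipelineMetaTagKey["HOST_NAME"]; !configured {
		e.tags["host.name"] = "example-host"
	}
}

func main() {
	in := make(chan *event, 1)
	in <- &event{body: "hello", tags: map[string]string{}}
	close(in)

	tagger := &tagStage{pipelineMetaTagKey: map[string]string{}}
	addSuffix := func(e *event) { e.body += "!" } // stand-in for a configured processor

	for e := range in {
		tagger.process(e) // tags first ...
		addSuffix(e)      // ... then processors; flushers no longer add tags of their own
		fmt.Println(e.body, e.tags)
	}
}
```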
@@ -459,11 +461,22 @@ func (p *pluginv2Runner) ReceiveLogGroup(in pipeline.LogGroupWithContext) { meta.Add(ctxKeyTopic, topic) tags := models.NewTags() - for _, tag := range in.LogGroup.GetLogTags() { - tags.Add(tag.GetKey(), tag.GetValue()) - } - if len(topic) > 0 { - tags.Add(tagKeyLogTopic, topic) + if !p.LogstoreConfig.GlobalConfig.UsingOldContentTag { + for _, tag := range in.LogGroup.GetLogTags() { + tags.Add(tag.GetKey(), tag.GetValue()) + } + if len(topic) > 0 { + tags.Add(tagKeyLogTopic, topic) + } + } else { + for _, log := range in.LogGroup.GetLogs() { + for _, tag := range in.LogGroup.GetLogTags() { + log.Contents = append(log.Contents, &protocol.Log_Content{ + Key: tag.GetKey(), + Value: tag.GetValue(), + }) + } + } } group := models.NewGroup(meta, tags) diff --git a/pluginmanager/processor_tag_commuinty.go b/pluginmanager/processor_tag_commuinty.go new file mode 100644 index 0000000000..a6cb5d6d95 --- /dev/null +++ b/pluginmanager/processor_tag_commuinty.go @@ -0,0 +1,84 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !enterprise + +package pluginmanager + +import ( + "github.com/alibaba/ilogtail/pkg/config" + "github.com/alibaba/ilogtail/pkg/models" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/util" +) + +const ( + hostNameDefaultTagKey = "host.name" + hostIPDefaultTagKey = "host.ip" + defaultConfigTagKeyValue = "__default__" +) + +// Processor interface cannot meet the requirements of tag processing, so we need to create a special ProcessorTag struct +type ProcessorTag struct { + PipelineMetaTagKey map[string]string + AgentEnvMetaTagKey map[string]string +} + +func (p *ProcessorTag) ProcessV1(logCtx *pipeline.LogWithContext, globalConfig *config.GlobalConfig) { + tags, ok := logCtx.Context["tags"] + if !ok { + return + } + tagsMap, ok := tags.(map[string]string) + if !ok { + return + } + p.addTagIfRequired("HOST_NAME", hostNameDefaultTagKey, util.GetHostName(), tagsMap) + p.addTagIfRequired("HOST_IP", hostIPDefaultTagKey, util.GetIPAddress(), tagsMap) + + // Add tags for each log, includes: default hostname tag, + // env tags and global tags in config. + for k, v := range loadAdditionalTags(globalConfig).Iterator() { + tagsMap[k] = v + } +} + +func (p *ProcessorTag) ProcessV2(in *models.PipelineGroupEvents, globalConfig *config.GlobalConfig) { + tagsMap := make(map[string]string) + p.addTagIfRequired("HOST_NAME", hostNameDefaultTagKey, util.GetHostName(), tagsMap) + p.addTagIfRequired("HOST_IP", hostIPDefaultTagKey, util.GetIPAddress(), tagsMap) + for k, v := range tagsMap { + in.Group.Tags.Add(k, v) + } + + // Add tags for each log, includes: default hostname tag, + // env tags and global tags in config. 
+ for k, v := range loadAdditionalTags(globalConfig).Iterator() { + in.Group.Tags.Add(k, v) + } +} + +func (p *ProcessorTag) addTagIfRequired(configKey, defaultKey, value string, tags map[string]string) { + if key, ok := p.PipelineMetaTagKey[configKey]; ok { + if key != "" { + if key == defaultConfigTagKeyValue { + tags[defaultKey] = value + } else { + tags[key] = value + } + } + } else { + tags[defaultKey] = value + } +} diff --git a/pluginmanager/processor_tag_enterprise.go b/pluginmanager/processor_tag_enterprise.go new file mode 100644 index 0000000000..deda1187ea --- /dev/null +++ b/pluginmanager/processor_tag_enterprise.go @@ -0,0 +1,98 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build enterprise + +package pluginmanager + +import ( + "github.com/alibaba/ilogtail/pkg/config" + "github.com/alibaba/ilogtail/pkg/models" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/util" +) + +const ( + hostNameDefaultTagKey = "__hostname__" + agentTagDefaultTagKey = "__user_defined_id__" + defaultConfigTagKeyValue = "__default__" +) + +// Processor interface cannot meet the requirements of tag processing, so we need to create a special ProcessorTag struct +type ProcessorTag struct { + PipelineMetaTagKey map[string]string + AgentEnvMetaTagKey map[string]string +} + +func (p *ProcessorTag) ProcessV1(logCtx *pipeline.LogWithContext, globalConfig *config.GlobalConfig) { + tags, ok := logCtx.Context["tags"] + if !ok { + return + } + tagsMap, ok := tags.(map[string]string) + if !ok { + return + } + p.addTagIfRequired("HOST_NAME", hostNameDefaultTagKey, util.GetHostName(), tagsMap) + + // Add tags for each log, includes: default hostname tag, + // env tags and global tags in config. + for k, v := range loadAdditionalTags(globalConfig).Iterator() { + if p.AgentEnvMetaTagKey != nil { + if newK, ok := p.AgentEnvMetaTagKey[k]; ok { + if newK != "" { + tagsMap[newK] = v + } + } + } else { + tagsMap[k] = v + } + } +} + +func (p *ProcessorTag) ProcessV2(in *models.PipelineGroupEvents, globalConfig *config.GlobalConfig) { + tagsMap := make(map[string]string) + p.addTagIfRequired("HOST_NAME", hostNameDefaultTagKey, util.GetHostName(), tagsMap) + + // Add tags for each log, includes: default hostname tag, + // env tags and global tags in config. 
+ for k, v := range loadAdditionalTags(globalConfig).Iterator() { + if p.AgentEnvMetaTagKey != nil { + if newK, ok := p.AgentEnvMetaTagKey[k]; ok { + if newK != "" { + tagsMap[newK] = v + } + } + } else { + tagsMap[k] = v + } + } + for k, v := range tagsMap { + in.Group.Tags.Add(k, v) + } +} + +func (p *ProcessorTag) addTagIfRequired(configKey, defaultKey, value string, tags map[string]string) { + if key, ok := p.PipelineMetaTagKey[configKey]; ok { + if key != "" { + if key == defaultConfigTagKeyValue { + tags[defaultKey] = value + } else { + tags[key] = value + } + } + } else { + tags[defaultKey] = value + } +} diff --git a/plugins/flusher/clickhouse/flusher_clickhouse.go b/plugins/flusher/clickhouse/flusher_clickhouse.go index cada16ffe3..bdbb099b98 100644 --- a/plugins/flusher/clickhouse/flusher_clickhouse.go +++ b/plugins/flusher/clickhouse/flusher_clickhouse.go @@ -73,8 +73,6 @@ type FlusherClickHouse struct { } type convertConfig struct { - // Rename one or more fields from tags. - TagFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time ProtocolFieldsRename map[string]string // Convert protocol, default value: custom_single @@ -173,8 +171,8 @@ func (f *FlusherClickHouse) Validate() error { func (f *FlusherClickHouse) getConverter() (*converter.Converter, error) { logger.Debug(f.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", f.Convert.Protocol, - "Encoding", f.Convert.Encoding, "TagFieldsRename", f.Convert.TagFieldsRename, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) - return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) + "Encoding", f.Convert.Encoding, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) + return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) } func (f *FlusherClickHouse) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error { diff --git a/plugins/flusher/elasticsearch/flusher_elasticsearch.go b/plugins/flusher/elasticsearch/flusher_elasticsearch.go index e1fe5a3c89..39f9e8e117 100644 --- a/plugins/flusher/elasticsearch/flusher_elasticsearch.go +++ b/plugins/flusher/elasticsearch/flusher_elasticsearch.go @@ -57,8 +57,6 @@ type HTTPConfig struct { } type convertConfig struct { - // Rename one or more fields from tags. 
- TagFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time ProtocolFieldsRename map[string]string // Convert protocol, default value: custom_single @@ -148,8 +146,8 @@ func (f *FlusherElasticSearch) Validate() error { func (f *FlusherElasticSearch) getConverter() (*converter.Converter, error) { logger.Debug(f.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", f.Convert.Protocol, - "Encoding", f.Convert.Encoding, "TagFieldsRename", f.Convert.TagFieldsRename, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) - return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) + "Encoding", f.Convert.Encoding, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) + return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) } func (f *FlusherElasticSearch) getIndexKeys() ([]string, bool, error) { diff --git a/plugins/flusher/kafkav2/flusher_kafka_v2.go b/plugins/flusher/kafkav2/flusher_kafka_v2.go index a537cb9136..2d9f957663 100644 --- a/plugins/flusher/kafkav2/flusher_kafka_v2.go +++ b/plugins/flusher/kafkav2/flusher_kafka_v2.go @@ -149,8 +149,6 @@ type metaRetryConfig struct { } type convertConfig struct { - // Rename one or more fields from tags. - TagFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time ProtocolFieldsRename map[string]string // Convert protocol, default value: custom_single @@ -564,8 +562,8 @@ func (k *FlusherKafka) makeHeaders() []sarama.RecordHeader { } func (k *FlusherKafka) getConverter() (*converter.Converter, error) { - logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename) - return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.TagFieldsRename, k.Convert.ProtocolFieldsRename, k.context.GetPipelineScopeConfig()) + logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename) + return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.ProtocolFieldsRename, k.context.GetPipelineScopeConfig()) } func init() { diff --git a/plugins/flusher/loki/flusher_loki.go b/plugins/flusher/loki/flusher_loki.go index 6b3f4cdba0..519d7c11db 100644 --- a/plugins/flusher/loki/flusher_loki.go +++ b/plugins/flusher/loki/flusher_loki.go @@ -63,8 +63,6 @@ type FlusherLoki struct { } type convertConfig struct { - // Rename one or more fields from tags. 
- TagFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time ProtocolFieldsRename map[string]string // Convert protocol, default value: custom_single @@ -179,8 +177,8 @@ func (f *FlusherLoki) Stop() error { func (f *FlusherLoki) getConverter() (*converter.Converter, error) { logger.Debug(f.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", f.Convert.Protocol, - "Encoding", f.Convert.Encoding, "TagFieldsRename", f.Convert.TagFieldsRename, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) - cvt, err := converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) + "Encoding", f.Convert.Encoding, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) + cvt, err := converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) if err != nil { return nil, err } diff --git a/plugins/flusher/opentelemetry/flusher_otlp.go b/plugins/flusher/opentelemetry/flusher_otlp.go index fce944339e..c358e7d003 100644 --- a/plugins/flusher/opentelemetry/flusher_otlp.go +++ b/plugins/flusher/opentelemetry/flusher_otlp.go @@ -115,7 +115,7 @@ func (f *FlusherOTLP) Init(ctx pipeline.Context) error { func (f *FlusherOTLP) getConverter() (*converter.Converter, error) { switch f.Version { case v1: - return converter.NewConverter(converter.ProtocolOtlpV1, converter.EncodingNone, nil, nil, f.context.GetPipelineScopeConfig()) + return converter.NewConverter(converter.ProtocolOtlpV1, converter.EncodingNone, nil, f.context.GetPipelineScopeConfig()) default: return nil, fmt.Errorf("unsupported otlp log protocol version : %s", f.Version) } diff --git a/plugins/flusher/pulsar/flusher_pulsar.go b/plugins/flusher/pulsar/flusher_pulsar.go index 80d84fb73e..d40987421f 100644 --- a/plugins/flusher/pulsar/flusher_pulsar.go +++ b/plugins/flusher/pulsar/flusher_pulsar.go @@ -84,8 +84,6 @@ type FlusherPulsar struct { selectFields []string } type convertConfig struct { - // Rename one or more fields from tags. 
- TagFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time ProtocolFieldsRename map[string]string // Convert protocol, default value: custom_single @@ -246,8 +244,8 @@ func (f *FlusherPulsar) Stop() error { func (f *FlusherPulsar) getConverter() (*converter.Converter, error) { logger.Debug(f.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", f.Convert.Protocol, - "Encoding", f.Convert.Encoding, "TagFieldsRename", f.Convert.TagFieldsRename, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) - return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) + "Encoding", f.Convert.Encoding, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename) + return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig()) } func (f *FlusherPulsar) initClientOptions() pulsar.ClientOptions { diff --git a/plugins/input/docker/logmeta/metric_container_info.go b/plugins/input/docker/logmeta/metric_container_info.go index 392bea353a..d71537df73 100644 --- a/plugins/input/docker/logmeta/metric_container_info.go +++ b/plugins/input/docker/logmeta/metric_container_info.go @@ -221,10 +221,9 @@ func (idf *InputDockerFile) addMappingToLogtail(info *helper.DockerInfoDetail, c cmd.Tags = append(cmd.Tags, val) } // info.ContainerNameTag - cmd.MetaDatas = make([]string, 0, len(info.ContainerNameTag)*2) for key, val := range info.ContainerNameTag { - cmd.MetaDatas = append(cmd.MetaDatas, key) - cmd.MetaDatas = append(cmd.MetaDatas, val) + cmd.Tags = append(cmd.Tags, key) + cmd.Tags = append(cmd.Tags, val) } cmd.Mounts = make([]Mount, 0, len(containerInfo.Mounts)) for _, mount := range containerInfo.Mounts {
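Taken together with the converter changes earlier in this patch, convertLogToMap is now a plain pass-through: contents stay contents, group LogTags become tags verbatim, and any renaming or deletion has already happened upstream in ProcessorTag, so flushers only hand ProtocolFieldsRename to NewConverter. The sketch below is a minimal, self-contained illustration of that pass-through behavior; the Log/LogTag structs are trimmed stand-ins for the protocol types, and the src/topic parameters are omitted.

```go
package main

import "fmt"

// Trimmed stand-ins for protocol.Log and protocol.LogTag.
type logContent struct{ Key, Value string }
type logEntry struct{ Contents []logContent }
type logTag struct{ Key, Value string }

// convertLogToMap, post-patch shape: no tag-key conversion tables, no rename map,
// no implicit host/topic tags; fields and tags are copied through unchanged.
func convertLogToMap(l *logEntry, logTags []logTag) (map[string]string, map[string]string) {
	contents, tags := make(map[string]string), make(map[string]string)
	for _, c := range l.Contents {
		contents[c.Key] = c.Value
	}
	for _, t := range logTags {
		tags[t.Key] = t.Value
	}
	return contents, tags
}

func main() {
	contents, tags := convertLogToMap(
		&logEntry{Contents: []logContent{{Key: "content", Value: "hello"}}},
		[]logTag{{Key: "host.name", Value: "example-host"}},
	)
	fmt.Println(contents, tags)
}
```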