Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to process tag #1806

Merged
merged 29 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class AppConfig {
friend class EnterpriseSLSClientManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
friend class ProcessorTagNativeUnittest;
#endif
};

Expand Down
35 changes: 28 additions & 7 deletions core/collection_pipeline/CollectionPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,13 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
if (!mContext.InitGlobalConfig(*config.mGlobal, extendedParams)) {
return false;
}
// extended global param includes: tag config
AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithInput);
AddExtendedGlobalParamToGoPipeline(extendedParams, mGoPipelineWithoutInput);
}
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithInput);
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithoutInput);

if (config.ShouldAddNativeTagProcessor()) {
LOG_INFO(sLogger, ("add tag processor", "native"));
if (config.ShouldAddProcessorTagNative()) {
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, GenNextPluginMeta(false));
Json::Value detail;
Expand All @@ -242,14 +240,14 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
// should not happen
return false;
}
mPipelineInnerProcessor.emplace_back(std::move(processor));
mPipelineInnerProcessorLine.emplace_back(std::move(processor));
} else {
// processor tag requires tags as input, so it is a special processor, cannot add as plugin
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (!mGoPipelineWithInput.isNull()) {
mGoPipelineWithInput["global"]["EnableProcessorTag"] = true;
CopyTagParamToGoPipeline(mGoPipelineWithInput, config.mGlobal);
}
if (!mGoPipelineWithoutInput.isNull()) {
mGoPipelineWithoutInput["global"]["EnableProcessorTag"] = true;
CopyTagParamToGoPipeline(mGoPipelineWithoutInput, config.mGlobal);
}
}

Expand Down Expand Up @@ -402,7 +400,7 @@ void CollectionPipeline::Process(vector<PipelineEventGroup>& logGroupList, size_
for (auto& p : mInputs[inputIndex]->GetInnerProcessors()) {
p->Process(logGroupList);
}
for (auto& p : mPipelineInnerProcessor) {
for (auto& p : mPipelineInnerProcessorLine) {
p->Process(logGroupList);
}
for (auto& p : mProcessorLine) {
Expand Down Expand Up @@ -520,6 +518,29 @@ void CollectionPipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline
}
}

// Marks a Go pipeline definition as tag-processor enabled and mirrors the
// tag-related global settings into it.
//
// root:   Go pipeline JSON; left untouched when it is null.
// config: global config JSON to read tag settings from; may be nullptr, in
//         which case only the "EnableProcessorTag" flag is written.
void CollectionPipeline::CopyTagParamToGoPipeline(Json::Value& root, const Json::Value* config) {
    if (root.isNull()) {
        return;
    }

    Json::Value& goGlobal = root["global"];
    goGlobal["EnableProcessorTag"] = true;

    if (!config) {
        return;
    }

    // Each of these settings is copied verbatim when present in the config.
    const string tagParamNames[] = {"PipelineMetaTagKey", "AgentMetaTagKey"};
    for (const string& name : tagParamNames) {
        const Json::Value* found = config->find(name.data(), name.data() + name.size());
        if (found != nullptr) {
            goGlobal[name] = *found;
        }
    }
}

bool CollectionPipeline::LoadGoPipelines() const {
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
Expand Down
3 changes: 2 additions & 1 deletion core/collection_pipeline/CollectionPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ class CollectionPipeline {
const std::string& module,
Json::Value& dst);
void CopyNativeGlobalParamToGoPipeline(Json::Value& root);
void CopyTagParamToGoPipeline(Json::Value& root, const Json::Value* config);
bool ShouldAddPluginToGoPipelineWithInput() const { return mInputs.empty() && mProcessorLine.empty(); }
void WaitAllItemsInProcessFinished();

std::string mName;
std::vector<std::unique_ptr<InputInstance>> mInputs;
std::vector<std::unique_ptr<ProcessorInstance>> mPipelineInnerProcessor;
std::vector<std::unique_ptr<ProcessorInstance>> mPipelineInnerProcessorLine;
std::vector<std::unique_ptr<ProcessorInstance>> mProcessorLine;
std::vector<std::unique_ptr<FlusherInstance>> mFlushers;
Router mRouter;
Expand Down
12 changes: 7 additions & 5 deletions core/collection_pipeline/GlobalConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
#include "collection_pipeline/GlobalConfig.h"

#include <string>
#include <unordered_map>

#include "json/json.h"

#include "collection_pipeline/CollectionPipelineContext.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
Expand All @@ -27,8 +24,13 @@ using namespace std;

namespace logtail {

// Definition of the GlobalConfig::sNativeParam string set.
// NOTE(review): this span comes from a diff view whose +/- markers were lost, so
// two initializers for the same member appear back to back: a five-entry form,
// then a seven-entry form that additionally lists "PipelineMetaTagKey" and
// "AgentMetaTagKey". Only one can exist in the real file — confirm which
// revision is intended before editing.
const unordered_set<string> GlobalConfig::sNativeParam
= {"TopicType", "TopicFormat", "Priority", "EnableTimestampNanosecond", "UsingOldContentTag"};
const unordered_set<string> GlobalConfig::sNativeParam = {"TopicType",
"TopicFormat",
"Priority",
"EnableTimestampNanosecond",
"UsingOldContentTag",
"PipelineMetaTagKey",
"AgentMetaTagKey"};

bool GlobalConfig::Init(const Json::Value& config, const CollectionPipelineContext& ctx, Json::Value& extendedParams) {
const string moduleName = "global";
Expand Down
3 changes: 0 additions & 3 deletions core/collection_pipeline/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
#include <cstdint>

#include <string>
#include <unordered_map>
#include <unordered_set>

#include "json/json.h"

#include "constants/TagConstants.h"

namespace logtail {

class CollectionPipelineContext;
Expand Down
28 changes: 12 additions & 16 deletions core/common/ParamExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@

#include "common/ParamExtractor.h"

#include <unordered_map>

#include "boost/regex.hpp"

#include "constants/TagConstants.h"

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -196,12 +192,12 @@ bool IsValidMap(const Json::Value& config, const string& key, string& errorMsg)
return true;
}

void ParseDefaultAddedTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
static void ParseDefaultAddedTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
string errorMsg;
customTagKey = DEFAULT_CONFIG_TAG_KEY_VALUE;
if (config && config->isMember(configField)) {
Expand All @@ -224,12 +220,12 @@ void ParseDefaultAddedTag(const Json::Value* config,
}
}

void ParseOptionalTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
static void ParseOptionalTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const CollectionPipelineContext& context,
const string& pluginType,
string& customTagKey) {
string errorMsg;
if (config && config->isMember(configField)) {
if (!GetOptionalStringParam(*config, "Tags." + configField, customTagKey, errorMsg)) {
Expand Down
2 changes: 1 addition & 1 deletion core/config/CollectionConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct CollectionConfig {
return mHasGoFlusher || ShouldNativeFlusherConnectedByGoPipeline();
}

bool ShouldAddNativeTagProcessor() const { return mHasNativeProcessor || (mHasNativeInput && !mHasGoProcessor); }
bool ShouldAddProcessorTagNative() const { return mHasNativeProcessor || (mHasNativeInput && !mHasGoProcessor); }

// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
Expand Down
2 changes: 1 addition & 1 deletion core/constants/TagConstants.cpp
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ const string& GetDefaultTagKeyString(TagKey key) {
const string DEFAULT_CONFIG_TAG_KEY_VALUE = "__default__";

////////////////////////// LOG ////////////////////////
const string DEFAULT_LOG_TAG_HOST_NAME = "__hostname__";
const string DEFAULT_LOG_TAG_NAMESPACE = "_namespace_";
const string DEFAULT_LOG_TAG_POD_NAME = "_pod_name_";
const string DEFAULT_LOG_TAG_POD_UID = "_pod_uid_";
const string DEFAULT_LOG_TAG_CONTAINER_NAME = "_container_name_";
const string DEFAULT_LOG_TAG_CONTAINER_IP = "_container_ip_";
const string DEFAULT_LOG_TAG_IMAGE_NAME = "_image_name_";
const string DEFAULT_LOG_TAG_HOST_NAME = "__hostname__";
const string DEFAULT_LOG_TAG_FILE_OFFSET = "__file_offset__";
const string DEFAULT_LOG_TAG_FILE_INODE = "__inode__";
const string DEFAULT_LOG_TAG_FILE_PATH = "__path__";
Expand Down
3 changes: 1 addition & 2 deletions core/constants/TagConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

#pragma once
#include <string>
#include <vector>

namespace logtail {

enum TagKey : int {
enum TagKey {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
FILE_OFFSET_KEY,
FILE_INODE_TAG_KEY,
FILE_PATH_TAG_KEY,
Expand Down
11 changes: 7 additions & 4 deletions core/file_server/ContainerInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,14 @@ bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& con
if (tags[i].isString() && tags[i - 1].isString()) {
std::string key = tags[i - 1].asString();
std::string value = tags[i].asString();
// 老版本或者容器元信息
if (isOldCheckpoint && containerNameTag.find(key) != containerNameTag.end()) {
containerInfo.AddMetadata(key, value);
} else {
if (isOldCheckpoint) {
containerInfo.mTags.emplace_back(key, value);
} else {
if (containerNameTag.find(key) != containerNameTag.end()) {
containerInfo.AddMetadata(key, value);
} else {
containerInfo.mTags.emplace_back(key, value);
}
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions core/file_server/FileTagOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "file_server/FileTagOptions.h"

#include <unistd.h>

#include "collection_pipeline/CollectionPipelineContext.h"
#include "common/ParamExtractor.h"
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
#include "constants/TagConstants.h"
Expand Down Expand Up @@ -106,10 +104,14 @@ StringView FileTagOptions::GetFileTagKeyName(TagKey key) const {

// Reports whether log position metadata is enabled: returns true when mFileTags
// holds a non-empty value for FILE_OFFSET_KEY or for FILE_INODE_TAG_KEY, and
// false otherwise.
// NOTE(review): this span comes from a diff view whose +/- markers were lost, so
// the boolean-variable form and the early-return form of the same check appear
// interleaved. Both forms evaluate the same condition — confirm which revision
// is intended before changing the logic.
bool FileTagOptions::EnableLogPositionMeta() {
// File offset tag: enabled when an entry exists and its value is non-empty.
auto offsetIter = mFileTags.find(TagKey::FILE_OFFSET_KEY);
bool enableFileOffset = offsetIter != mFileTags.end() && !offsetIter->second.empty();
if (offsetIter != mFileTags.end() && !offsetIter->second.empty()) {
return true;
}
// Inode tag: same presence-and-non-empty test.
auto inodeIter = mFileTags.find(TagKey::FILE_INODE_TAG_KEY);
bool enableFileInode = inodeIter != mFileTags.end() && !inodeIter->second.empty();
return enableFileOffset || enableFileInode;
if (inodeIter != mFileTags.end() && !inodeIter->second.empty()) {
return true;
}
return false;
}

} // namespace logtail
6 changes: 0 additions & 6 deletions core/plugin/processor/inner/ProcessorTagNative.cpp
Copy link
Collaborator

@henryzhx8 henryzhx8 Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 整体实现需要重新看一下文档
  2. 这个插件应该是pipeline级别的了
  3. C++和Go串联的情况下,应该全部由C++这边来完成处理,Go应该啥都不做,只有Aggregator的Topic是个例外

Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ bool ProcessorTagNative::Init(const Json::Value& config) {
ParseTagKey(tagConfig, "HOST_IP", TagKey::HOST_IP_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);
#endif

#ifdef __ENTERPRISE__
// AgentEnvMetaTagKey
const std::string envTagKey = "AgentEnvMetaTagKey";
const Json::Value* itr = config.find(envTagKey.c_str(), envTagKey.c_str() + envTagKey.length());
Expand All @@ -90,7 +89,6 @@ bool ProcessorTagNative::Init(const Json::Value& config) {
mContext->GetRegion());
}
}
#endif
return true;
}

Expand Down Expand Up @@ -125,7 +123,6 @@ void ProcessorTagNative::Process(PipelineEventGroup& logGroup) {
static const vector<sls_logs::LogTag>& sEnvTags = AppConfig::GetInstance()->GetEnvTags();
if (!sEnvTags.empty()) {
for (size_t i = 0; i < sEnvTags.size(); ++i) {
#ifdef __ENTERPRISE__
if (mAgentEnvMetaTagKey.empty() && mAppendingAllEnvMetaTag) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
logGroup.SetTagNoCopy(sEnvTags[i].key(), sEnvTags[i].value());
} else {
Expand All @@ -137,9 +134,6 @@ void ProcessorTagNative::Process(PipelineEventGroup& logGroup) {
}
}
}
#else
logGroup.SetTagNoCopy(sEnvTags[i].key(), sEnvTags[i].value());
#endif
}
}

Expand Down
2 changes: 0 additions & 2 deletions core/plugin/processor/inner/ProcessorTagNative.h
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ class ProcessorTagNative : public Processor {
void AddTag(PipelineEventGroup& logGroup, TagKey tagKey, const std::string& value) const;
void AddTag(PipelineEventGroup& logGroup, TagKey tagKey, StringView value) const;
std::unordered_map<TagKey, std::string> mPipelineMetaTagKey;
#ifdef __ENTERPRISE__
// After unmarshalling from json, we cannot determine the map is empty or no such config
bool mAppendingAllEnvMetaTag = false;
std::unordered_map<std::string, std::string> mAgentEnvMetaTagKey;
#endif

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorTagNativeUnittest;
Expand Down
Loading
Loading