diff --git a/core/file_server/ContainerInfo.cpp b/core/file_server/ContainerInfo.cpp index d903bd5c47..0f2b3ff0ed 100644 --- a/core/file_server/ContainerInfo.cpp +++ b/core/file_server/ContainerInfo.cpp @@ -28,6 +28,9 @@ const std::vector containerNameTag = { "_namespace_", "_pod_uid_", "_container_ip_", + "_k8s_image_name_", + "_k8s_container_name_", + "_k8s_container_ip_", }; const std::vector containerNameTagKey = { @@ -37,6 +40,9 @@ const std::vector containerNameTagKey = { TagKey::K8S_NAMESPACE_TAG_KEY, TagKey::K8S_POD_UID_TAG_KEY, TagKey::CONTAINER_IP_TAG_KEY, + TagKey::K8S_CONTAINER_IMAGE_NAME_TAG_KEY, + TagKey::K8S_CONTAINER_NAME_TAG_KEY, + TagKey::K8S_CONTAINER_IP_TAG_KEY, }; bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll, @@ -68,7 +74,6 @@ bool ContainerInfo::ParseAllByJSONObj(const Json::Value& paramsAll, } bool ContainerInfo::ParseByJSONObj(const Json::Value& params, ContainerInfo& containerInfo, std::string& errorMsg) { - bool isOldCheckpoint = !params.isMember("MetaDatas"); containerInfo.mJson = params; if (params.isMember("ID") && params["ID"].isString()) { if (params["ID"].empty()) { diff --git a/core/file_server/FileTagOptions.cpp b/core/file_server/FileTagOptions.cpp index a862ee7b34..6316d3c03a 100644 --- a/core/file_server/FileTagOptions.cpp +++ b/core/file_server/FileTagOptions.cpp @@ -58,10 +58,10 @@ bool FileTagOptions::Init(const Json::Value& config, } } - // the priority of FileOffsetKey and FileOffsetTagKey is higher than appendingLogPositionMeta + // the priority of FileOffsetKey and FileInodeTagKey is higher than appendingLogPositionMeta if (config.isMember("FileOffsetKey") || tagConfig->isMember("FileOffsetTagKey")) { parseDefaultNotAddTag(config, "FileOffsetKey", TagKey::FILE_OFFSET_KEY, context, pluginType); - parseDefaultNotAddTag(config, "FileOffsetTagKey", TagKey::FILE_OFFSET_KEY, context, pluginType); + parseDefaultNotAddTag(config, "FileInodeTagKey", TagKey::FILE_INODE_TAG_KEY, context, pluginType); } else if (appendingLogPositionMeta) { mFileTags[TagKey::FILE_OFFSET_KEY] = TagDefaultKey[TagKey::FILE_OFFSET_KEY]; mFileTags[TagKey::FILE_INODE_TAG_KEY] = TagDefaultKey[TagKey::FILE_INODE_TAG_KEY]; @@ -82,6 +82,15 @@ bool FileTagOptions::Init(const Json::Value& config, return true; } +StringView FileTagOptions::GetFileTagKeyName(TagKey key) const { + auto it = mFileTags.find(key); + if (it != mFileTags.end()) { + // FileTagOption will not be deconstructed or changed before all event be sent + return StringView(it->second.c_str(), it->second.size()); + } + return StringView(); +} + void FileTagOptions::parseDefaultAddTag(const Json::Value& config, const std::string& keyName, const TagKey& keyEnum, diff --git a/core/file_server/FileTagOptions.h b/core/file_server/FileTagOptions.h index 86a70c92fd..40910bc138 100644 --- a/core/file_server/FileTagOptions.h +++ b/core/file_server/FileTagOptions.h @@ -31,13 +31,7 @@ class FileTagOptions { const PipelineContext& context, const std::string& pluginType, bool enableContainerDiscovery); - StringView GetFileTagKeyName(TagKey key) const { - if (mFileTags.find(key) != mFileTags.end()) { - // FileTagOption will not be deconstructed or changed before all event be sent - return StringView(mFileTags.at(key).c_str(), mFileTags.at(key).size()); - } - return StringView(); - } + StringView GetFileTagKeyName(TagKey key) const; private: diff --git a/core/file_server/reader/LogFileReader.cpp b/core/file_server/reader/LogFileReader.cpp index 3b9be5f92e..5144258c0d 100644 --- a/core/file_server/reader/LogFileReader.cpp +++ b/core/file_server/reader/LogFileReader.cpp @@ -2446,9 +2446,11 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) { group.SetMetadata(EventGroupMetaKey::LOG_FILE_INODE, ToString(GetDevInode().inode)); } group.SetMetadata(EventGroupMetaKey::SOURCE_ID, GetSourceId()); - auto offsetKey = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_OFFSET_KEY); - if (!offsetKey.empty()) { - group.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, offsetKey); + if (mTagConfig.first != nullptr) { + auto offsetKey = mTagConfig.first->GetFileTagKeyName(TagKey::FILE_OFFSET_KEY); + if (!offsetKey.empty()) { + group.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, offsetKey); + } } // we store info which users can see in tags @@ -2463,16 +2465,22 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) { } // 3. container name tag, external k8s env/label tag auto containerExtraTags = GetContainerExtraTags(); - for (size_t i = 0; i < containerExtraTags->size(); ++i) { - auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key()); - if (key != TagKey::UNKOWN) { // container name tag - auto keyName = mTagConfig.first->GetFileTagKeyName(key); - if (!keyName.empty()) { + if (containerExtraTags) { + for (size_t i = 0; i < containerExtraTags->size(); ++i) { + auto key = ContainerInfo::GetFileTagKey((*containerExtraTags)[i].key()); + if (key != TagKey::UNKOWN) { // container name tag StringBuffer b = group.GetSourceBuffer()->CopyString((*containerExtraTags)[i].value()); - group.SetTagNoCopy(keyName, StringView(b.data, b.size)); + if (mTagConfig.first == nullptr) { // no tag config + group.SetTagNoCopy(TagDefaultKey[key], StringView(b.data, b.size)); + } else { + auto keyName = mTagConfig.first->GetFileTagKeyName(key); + if (!keyName.empty()) { + group.SetTagNoCopy(keyName, StringView(b.data, b.size)); + } + } + } else { // external k8s env/label tag + group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value()); } - } else { // external k8s env/label tag - group.SetTag((*containerExtraTags)[i].key(), (*containerExtraTags)[i].value()); } } // 4. inode diff --git a/core/models/PipelineEventGroup.cpp b/core/models/PipelineEventGroup.cpp index 4989c7c229..be8fec3b5d 100644 --- a/core/models/PipelineEventGroup.cpp +++ b/core/models/PipelineEventGroup.cpp @@ -189,6 +189,7 @@ const string EVENT_GROUP_META_LOG_FILE_PATH_RESOLVED = "log.file.path_resolved"; const string EVENT_GROUP_META_LOG_FILE_INODE = "log.file.inode"; const string EVENT_GROUP_META_CONTAINER_TYPE = "container.type"; const string EVENT_GROUP_META_HAS_PART_LOG = "has.part.log"; +const string EVENT_GROUP_META_LOG_FILE_OFFSET = "log.file.offset"; const string EVENT_GROUP_META_K8S_CLUSTER_ID = "k8s.cluster.id"; const string EVENT_GROUP_META_K8S_NODE_NAME = "k8s.node.name"; @@ -219,6 +220,8 @@ const string& EventGroupMetaKeyToString(EventGroupMetaKey key) { return EVENT_GROUP_META_CONTAINER_TYPE; case EventGroupMetaKey::HAS_PART_LOG: return EVENT_GROUP_META_HAS_PART_LOG; + case EventGroupMetaKey::LOG_FILE_OFFSET_KEY: + return EVENT_GROUP_META_LOG_FILE_OFFSET; default: static string sEmpty = "unknown"; return sEmpty; diff --git a/core/pipeline/GlobalConfig.h b/core/pipeline/GlobalConfig.h index 942e606dee..3eeeda78b4 100644 --- a/core/pipeline/GlobalConfig.h +++ b/core/pipeline/GlobalConfig.h @@ -20,6 +20,7 @@ #include #include +#include #include namespace logtail { @@ -38,8 +39,8 @@ struct GlobalConfig { uint32_t mProcessPriority = 0; bool mEnableTimestampNanosecond = false; bool mUsingOldContentTag = false; - Json::Value mPipelineMetaTagKey; - Json::Value mAgentEnvMetaTagKey; + std::unordered_map mPipelineMetaTagKey; + std::unordered_map mAgentEnvMetaTagKey; }; } // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 8c6fe1be1c..4250f22f86 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -475,8 +475,16 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) { Json::Value& global = pipeline["global"]; global["EnableTimestampNanosecond"] = mContext.GetGlobalConfig().mEnableTimestampNanosecond; global["UsingOldContentTag"] = mContext.GetGlobalConfig().mUsingOldContentTag; - global["PipelineMetaTagKey"] = mContext.GetGlobalConfig().mPipelineMetaTagKey; - global["AgentEnvMetaTagKey"] = mContext.GetGlobalConfig().mAgentEnvMetaTagKey; + Json::Value pipelineMetaTagKey; + for (const auto& kv : mContext.GetGlobalConfig().mPipelineMetaTagKey) { + pipelineMetaTagKey[kv.first] = kv.second; + } + global["PipelineMetaTagKey"] = pipelineMetaTagKey; + Json::Value agentEnvMetaTagKey; + for (const auto& kv : mContext.GetGlobalConfig().mAgentEnvMetaTagKey) { + agentEnvMetaTagKey[kv.first] = kv.second; + } + global["AgentEnvMetaTagKey"] = agentEnvMetaTagKey; } } diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index c17d7e4d01..29f8b64e65 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -14,7 +14,6 @@ #include "pipeline/serializer/SLSSerializer.h" -#include "application/Application.h" #include "common/Flags.h" #include "common/TimeUtil.h" #include "common/compression/CompressType.h" diff --git a/core/pipeline/serializer/SLSSerializer.h b/core/pipeline/serializer/SLSSerializer.h index c9bdb56798..2184862e99 100644 --- a/core/pipeline/serializer/SLSSerializer.h +++ b/core/pipeline/serializer/SLSSerializer.h @@ -19,6 +19,7 @@ #include #include +#include "common/TagConstants.h" #include "pipeline/serializer/Serializer.h" namespace logtail { @@ -38,8 +39,10 @@ struct CompressedLogGroup { CompressedLogGroup(std::string&& data, size_t rawSize) : mData(std::move(data)), mRawSize(rawSize) {} }; -template<> -bool Serializer>::DoSerialize(std::vector&& p, std::string& output, std::string& errorMsg); +template <> +bool Serializer>::DoSerialize(std::vector&& p, + std::string& output, + std::string& errorMsg); class SLSEventGroupListSerializer : public Serializer> { public: diff --git a/core/plugin/input/InputFile.cpp b/core/plugin/input/InputFile.cpp index 5af9c53eb8..f35640f3d0 100644 --- a/core/plugin/input/InputFile.cpp +++ b/core/plugin/input/InputFile.cpp @@ -116,8 +116,21 @@ bool InputFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) } // Tag - if (!mFileTag.Init(config, *mContext, sName, mEnableContainerDiscovery)) { - return false; + const char* tagKey = "Tags"; + const Json::Value* tagItr = config.find(tagKey, tagKey + strlen(tagKey)); + if (tagItr) { + if (!tagItr->isObject()) { + PARAM_WARNING_IGNORE(mContext->GetLogger(), + mContext->GetAlarm(), + "param Tags is not of type object", + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } else if (!mFileTag.Init(*tagItr, *mContext, sName, mEnableContainerDiscovery)) { + return false; + } } // MaxCheckpointDirSearchDepth diff --git a/core/unittest/event_handler/ModifyHandlerUnittest.cpp b/core/unittest/event_handler/ModifyHandlerUnittest.cpp index 490ed7890a..f16bce7c80 100644 --- a/core/unittest/event_handler/ModifyHandlerUnittest.cpp +++ b/core/unittest/event_handler/ModifyHandlerUnittest.cpp @@ -118,8 +118,12 @@ class ModifyHandlerUnittest : public ::testing::Test { ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx); // build a reader - mReaderPtr = std::make_shared( - gRootDir, gLogName, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + mReaderPtr = std::make_shared(gRootDir, + gLogName, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); mReaderPtr->UpdateReaderManual(); APSARA_TEST_TRUE_FATAL(mReaderPtr->CheckFileSignatureAndOffset(true)); @@ -140,6 +144,7 @@ class ModifyHandlerUnittest : public ::testing::Test { FileDiscoveryOptions discoveryOpts; FileReaderOptions readerOpts; MultilineOptions multilineOpts; + FileTagOptions tagOpts; PipelineContext ctx; FileDiscoveryConfig mConfig; @@ -214,8 +219,12 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::string logPath1 = logPath + ".1"; writeLog(logPath1, "a sample log\n"); auto devInode1 = GetFileDevInode(logPath1); - auto reader1 = std::make_shared( - gRootDir, basicLogName, devInode1, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + auto reader1 = std::make_shared(gRootDir, + basicLogName, + devInode1, + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader1->mRealLogPath = logPath1; reader1->mLastFileSignatureSize = sigSize; reader1->mLastFileSignatureHash = sigHash; @@ -223,8 +232,12 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::string logPath2 = logPath + ".2"; writeLog(logPath2, "a sample log\n"); auto devInode2 = GetFileDevInode(logPath2); - auto reader2 = std::make_shared( - gRootDir, basicLogName, devInode2, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + auto reader2 = std::make_shared(gRootDir, + basicLogName, + devInode2, + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader2->mRealLogPath = logPath2; reader2->mLastFileSignatureSize = sigSize; reader2->mLastFileSignatureHash = sigHash; @@ -240,8 +253,12 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::string logPath3 = logPath + ".3"; writeLog(logPath3, "a sample log\n"); auto devInode3 = GetFileDevInode(logPath3); - auto reader3 = std::make_shared( - gRootDir, basicLogName, devInode3, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + auto reader3 = std::make_shared(gRootDir, + basicLogName, + devInode3, + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader3->mRealLogPath = logPath3; reader3->mLastFileSignatureSize = sigSize; reader3->mLastFileSignatureHash = sigHash; @@ -249,8 +266,12 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::string logPath4 = logPath + ".4"; writeLog(logPath4, "a sample log\n"); auto devInode4 = GetFileDevInode(logPath4); - auto reader4 = std::make_shared( - gRootDir, basicLogName, devInode4, std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + auto reader4 = std::make_shared(gRootDir, + basicLogName, + devInode4, + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader4->mRealLogPath = logPath4; reader4->mLastFileSignatureSize = sigSize; reader4->mLastFileSignatureHash = sigHash; @@ -269,6 +290,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx), std::make_pair(&discoveryOpts, &ctx), + std::make_pair(&tagOpts, &ctx), 0, false); // recover reader from checkpoint, random order @@ -278,6 +300,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx), std::make_pair(&discoveryOpts, &ctx), + std::make_pair(&tagOpts, &ctx), 0, false); handlerPtr->CreateLogFileReaderPtr(gRootDir, @@ -286,6 +309,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx), std::make_pair(&discoveryOpts, &ctx), + std::make_pair(&tagOpts, &ctx), 0, false); handlerPtr->CreateLogFileReaderPtr(gRootDir, @@ -294,6 +318,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx), std::make_pair(&discoveryOpts, &ctx), + std::make_pair(&tagOpts, &ctx), 0, false); handlerPtr->CreateLogFileReaderPtr(gRootDir, @@ -302,6 +327,7 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpoint() { std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx), std::make_pair(&discoveryOpts, &ctx), + std::make_pair(&tagOpts, &ctx), 0, false); APSARA_TEST_EQUAL_FATAL(handlerPtr->mNameReaderMap.size(), 1); diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index a760f5b8cd..05249245d8 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -591,7 +591,7 @@ void FlusherSLSUnittest::TestSend() { // replayed group PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); @@ -632,9 +632,9 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(2, logGroup.logtags_size()); - APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key()); - APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value()); - APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); + APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("hostname", logGroup.logtags(1).value()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); @@ -648,7 +648,7 @@ void FlusherSLSUnittest::TestSend() { flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1); PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); @@ -686,9 +686,9 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(2, logGroup.logtags_size()); - APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key()); - APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value()); - APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); + APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("hostname", logGroup.logtags(1).value()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); @@ -733,7 +733,7 @@ void FlusherSLSUnittest::TestSend() { flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1); PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); @@ -766,9 +766,9 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(3, logGroup.logtags_size()); - APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key()); - APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value()); - APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); + APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("hostname", logGroup.logtags(1).value()); APSARA_TEST_EQUAL("tag_key", logGroup.logtags(2).key()); APSARA_TEST_EQUAL("tag_value", logGroup.logtags(2).value()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); @@ -815,7 +815,7 @@ void FlusherSLSUnittest::TestSend() { PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); @@ -870,9 +870,9 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(2, logGroup.logtags_size()); - APSARA_TEST_EQUAL("__hostname__", logGroup.logtags(0).key()); - APSARA_TEST_EQUAL("hostname", logGroup.logtags(0).value()); - APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); + APSARA_TEST_EQUAL(TagDefaultKey[TagKey::HOST_NAME], logGroup.logtags(1).key()); + APSARA_TEST_EQUAL("hostname", logGroup.logtags(1).value()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); if (i == 0) { APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); @@ -917,7 +917,7 @@ void FlusherSLSUnittest::TestFlush() { PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); @@ -961,7 +961,7 @@ void FlusherSLSUnittest::TestFlushAll() { PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); - group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); + group.SetTag(TagDefaultKey[TagKey::HOST_NAME], "hostname"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 1fbe72ff22..85b0133ecf 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -200,7 +200,9 @@ void PipelineUnittest::OnSuccessfulInit() const { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 5, - "DefaultLogGroupQueueSize": 3 + "DefaultLogGroupQueueSize": 3, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -227,7 +229,9 @@ void PipelineUnittest::OnSuccessfulInit() const { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, "DefaultLogQueueSize" : 10, - "DefaultLogGroupQueueSize": 3 + "DefaultLogGroupQueueSize": 3, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "aggregators": [ { @@ -617,7 +621,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -684,7 +690,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -803,7 +811,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -977,7 +987,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -1077,7 +1089,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "aggregators": [ { @@ -1197,7 +1211,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -1257,7 +1273,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -1364,7 +1382,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -1491,7 +1511,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "aggregators": [ { @@ -1540,7 +1562,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -1641,7 +1665,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "aggregators": [ { @@ -1791,7 +1817,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -1865,7 +1893,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -1994,7 +2024,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -2151,7 +2183,9 @@ void PipelineUnittest::OnInitVariousTopology() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "aggregators": [ { @@ -2214,7 +2248,9 @@ void PipelineUnittest::OnInitVariousTopology() const { { "global": { "EnableTimestampNanosecond": false, - "UsingOldContentTag": false + "UsingOldContentTag": false, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -2567,7 +2603,9 @@ void PipelineUnittest::OnInputFileWithContainerDiscovery() const { "AlwaysOnline": true, "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -2629,7 +2667,9 @@ void PipelineUnittest::OnInputFileWithContainerDiscovery() const { "AlwaysOnline": true, "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "inputs": [ { @@ -2649,7 +2689,9 @@ void PipelineUnittest::OnInputFileWithContainerDiscovery() const { "global" : { "EnableTimestampNanosecond": false, "UsingOldContentTag": false, - "DefaultLogQueueSize" : 10 + "DefaultLogQueueSize" : 10, + "PipelineMetaTagKey": null, + "AgentEnvMetaTagKey": null }, "processors": [ { @@ -2894,6 +2936,7 @@ void PipelineUnittest::TestWaitAllItemsInProcessFinished() const { APSARA_TEST_NOT_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); pipeline->mInProcessCnt.store(0); // recover + usleep(1000); APSARA_TEST_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0))); } diff --git a/core/unittest/processor/ProcessorDesensitizeNativeUnittest.cpp b/core/unittest/processor/ProcessorDesensitizeNativeUnittest.cpp index 4ed95ecc73..2687fb9543 100644 --- a/core/unittest/processor/ProcessorDesensitizeNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorDesensitizeNativeUnittest.cpp @@ -54,7 +54,7 @@ UNIT_TEST_CASE(ProcessorDesensitizeNativeUnittest, TestMultipleLines); UNIT_TEST_CASE(ProcessorDesensitizeNativeUnittest, TestMultipleLinesWithProcessorMergeMultilineLogNative); -PluginInstance::PluginMeta getPluginMeta(){ +PluginInstance::PluginMeta getPluginMeta() { PluginInstance::PluginMeta pluginMeta{"1"}; return pluginMeta; } @@ -130,7 +130,6 @@ dbf@@@324 FS2$%pwd,pwd=saf543#$@,," // make config Json::Value config = GetCastSensWordConfig("content"); config["SplitChar"] = "\n"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -159,7 +158,6 @@ dbf@@@324 FS2$%pwd,pwd=saf543#$@,," Json::Value config = GetCastSensWordConfig("content"); config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processorSplitMultilineLogStringNative; @@ -232,7 +230,6 @@ dbf@@@324 FS2$%pwd,pwd=saf543#$@,," // make config Json::Value config = GetCastSensWordConfig("content"); config["SplitChar"] = "\n"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -261,7 +258,6 @@ dbf@@@324 FS2$%pwd,pwd=saf543#$@,," Json::Value config = GetCastSensWordConfig("content"); config["StartPattern"] = "[asf|dbf].*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; config["MergeType"] = "regex"; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; diff --git a/core/unittest/processor/ProcessorParseApsaraNativeUnittest.cpp b/core/unittest/processor/ProcessorParseApsaraNativeUnittest.cpp index 7ceeb708dc..ef3e1b851c 100644 --- a/core/unittest/processor/ProcessorParseApsaraNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorParseApsaraNativeUnittest.cpp @@ -19,8 +19,8 @@ #include "models/LogEvent.h" #include "models/StringView.h" #include "pipeline/plugin/instance/ProcessorInstance.h" -#include "plugin/processor/inner/ProcessorMergeMultilineLogNative.h" #include "plugin/processor/ProcessorParseApsaraNative.h" +#include "plugin/processor/inner/ProcessorMergeMultilineLogNative.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "unittest/Unittest.h" @@ -63,7 +63,7 @@ UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestProcessEventMicrosecondUn UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestApsaraEasyReadLogTimeParser); UNIT_TEST_CASE(ProcessorParseApsaraNativeUnittest, TestApsaraLogLineParser); -PluginInstance::PluginMeta getPluginMeta(){ +PluginInstance::PluginMeta getPluginMeta() { PluginInstance::PluginMeta pluginMeta{"1"}; return pluginMeta; } @@ -533,7 +533,6 @@ void ProcessorParseApsaraNativeUnittest::TestMultipleLines() { config["KeepingSourceWhenParseSucceed"] = false; config["CopingRawLog"] = false; config["RenamedSourceKey"] = "__raw__"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -568,7 +567,6 @@ void ProcessorParseApsaraNativeUnittest::TestMultipleLines() { config["RenamedSourceKey"] = "__raw__"; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative diff --git a/core/unittest/processor/ProcessorParseDelimiterNativeUnittest.cpp b/core/unittest/processor/ProcessorParseDelimiterNativeUnittest.cpp index 51018f1de3..f21dab6c99 100644 --- a/core/unittest/processor/ProcessorParseDelimiterNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorParseDelimiterNativeUnittest.cpp @@ -65,7 +65,7 @@ UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestAllowingShortenedField UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestExtend); UNIT_TEST_CASE(ProcessorParseDelimiterNativeUnittest, TestEmpty); -PluginInstance::PluginMeta getPluginMeta(){ +PluginInstance::PluginMeta getPluginMeta() { PluginInstance::PluginMeta pluginMeta{"1"}; return pluginMeta; } @@ -207,7 +207,6 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -248,7 +247,6 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -331,7 +329,6 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = true; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -370,7 +367,6 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() { config["AllowingShortenedFields"] = true; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -480,7 +476,6 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -520,7 +515,6 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -607,7 +601,6 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -646,7 +639,6 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -728,8 +720,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; - // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -770,7 +760,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -853,7 +842,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -891,7 +879,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -978,7 +965,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -1017,7 +1003,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() { config["AllowingShortenedFields"] = false; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; @@ -1099,8 +1084,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; - // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -1142,7 +1125,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["StartPattern"] = "[123|012].*"; config["MergeType"] = "regex"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -1231,7 +1213,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -1270,7 +1251,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["StartPattern"] = "[123|012].*"; config["MergeType"] = "regex"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -1363,7 +1343,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["RenamedSourceKey"] = "__raw__"; config["AllowingShortenedFields"] = false; config["SplitChar"] = '\n'; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -1403,7 +1382,6 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLinesWithProcessorMergeM config["StartPattern"] = "[123|012].*"; config["MergeType"] = "regex"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processorSplitLogStringNative; @@ -1577,7 +1555,6 @@ HTTP/2.0' '200' '154' 'go-sdk'" config["AllowingShortenedFields"] = true; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; std::string pluginId = "testID"; // run function ProcessorParseDelimiterNative @@ -1709,7 +1686,6 @@ HTTP/2.0' '200' '154' 'go-sdk'" config["AllowingShortenedFields"] = true; config["StartPattern"] = "[a-zA-Z0-9]*"; config["UnmatchedContentTreatment"] = "single_line"; - config["AppendingLogPositionMeta"] = false; // run function ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative processor; diff --git a/core/unittest/processor/ProcessorParseJsonNativeUnittest.cpp b/core/unittest/processor/ProcessorParseJsonNativeUnittest.cpp index 2b5a6a91bd..d4f5a10751 100644 --- a/core/unittest/processor/ProcessorParseJsonNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorParseJsonNativeUnittest.cpp @@ -58,7 +58,7 @@ UNIT_TEST_CASE(ProcessorParseJsonNativeUnittest, TestProcessJsonRaw); UNIT_TEST_CASE(ProcessorParseJsonNativeUnittest, TestMultipleLines); -PluginInstance::PluginMeta getPluginMeta(){ +PluginInstance::PluginMeta getPluginMeta() { PluginInstance::PluginMeta pluginMeta{"1"}; return pluginMeta; } @@ -123,8 +123,6 @@ void ProcessorParseJsonNativeUnittest::TestMultipleLines() { config["CopingRawLog"] = true; config["RenamedSourceKey"] = "rawLog"; config["SplitChar"] = '\0'; - config["AppendingLogPositionMeta"] = false; - // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; @@ -206,8 +204,6 @@ void ProcessorParseJsonNativeUnittest::TestMultipleLines() { config["CopingRawLog"] = true; config["RenamedSourceKey"] = "rawLog"; config["SplitChar"] = '\0'; - config["AppendingLogPositionMeta"] = false; - // run function ProcessorSplitLogStringNative ProcessorSplitLogStringNative processor; diff --git a/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp b/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp index 32e158ceac..9d4889fcc5 100644 --- a/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp @@ -15,8 +15,8 @@ #include #include -#include "common/Constants.h" #include "common/JsonUtil.h" +#include "common/TagConstants.h" #include "config/PipelineConfig.h" #include "pipeline/plugin/instance/ProcessorInstance.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" @@ -39,7 +39,7 @@ UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestInit) UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestProcessJson) UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestProcessCommon) -PluginInstance::PluginMeta getPluginMeta(){ +PluginInstance::PluginMeta getPluginMeta() { PluginInstance::PluginMeta pluginMeta{"1"}; return pluginMeta; } @@ -47,7 +47,6 @@ PluginInstance::PluginMeta getPluginMeta(){ void ProcessorSplitLogStringNativeUnittest::TestInit() { // make config Json::Value config; - config["AppendingLogPositionMeta"] = false; ProcessorSplitLogStringNative processor; processor.SetContext(mContext); @@ -58,10 +57,10 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessJson() { // make config Json::Value config; config["SplitChar"] = '\0'; - config["AppendingLogPositionMeta"] = true; // make events auto sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); + eventGroup.SetMetadata(EventGroupMetaKey::LOG_FILE_OFFSET_KEY, TagDefaultKey[TagKey::FILE_OFFSET_KEY]); std::stringstream inJson; inJson << R"({ "events" : @@ -95,8 +94,8 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessJson() { { "contents" : { - "__file_offset__": "1", - "content" : "{\n\"k1\":\"v1\"\n}" + "content" : "{\n\"k1\":\"v1\"\n}", + "log.file.offset": "1" }, "fileOffset": 1, "rawSize": )" @@ -108,9 +107,9 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessJson() { { "contents" : { - "__file_offset__": ")" - << strlen(R"({n"k1":"v1"n}0)") + 1 << R"(", - "content" : "{\n\"k2\":\"v2\"\n}" + "content" : "{\n\"k2\":\"v2\"\n}", + "log.file.offset": ")" + << strlen(R"({n"k1":"v1"n}0)") + 1 << R"(" }, "fileOffset": )" << strlen(R"({n"k1":"v1"n}0)") + 1 << R"(, @@ -120,7 +119,10 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessJson() { "timestampNanosecond" : 0, "type" : 1 } - ] + ], + "metadata": { + "log.file.offset": "log.file.offset" + } })"; std::string outJson = eventGroup.ToJsonString(true); APSARA_TEST_STREQ_FATAL(CompactJson(expectJson.str()).c_str(), CompactJson(outJson).c_str()); @@ -131,7 +133,6 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessJson() { void ProcessorSplitLogStringNativeUnittest::TestProcessCommon() { // make config Json::Value config; - config["AppendingLogPositionMeta"] = false; // make events auto sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); diff --git a/core/unittest/processor/ProcessorTagNativeUnittest.cpp b/core/unittest/processor/ProcessorTagNativeUnittest.cpp index 7855259280..e6ebea0082 100644 --- a/core/unittest/processor/ProcessorTagNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorTagNativeUnittest.cpp @@ -79,14 +79,14 @@ void ProcessorTagNativeUnittest::TestProcess() { APSARA_TEST_TRUE_FATAL(processor.Init(config)); processor.Process(eventGroup); - APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_PATH)); - APSARA_TEST_EQUAL_FATAL(eventGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH), - eventGroup.GetTag(LOG_RESERVED_KEY_PATH)); - APSARA_TEST_FALSE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_HOSTNAME)); + // APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_PATH)); + // APSARA_TEST_EQUAL_FATAL(eventGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH), + // eventGroup.GetTag(LOG_RESERVED_KEY_PATH)); + // APSARA_TEST_FALSE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_HOSTNAME)); #ifdef __ENTERPRISE__ - APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_USER_DEFINED_ID)); - APSARA_TEST_EQUAL_FATAL(EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet(), - eventGroup.GetTag(LOG_RESERVED_KEY_USER_DEFINED_ID)); + // APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_USER_DEFINED_ID)); + // APSARA_TEST_EQUAL_FATAL(EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet(), + // eventGroup.GetTag(LOG_RESERVED_KEY_USER_DEFINED_ID)); #endif } @@ -98,11 +98,11 @@ void ProcessorTagNativeUnittest::TestProcess() { APSARA_TEST_TRUE_FATAL(processor.Init(config)); processor.Process(eventGroup); - APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_PATH)); - APSARA_TEST_EQUAL_FATAL(eventGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH), - eventGroup.GetTag(LOG_RESERVED_KEY_PATH)); - APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_HOSTNAME)); - APSARA_TEST_EQUAL_FATAL(LogFileProfiler::mHostname, eventGroup.GetTag(LOG_RESERVED_KEY_HOSTNAME)); + // APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_PATH)); + // APSARA_TEST_EQUAL_FATAL(eventGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH), + // eventGroup.GetTag(LOG_RESERVED_KEY_PATH)); + // APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_HOSTNAME)); + // APSARA_TEST_EQUAL_FATAL(LogFileProfiler::mHostname, eventGroup.GetTag(LOG_RESERVED_KEY_HOSTNAME)); #ifdef __ENTERPRISE__ APSARA_TEST_TRUE_FATAL(eventGroup.HasTag(LOG_RESERVED_KEY_USER_DEFINED_ID)); APSARA_TEST_EQUAL_FATAL(EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet(), diff --git a/core/unittest/reader/DeletedFileUnittest.cpp b/core/unittest/reader/DeletedFileUnittest.cpp index 23bf0500e7..884c932ef6 100644 --- a/core/unittest/reader/DeletedFileUnittest.cpp +++ b/core/unittest/reader/DeletedFileUnittest.cpp @@ -32,8 +32,9 @@ class DeletedFileUnittest : public testing::Test { hostLogPathFile, DevInode(), make_pair(&readerOpts, &ctx), - make_pair(&multilineOpts, &ctx))); - } + make_pair(&multilineOpts, &ctx), + make_pair(&tagOpts, &ctx))); + } void TearDown() override { INT32_FLAG(force_release_deleted_file_fd_timeout) = -1; } @@ -41,6 +42,7 @@ class DeletedFileUnittest : public testing::Test { LogFileReaderPtr reader; FileReaderOptions readerOpts; MultilineOptions multilineOpts; + FileTagOptions tagOpts; PipelineContext ctx; string hostLogPathDir = "."; string hostLogPathFile = "DeletedFileUnittest.txt"; diff --git a/core/unittest/reader/ForceReadUnittest.cpp b/core/unittest/reader/ForceReadUnittest.cpp index fa99c10eec..13460a5096 100644 --- a/core/unittest/reader/ForceReadUnittest.cpp +++ b/core/unittest/reader/ForceReadUnittest.cpp @@ -25,10 +25,10 @@ #include "common/JsonUtil.h" #include "config/PipelineConfig.h" #include "file_server/ConfigManager.h" +#include "file_server/FileServer.h" #include "file_server/event/BlockEventManager.h" #include "file_server/event/Event.h" #include "file_server/event_handler/EventHandler.h" -#include "file_server/FileServer.h" #include "logger/Logger.h" #include "pipeline/Pipeline.h" #include "pipeline/queue/ProcessQueueManager.h" @@ -118,6 +118,7 @@ class ForceReadUnittest : public testing::Test { FileServer::GetInstance()->AddFileDiscoveryConfig(mConfigName, &discoveryOpts, &ctx); FileServer::GetInstance()->AddFileReaderConfig(mConfigName, &readerOpts, &ctx); FileServer::GetInstance()->AddMultilineConfig(mConfigName, &multilineOpts, &ctx); + FileServer::GetInstance()->AddFileTagConfig(mConfigName, &tagOpts, &ctx); ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx); ProcessQueueManager::GetInstance()->EnablePop(mConfigName); } @@ -133,6 +134,7 @@ class ForceReadUnittest : public testing::Test { FileDiscoveryOptions discoveryOpts; FileReaderOptions readerOpts; MultilineOptions multilineOpts; + FileTagOptions tagOpts; PipelineContext ctx; FileDiscoveryConfig mConfig; }; @@ -144,8 +146,12 @@ void ForceReadUnittest::TestTimeoutForceRead() { { // read -> add timeout event -> handle timeout -> valid -> read empty -> not rollback Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -182,8 +188,12 @@ void ForceReadUnittest::TestTimeoutForceRead() { { // read -> write -> add timeout event -> handle timeout -> valid -> read not empty -> rollback Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -224,8 +234,12 @@ void ForceReadUnittest::TestTimeoutForceRead() { // read -> add timeout event -> write -> read -> handle timeout -> event invalid LOG_WARNING(sLogger, ("This case is difficult to test", "test")); Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -298,8 +312,12 @@ void ForceReadUnittest::TestFileCloseForceRead() { { // file close -> handle timeout -> valid -> not rollback Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -342,8 +360,12 @@ void ForceReadUnittest::TestAddTimeoutEvent() { { // read part -> not add timeout event Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -367,8 +389,12 @@ void ForceReadUnittest::TestAddTimeoutEvent() { { // read all -> add timeout event Init(); - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); diff --git a/core/unittest/reader/GetLastLineDataUnittest.cpp b/core/unittest/reader/GetLastLineDataUnittest.cpp index 761c225a57..e99ac67f42 100644 --- a/core/unittest/reader/GetLastLineDataUnittest.cpp +++ b/core/unittest/reader/GetLastLineDataUnittest.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include "common/FileSystemUtil.h" -#include "file_server/reader/LogFileReader.h" #include "common/memory/SourceBuffer.h" +#include "file_server/reader/LogFileReader.h" #include "unittest/Unittest.h" namespace logtail { @@ -66,6 +66,7 @@ class LastMatchedContainerdTextLineUnittest : public ::testing::Test { std::unique_ptr expectedContent; FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; static std::string logPathDir; static std::string gbkFile; @@ -82,8 +83,12 @@ std::string LastMatchedContainerdTextLineUnittest::utf8File; void LastMatchedContainerdTextLineUnittest::TestLastContainerdTextLineSingleLine() { { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); BaseLineParse* baseLineParsePtr = nullptr; baseLineParsePtr = logFileReader.GetParser(LogFileReader::BUFFER_SIZE); logFileReader.mLineParsers.emplace_back(baseLineParsePtr); @@ -511,8 +516,12 @@ void LastMatchedContainerdTextLineUnittest::TestLastContainerdTextLineSingleLine void LastMatchedContainerdTextLineUnittest::TestLastContainerdTextLineMerge() { { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); BaseLineParse* baseLineParsePtr = nullptr; baseLineParsePtr = logFileReader.GetParser(LogFileReader::BUFFER_SIZE); logFileReader.mLineParsers.emplace_back(baseLineParsePtr); @@ -978,6 +987,7 @@ class LastMatchedDockerJsonFileUnittest : public ::testing::Test { std::unique_ptr expectedContent; FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; static std::string logPathDir; static std::string gbkFile; @@ -993,8 +1003,12 @@ std::string LastMatchedDockerJsonFileUnittest::utf8File; void LastMatchedDockerJsonFileUnittest::TestLastDockerJsonFile() { { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); BaseLineParse* baseLineParsePtr = nullptr; baseLineParsePtr = logFileReader.GetParser(0); logFileReader.mLineParsers.emplace_back(baseLineParsePtr); @@ -1233,6 +1247,7 @@ class LastMatchedContainerdTextWithDockerJsonUnittest : public ::testing::Test { std::unique_ptr expectedContent; FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; static std::string logPathDir; static std::string gbkFile; @@ -1248,8 +1263,12 @@ std::string LastMatchedContainerdTextWithDockerJsonUnittest::utf8File; void LastMatchedContainerdTextWithDockerJsonUnittest::TestContainerdTextWithDockerJson() { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); BaseLineParse* baseLineParsePtr = nullptr; baseLineParsePtr = logFileReader.GetParser(0); logFileReader.mLineParsers.emplace_back(baseLineParsePtr); @@ -1303,8 +1322,12 @@ void LastMatchedContainerdTextWithDockerJsonUnittest::TestContainerdTextWithDock void LastMatchedContainerdTextWithDockerJsonUnittest::TestDockerJsonWithContainerdText() { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); BaseLineParse* baseLineParsePtr = nullptr; baseLineParsePtr = logFileReader.GetParser(LogFileReader::BUFFER_SIZE); logFileReader.mLineParsers.emplace_back(baseLineParsePtr); diff --git a/core/unittest/reader/JsonLogFileReaderUnittest.cpp b/core/unittest/reader/JsonLogFileReaderUnittest.cpp index cd9f16f805..1522aac796 100644 --- a/core/unittest/reader/JsonLogFileReaderUnittest.cpp +++ b/core/unittest/reader/JsonLogFileReaderUnittest.cpp @@ -89,9 +89,14 @@ void JsonLogFileReaderUnittest::TestReadGBK() { { // buffer size big enough and is json MultilineOptions multilineOpts; FileReaderOptions readerOpts; + FileTagOptions tagOpts; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - JsonLogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + JsonLogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -111,8 +116,13 @@ void JsonLogFileReaderUnittest::TestReadGBK() { multilineOpts.Init(config, ctx, ""); FileReaderOptions readerOpts; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - JsonLogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 23; size_t BUFFER_SIZE_UTF8 = 25; // "{"first":"iLogtail 为可" reader.UpdateReaderManual(); @@ -130,8 +140,13 @@ void JsonLogFileReaderUnittest::TestReadGBK() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - JsonLogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -151,8 +166,13 @@ void JsonLogFileReaderUnittest::TestReadUTF8() { { // buffer size big enough and is json MultilineOptions multilineOpts; FileReaderOptions readerOpts; - JsonLogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -168,8 +188,13 @@ void JsonLogFileReaderUnittest::TestReadUTF8() { // should read buffer size MultilineOptions multilineOpts; FileReaderOptions readerOpts; - JsonLogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 25; reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -185,8 +210,13 @@ void JsonLogFileReaderUnittest::TestReadUTF8() { // should read until last json MultilineOptions multilineOpts; FileReaderOptions readerOpts; - JsonLogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -203,8 +233,13 @@ void JsonLogFileReaderUnittest::TestReadUTF8() { { // read twice MultilineOptions multilineOpts; FileReaderOptions readerOpts; - JsonLogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions tagOpts; + JsonLogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -232,8 +267,12 @@ void JsonLogFileReaderUnittest::TestReadUTF8() { class RemoveLastIncompleteLogUnittest : public ::testing::Test { public: void SetUp() override { - mLogFileReader.reset(new JsonLogFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx))); + mLogFileReader.reset(new JsonLogFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx))); } void TestRemoveLastIncompleteLogSingleLine(); @@ -248,6 +287,7 @@ class RemoveLastIncompleteLogUnittest : public ::testing::Test { std::unique_ptr mLogFileReader; MultilineOptions multilineOpts; FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; }; diff --git a/core/unittest/reader/LogFileReaderUnittest.cpp b/core/unittest/reader/LogFileReaderUnittest.cpp index 165e4e258d..2748ad8986 100644 --- a/core/unittest/reader/LogFileReaderUnittest.cpp +++ b/core/unittest/reader/LogFileReaderUnittest.cpp @@ -21,8 +21,8 @@ #include "common/RuntimeUtil.h" #include "common/memory/SourceBuffer.h" #include "file_server/FileServer.h" -#include "protobuf/sls/sls_logs.pb.h" #include "file_server/reader/LogFileReader.h" +#include "protobuf/sls/sls_logs.pb.h" #include "unittest/Unittest.h" DECLARE_FLAG_INT32(force_release_deleted_file_fd_timeout); @@ -77,6 +77,7 @@ class LogFileReaderUnittest : public ::testing::Test { static std::string utf8File; FileDiscoveryOptions discoveryOpts; FileReaderOptions readerOpts; + FileTagOptions fileTagOpts; PipelineContext ctx; }; @@ -93,8 +94,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -109,8 +114,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -132,8 +141,12 @@ void LogFileReaderUnittest::TestReadGBK() { readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; multilineOpts.Init(config, ctx, ""); - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 14; size_t BUFFER_SIZE_UTF8 = 15; // "ilogtail 为可" reader.UpdateReaderManual(); @@ -154,8 +167,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); // reader.mDiscardUnmatch = false; reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -178,8 +195,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); // reader.mDiscardUnmatch = false; reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -207,8 +228,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); // reader.mDiscardUnmatch = false; reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -237,8 +262,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); LogBuffer logBuffer; @@ -255,8 +284,12 @@ void LogFileReaderUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -306,8 +339,12 @@ void LogFileReaderUnittest::TestReadUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -321,8 +358,12 @@ void LogFileReaderUnittest::TestReadUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); @@ -344,8 +385,12 @@ void LogFileReaderUnittest::TestReadUTF8() { multilineOpts.Init(config, ctx, ""); FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 15; reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -365,8 +410,12 @@ void LogFileReaderUnittest::TestReadUTF8() { multilineOpts.Init(config, ctx, ""); FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -387,8 +436,12 @@ void LogFileReaderUnittest::TestReadUTF8() { multilineOpts.Init(config, ctx, ""); FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -414,8 +467,12 @@ void LogFileReaderUnittest::TestReadUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -442,8 +499,12 @@ void LogFileReaderUnittest::TestReadUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); LogBuffer logBuffer; @@ -459,8 +520,12 @@ void LogFileReaderUnittest::TestReadUTF8() { multilineOpts.Init(config, ctx, ""); FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader.mLogFileOp.GetFileSize(); @@ -555,6 +620,7 @@ class LogMultiBytesUnittest : public ::testing::Test { static std::string gbkFile; static std::string utf8File; FileDiscoveryOptions discoveryOpts; + FileTagOptions fileTagOpts; PipelineContext ctx; }; @@ -572,8 +638,12 @@ void LogMultiBytesUnittest::TestAlignLastCharacterUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader logFileReader( - "", "", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("", + "", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); std::string expectedLog = "为可观测场景而"; std::string testLog = expectedLog + "生"; size_t result = logFileReader.AlignLastCharacter(const_cast(testLog.data()), expectedLog.size()); @@ -583,8 +653,13 @@ void LogMultiBytesUnittest::TestAlignLastCharacterUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader logFileReader( - "", "", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions fileTagOpts; + LogFileReader logFileReader("", + "", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); std::string expectedLog = "为可观测场景而"; std::string testLog = expectedLog + "生"; size_t result = logFileReader.AlignLastCharacter(const_cast(testLog.data()), expectedLog.size() + 1); @@ -597,8 +672,13 @@ void LogMultiBytesUnittest::TestAlignLastCharacterGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader logFileReader( - "", "", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions fileTagOpts; + LogFileReader logFileReader("", + "", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); { // case: no align std::string expectedLog = "\xce\xaa\xbf\xc9\xb9\xdb\xb2\xe2\xb3\xa1\xbe\xb0\xb6\xf8"; // equal to "为可观测场景而" @@ -619,8 +699,13 @@ void LogMultiBytesUnittest::TestReadUTF8() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions fileTagOpts; + LogFileReader reader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 13; // equal to "iLogtail 为" plus one illegal byte reader.UpdateReaderManual(); reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); @@ -639,8 +724,13 @@ void LogMultiBytesUnittest::TestReadGBK() { FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; - LogFileReader reader( - logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + FileTagOptions fileTagOpts; + LogFileReader reader(logPathDir, + gbkFile, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); LogFileReader::BUFFER_SIZE = 12; // equal to "iLogtail 为" plus one illegal byte size_t BUFFER_SIZE_UTF8 = 12; // "ilogtail 为可" reader.UpdateReaderManual(); @@ -680,6 +770,7 @@ class LogFileReaderCheckpointUnittest : public ::testing::Test { static std::string logPathDir; static std::string utf8File; FileDiscoveryOptions discoveryOpts; + FileTagOptions fileTagOpts; PipelineContext ctx; }; @@ -693,8 +784,12 @@ void LogFileReaderCheckpointUnittest::TestDumpMetaToMem() { MultilineOptions multilineOpts; FileReaderOptions readerOpts; readerOpts.mInputType = FileReaderOptions::InputType::InputFile; - LogFileReader reader1( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader1(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader1.UpdateReaderManual(); reader1.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); int64_t fileSize = reader1.mLogFileOp.GetFileSize(); @@ -708,8 +803,12 @@ void LogFileReaderCheckpointUnittest::TestDumpMetaToMem() { APSARA_TEST_GE_FATAL(reader1.mCache.size(), 0UL); reader1.DumpMetaToMem(false); // second read - LogFileReader reader2( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader reader2(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&fileTagOpts, &ctx)); reader2.UpdateReaderManual(); reader2.InitReader(false, LogFileReader::BACKWARD_TO_BEGINNING); reader2.CheckFileSignatureAndOffset(true); diff --git a/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp b/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp index 5ce57053b3..3acdffca80 100644 --- a/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp +++ b/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp @@ -70,6 +70,7 @@ class RemoveLastIncompleteLogUnittest : public ::testing::Test { std::unique_ptr expectedContent; FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; static std::string logPathDir; static std::string gbkFile; @@ -85,8 +86,12 @@ std::string RemoveLastIncompleteLogUnittest::utf8File; void RemoveLastIncompleteLogUnittest::TestSingleline() { MultilineOptions multilineOpts; - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); { // case single line std::string line1 = "first."; std::string line2 = "second."; @@ -139,8 +144,12 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() { config["StartPattern"] = LOG_BEGIN_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader(logPathDir, + utf8File, + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); { // case multi line std::vector index; std::string firstLog = LOG_BEGIN_STRING + "first.\nmultiline1\nmultiline2"; @@ -206,6 +215,7 @@ class RemoveLastIncompleteLogMultilineUnittest : public ::testing::Test { private: FileReaderOptions readerOpts; + FileTagOptions tagOpts; PipelineContext ctx; }; @@ -221,8 +231,12 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithBe config["ContinuePattern"] = LOG_CONTINUE_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); // logFileReader.mDiscardUnmatch = true; { // case: end with begin continue std::string expectMatch = LOG_BEGIN_STRING + "\n" + LOG_CONTINUE_STRING + "\n" + LOG_CONTINUE_STRING + '\n'; @@ -272,8 +286,12 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithBe config["EndPattern"] = LOG_END_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); // logFileReader.mDiscardUnmatch = true; { // case: end with begin end std::string expectMatch = LOG_BEGIN_STRING + "\n" + LOG_UNMATCH + "\n" + LOG_END_STRING + '\n'; @@ -322,8 +340,12 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithBe config["StartPattern"] = LOG_BEGIN_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); // logFileReader.mDiscardUnmatch = true; { // case: end with begin std::string expectMatch = LOG_BEGIN_STRING + "\n" + LOG_UNMATCH + "\n" + LOG_UNMATCH + '\n'; @@ -363,8 +385,12 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithCo config["EndPattern"] = LOG_END_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); // logFileReader.mDiscardUnmatch = true; { // case: end with continue end std::string expectMatch = LOG_CONTINUE_STRING + "\n" + LOG_CONTINUE_STRING + "\n" + LOG_END_STRING + '\n'; @@ -413,8 +439,12 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithEn config["EndPattern"] = LOG_END_REGEX; MultilineOptions multilineOpts; multilineOpts.Init(config, ctx, ""); - LogFileReader logFileReader( - "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + LogFileReader logFileReader("dir", + "file", + DevInode(), + std::make_pair(&readerOpts, &ctx), + std::make_pair(&multilineOpts, &ctx), + std::make_pair(&tagOpts, &ctx)); // logFileReader.mDiscardUnmatch = true; { // case: end with end std::string expectMatch = LOG_UNMATCH + "\n" + LOG_UNMATCH + "\n" + LOG_END_STRING + '\n'; diff --git a/pkg/config/global_config.go b/pkg/config/global_config.go index dab6f67804..2b394214e0 100644 --- a/pkg/config/global_config.go +++ b/pkg/config/global_config.go @@ -48,9 +48,8 @@ type GlobalConfig struct { EnableContainerdUpperDirDetect bool EnableSlsMetricsFormat bool - PipelineMetaTagKey map[string]string - PipelineMetaTagKeyDefaultValue map[string]string - AgentEnvMetaTagKey map[string]string + PipelineMetaTagKey map[string]string + AgentEnvMetaTagKey map[string]string } // LoongcollectorGlobalConfig is the singleton instance of GlobalConfig. diff --git a/pkg/helper/docker_center.go b/pkg/helper/docker_center.go index 193303e736..07c644d134 100644 --- a/pkg/helper/docker_center.go +++ b/pkg/helper/docker_center.go @@ -551,7 +551,6 @@ func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPref k8sInfo := K8SInfo{} ip := dc.getIPAddress(info) - containerNameTag["_image_name_"] = dc.getImageName(info.Image, info.Config.Image) if strings.HasPrefix(info.Name, "/k8s_") || strings.HasPrefix(info.Name, "k8s_") || strings.Count(info.Name, "_") >= 4 { // 1. container name is k8s // k8s_php-redis_frontend-2337258262-154p7_default_d8a2e2dd-3617-11e7-a4b0-ecf4bbe5d414_0 @@ -570,7 +569,8 @@ func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPref if len(tags) == 6 { baseIndex = 1 } - containerNameTag["_container_name_"] = tags[baseIndex] + containerNameTag["_k8s_image_name_"] = dc.getImageName(info.Image, info.Config.Image) + containerNameTag["_k8s_container_name_"] = tags[baseIndex] containerNameTag["_pod_name_"] = tags[baseIndex+1] containerNameTag["_namespace_"] = tags[baseIndex+2] containerNameTag["_pod_uid_"] = tags[baseIndex+3] @@ -578,9 +578,13 @@ func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPref k8sInfo.Pod = tags[baseIndex+1] k8sInfo.Namespace = tags[baseIndex+2] k8sInfo.ExtractK8sLabels(info) + if len(ip) > 0 { + containerNameTag["_k8s_container_ip_"] = ip + } } else if _, ok := info.Config.Labels[k8sPodNameLabel]; ok { // 2. container labels has k8sPodNameLabel - containerNameTag["_container_name_"] = info.Name + containerNameTag["_k8s_image_name_"] = dc.getImageName(info.Image, info.Config.Image) + containerNameTag["_k8s_container_name_"] = info.Name containerNameTag["_pod_name_"] = info.Config.Labels[k8sPodNameLabel] containerNameTag["_namespace_"] = info.Config.Labels[k8sPodNameSpaceLabel] containerNameTag["_pod_uid_"] = info.Config.Labels[k8sPodUUIDLabel] @@ -590,16 +594,20 @@ func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPref // the following method is couped with the CRI adapter, only the original docker container labels // would be added to the labels of the k8s info. k8sInfo.ExtractK8sLabels(info) + if len(ip) > 0 { + containerNameTag["_k8s_container_ip_"] = ip + } } else { // 3. treat as normal container + containerNameTag["_image_name_"] = dc.getImageName(info.Image, info.Config.Image) if strings.HasPrefix(info.Name, "/") { containerNameTag["_container_name_"] = info.Name[1:] } else { containerNameTag["_container_name_"] = info.Name } - } - if len(ip) > 0 { - containerNameTag["_container_ip_"] = ip + if len(ip) > 0 { + containerNameTag["_container_ip_"] = ip + } } did := &DockerInfoDetail{ diff --git a/pluginmanager/logstore_config.go b/pluginmanager/logstore_config.go index c31de03736..846b266bfc 100644 --- a/pluginmanager/logstore_config.go +++ b/pluginmanager/logstore_config.go @@ -440,6 +440,12 @@ func createLogstoreConfig(project string, logstore string, configName string, lo } } logstoreC.GlobalConfig = pluginConfig + if logstoreC.GlobalConfig.PipelineMetaTagKey == nil { + logstoreC.GlobalConfig.PipelineMetaTagKey = make(map[string]string) + } + if logstoreC.GlobalConfig.AgentEnvMetaTagKey == nil { + logstoreC.GlobalConfig.AgentEnvMetaTagKey = make(map[string]string) + } logger.Debug(contextImp.GetRuntimeContext(), "load plugin config", *logstoreC.GlobalConfig) }