Skip to content

Commit

Permalink
Merge branch 'main' into feat/prom-tls-curl
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas committed Nov 25, 2024
2 parents 8c27ef3 + b11fd8f commit c57ff19
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 22 deletions.
35 changes: 32 additions & 3 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/FileSystemUtil.h"
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/version.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
Expand All @@ -39,6 +40,10 @@

using namespace std;

#define ILOGTAIL_PREFIX "ilogtail_"
#define ILOGTAIL_PIDFILE_SUFFIX ".pid"
#define LOONGCOLLECTOR_PREFIX "loongcollector_"

DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", false);
DEFINE_FLAG_INT32(max_buffer_num, "max size", 40);
DEFINE_FLAG_INT32(pub_max_buffer_num, "max size", 8);
Expand Down Expand Up @@ -520,7 +525,7 @@ std::string GetAgentName() {
return "ilogtail";
} else {
return "loongcollector";
}
}
}

std::string GetMonitorInfoFileName() {
Expand All @@ -531,6 +536,30 @@ std::string GetMonitorInfoFileName() {
}
}

std::string GetSymLinkName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + "ilogtail";
} else {
return GetProcessExecutionDir() + "loongcollector";
}
}

std::string GetPidFileName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + ILOGTAIL_PREFIX + ILOGTAIL_VERSION + ILOGTAIL_PIDFILE_SUFFIX;
} else {
return GetAgentRunDir() + "loongcollector.pid";
}
}

std::string GetAgentPrefix() {
if (BOOL_FLAG(logtail_mode)) {
return ILOGTAIL_PREFIX;
} else {
return LOONGCOLLECTOR_PREFIX;
}
}

AppConfig::AppConfig() {
LOG_INFO(sLogger, ("AppConfig AppConfig", "success"));
SetIlogtailConfigJson("");
Expand Down Expand Up @@ -1509,8 +1538,8 @@ void AppConfig::ReadFlagsFromMap(const std::unordered_map<std::string, std::stri
* - 记录无法转换的值
*/
void AppConfig::RecurseParseJsonToFlags(const Json::Value& confJson, std::string prefix) {
const static unordered_set<string> sIgnoreKeySet = {"data_server_list", "legacy_data_server_list"};
const static unordered_set<string> sForceKeySet = {"config_server_address_list", "config_server_list"};
const static unordered_set<string> sIgnoreKeySet = {"data_server_list", "data_servers"};
const static unordered_set<string> sForceKeySet = {"config_server_address_list", "config_servers"};
for (auto name : confJson.getMemberNames()) {
auto jsonvalue = confJson[name];
string fullName;
Expand Down
3 changes: 3 additions & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ std::string GetVersionTag();
std::string GetGoPluginCheckpoint();
std::string GetAgentName();
std::string GetMonitorInfoFileName();
std::string GetSymLinkName();
std::string GetPidFileName();
std::string GetAgentPrefix();

template <class T>
class DoubleBuffer {
Expand Down
3 changes: 0 additions & 3 deletions core/common/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,4 @@ extern const char* const ILOGTAIL_UPDATE_SUFFIX;
extern const char* const ILOGTAIL_GIT_HASH;
extern const char* const ILOGTAIL_BUILD_DATE;

#define ILOGTAIL_PREFIX "ilogtail_"
#define ILOGTAIL_PIDFILE_SUFFIX ".pid"

#endif
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
= PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false));
if (flusher) {
Json::Value optionalGoPipeline;
if (!flusher->Init(detail, mContext, optionalGoPipeline)) {
if (!flusher->Init(detail, mContext, i, optionalGoPipeline)) {
return false;
}
mFlushers.emplace_back(std::move(flusher));
Expand Down
16 changes: 11 additions & 5 deletions core/pipeline/batch/Batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Batcher {
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
0,
mFlusher->GetFlusherIndex(),
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
Expand All @@ -193,8 +193,11 @@ class Batcher {
g.GetSourceBuffer(),
g.GetExactlyOnceCheckpoint(),
g.GetMetadata(EventGroupMetaKey::SOURCE_ID));
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher);
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
mFlusher->GetFlusherIndex(),
key,
mEventFlushStrategy.GetTimeoutSecs(),
mFlusher);
mBufferedGroupsTotal->Add(1);
mBufferedDataSizeByte->Add(item.DataSize());
} else if (i == 0) {
Expand Down Expand Up @@ -243,8 +246,11 @@ class Batcher {
mGroupQueue->Flush(res);
}
if (mGroupQueue->IsEmpty()) {
TimeoutFlushManager::GetInstance()->UpdateRecord(
mFlusher->GetContext().GetConfigName(), 0, 0, mGroupFlushStrategy->GetTimeoutSecs(), mFlusher);
TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(),
mFlusher->GetFlusherIndex(),
0,
mGroupFlushStrategy->GetTimeoutSecs(),
mFlusher);
}
iter->second.Flush(mGroupQueue.value());
mEventQueueMap.erase(iter);
Expand Down
3 changes: 2 additions & 1 deletion core/pipeline/plugin/instance/FlusherInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ using namespace std;

namespace logtail {

bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) {
bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline) {
mPlugin->SetContext(context);
mPlugin->SetPluginID(PluginID());
mPlugin->SetFlusherIndex(flusherIdx);
mPlugin->SetMetricsRecordRef(Name(), PluginID());
if (!mPlugin->Init(config, optionalGoPipeline)) {
return false;
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/plugin/instance/FlusherInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ namespace logtail {

class FlusherInstance : public PluginInstance {
public:
FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}
FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta)
: PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Flusher* GetPlugin() const { return mPlugin.get(); }

bool Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline);
bool Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline);
bool Start() { return mPlugin->Start(); }
bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); }
bool Send(PipelineEventGroup&& g);
Expand Down
3 changes: 3 additions & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Flusher : public Plugin {

QueueKey GetQueueKey() const { return mQueueKey; }
void SetPluginID(const std::string& pluginID) { mPluginID = pluginID; }
size_t GetFlusherIndex() { return mIndex; }
void SetFlusherIndex(size_t idx) { mIndex = idx; }
const std::string& GetPluginID() const { return mPluginID; }

protected:
Expand All @@ -54,6 +56,7 @@ class Flusher : public Plugin {

QueueKey mQueueKey;
std::string mPluginID;
size_t mIndex = 0;

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
Expand Down
12 changes: 6 additions & 6 deletions core/unittest/pipeline/PipelineUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2736,13 +2736,13 @@ void PipelineUnittest::TestSend() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
vector<pair<size_t, const Json::Value*>> configs;
Expand Down Expand Up @@ -2788,13 +2788,13 @@ void PipelineUnittest::TestSend() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}

Expand Down Expand Up @@ -2855,13 +2855,13 @@ void PipelineUnittest::TestFlushBatch() const {
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
auto flusher
= PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false));
flusher->Init(Json::Value(), ctx, tmp);
flusher->Init(Json::Value(), ctx, 0, tmp);
pipeline.mFlushers.emplace_back(std::move(flusher));
}
{
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/plugin/FlusherInstanceUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void FlusherInstanceUnittest::TestInit() const {
= make_unique<FlusherInstance>(new FlusherMock(), PluginInstance::PluginMeta("0"));
Json::Value config, opt;
PipelineContext context;
APSARA_TEST_TRUE(flusher->Init(config, context, opt));
APSARA_TEST_TRUE(flusher->Init(config, context, 0, opt));
APSARA_TEST_EQUAL(&context, &flusher->GetPlugin()->GetContext());
}

Expand Down

0 comments on commit c57ff19

Please sign in to comment.