Skip to content

Commit

Permalink
rename pipeline_config to continuous_pipeline_config and rename comma…
Browse files Browse the repository at this point in the history
…nd to onetime_pipeline_config (#1902)
  • Loading branch information
quzard authored Nov 22, 2024
1 parent 7c80c3b commit ae008fc
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 331 deletions.
148 changes: 72 additions & 76 deletions config_server/protocol/v2/README.md

Large diffs are not rendered by default.

153 changes: 71 additions & 82 deletions config_server/protocol/v2/agentV2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,32 @@ enum ConfigStatus {

// Define the Config information carried in the request
message ConfigInfo {
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
ConfigStatus status = 3; // Config's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define the Command information carried in the request
message CommandInfo {
string type = 1; // Command's type
string name = 2; // Required, Command's unique identification
ConfigStatus status = 3; // Command's status
string message = 4; // Optional error message
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
ConfigStatus status = 3; // Config's status
string message = 4; // Optional error message
map<string, bytes> extra = 5; // Optional extra info
}

// Define Agent's basic attributes
message AgentAttributes {
bytes version = 1; // Agent's version
bytes ip = 2; // Agent's ip
bytes hostname = 3; // Agent's hostname
bytes hostid = 4; // Agent's hostid https://opentelemetry.io/docs/specs/semconv/attributes-registry/host/
map<string, bytes> extras = 100; // Agent's other attributes
bytes version = 1; // Agent's version
bytes ip = 2; // Agent's ip
bytes hostname = 3; // Agent's hostname
bytes hostid = 4; // Agent's hostid https://opentelemetry.io/docs/specs/semconv/attributes-registry/host/
map<string, bytes> extras = 100; // Agent's other attributes
// before 100 (inclusive) are reserved for future official fields
}

enum AgentCapabilities {
// The capabilities field is unspecified.
UnspecifiedAgentCapability = 0;
// The Agent can accept pipeline configuration from the Server.
AcceptsPipelineConfig = 0x00000001;
UnspecifiedAgentCapability = 0;
// The Agent can accept continuous pipeline configuration from the Server.
AcceptsContinuousPipelineConfig = 0x00000001;
// The Agent can accept instance configuration from the Server.
AcceptsInstanceConfig = 0x00000002;
// The Agent can accept custom command from the Server.
AcceptsCustomCommand = 0x00000004;
AcceptsInstanceConfig = 0x00000002;
// The Agent can accept onetime pipeline configuration from the Server.
AcceptsOnetimePipelineConfig = 0x00000004;

// Add new capabilities here, continuing with the least significant unused bit.
}
Expand All @@ -66,7 +57,7 @@ enum RequestFlags {
// Flags is a bit mask. Values below define individual bits.

// Must be set if this request contains full state
FullState = 0x00000001;
FullState = 0x00000001;
// bits before 2^16 (inclusive) are reserved for future official fields
}

Expand All @@ -75,49 +66,48 @@ enum RequestFlags {
// Agent sends requests to the ConfigServer to get config updates and receive commands.
message HeartbeatRequest {
bytes request_id = 1;
uint64 sequence_num = 2; // Increment every request, for server to check sync status
uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum
bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle
string agent_type = 5; // Required, Agent's type(ilogtail, ..)
AgentAttributes attributes = 6; // Agent's basic attributes
repeated AgentGroupTag tags = 7; // Agent's tags
string running_status = 8; // Human readable running status
int64 startup_time = 9; // Required, Agent's startup time
repeated ConfigInfo pipeline_configs = 10; // Information about the current PIPELINE_CONFIG held by the Agent
repeated ConfigInfo instance_configs = 11; // Information about the current AGENT_CONFIG held by the Agent
repeated CommandInfo custom_commands = 12; // Information about command history
uint64 flags = 13; // Predefined command flag
bytes opaque = 14; // Opaque data for extension
uint64 sequence_num = 2; // Increment every request, for server to check sync status
uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum
bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle
string agent_type = 5; // Required, Agent's type(ilogtail, ..)
AgentAttributes attributes = 6; // Agent's basic attributes
repeated AgentGroupTag tags = 7; // Agent's tags
string running_status = 8; // Human readable running status
int64 startup_time = 9; // Required, Agent's startup time
repeated ConfigInfo continuous_pipeline_configs = 10; // Information about the current continuous pipeline configs held by the Agent
repeated ConfigInfo instance_configs = 11; // Information about the current instance configs held by the Agent
repeated ConfigInfo onetime_pipeline_configs = 12; // Information about onetime pipeline configs history
uint64 flags = 13; // Predefined command flag
bytes opaque = 14; // Opaque data for extension
// before 100 (inclusive) are reserved for future official fields
}

// Define Config's detail
message ConfigDetail {
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
bytes detail = 3; // Required, Config's detail
string name = 1; // Required, Config's unique identification
int64 version = 2; // Required, Config's version number or hash code
bytes detail = 3; // Required, Config's detail
map<string, bytes> extra = 4; // Optional extra info
}

message CommandDetail {
string type = 1; // Required, Command type
string name = 2; // Required, Command name
bytes detail = 3; // Required, Command's detail
int64 expire_time = 4; // After which the command can be safely removed from history
map<string, bytes> extra = 5; // Optional extra info
string name = 1; // Required, Command name
bytes detail = 2; // Required, Command's detail
int64 expire_time = 3; // After which the command can be safely removed from history
map<string, bytes> extra = 4; // Optional extra info
}

enum ServerCapabilities {
// The capabilities field is unspecified.
UnspecifiedServerCapability = 0;
UnspecifiedServerCapability = 0;
// The Server can remember agent attributes.
RembersAttribute = 0x00000001;
// The Server can remember pipeline config status.
RembersPipelineConfigStatus = 0x00000002;
RembersAttribute = 0x00000001;
// The Server can remember continuous pipeline config status.
RembersContinuousPipelineConfigStatus = 0x00000002;
// The Server can remember instance config status.
RembersInstanceConfigStatus = 0x00000004;
// The Server can remember custom command status.
RembersCustomCommandStatus = 0x00000008;
RembersInstanceConfigStatus = 0x00000004;
// The Server can remember onetime pipeline config status.
RembersOnetimePipelineConfigStatus = 0x00000008;

// bits before 2^16 (inclusive) are reserved for future official fields
}
Expand All @@ -131,66 +121,65 @@ enum ResponseFlags {
// some sub-message in the last AgentToServer message (which is an allowed
// optimization) but the Server detects that it does not have it (e.g. was
// restarted and lost state).
ReportFullState = 0x00000001;
// FetchPipelineConfigDetail can be used by the Server to tell Agent to fetch config details by FetchConfig api,
ReportFullState = 0x00000001;
// FetchContinuousPipelineConfigDetail can be used by the Server to tell Agent to fetch continuous pipeline config details by FetchConfig api,
// HB response ConfigDetail will not contains details.
FetchPipelineConfigDetail = 0x00000002;
// like FetchPipelineConfigDetail, but for instance config.
FetchContinuousPipelineConfigDetail = 0x00000002;
// like FetchContinuousPipelineConfigDetail, but for instance config.
FetchInstanceConfigDetail = 0x00000004;
// bits before 2^16 (inclusive) are reserved for future official fields
}

// ConfigServer's response to Agent's request
message HeartbeatResponse {
bytes request_id = 1;
CommonResponse commonResponse = 2; // Set common response
uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum
bytes request_id = 1;
CommonResponse common_response = 2; // Set common response
uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum

repeated ConfigDetail pipeline_config_updates = 4; // Agent's pipeline config update status
repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status
repeated CommandDetail custom_command_updates = 6; // Agent's commands updates
uint64 flags = 7; // Predefined command flag
bytes opaque = 8; // Opaque data for extension
repeated ConfigDetail continuous_pipeline_config_updates = 4; // Agent's continuous pipeline config update status
repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status
repeated CommandDetail onetime_pipeline_config_updates = 6; // Agent's onetime pipeline config updates
uint64 flags = 7; // Predefined command flag
bytes opaque = 8; // Opaque data for extension
}

// API: /Agent/FetchConfig
// optional api for fetching configs details, but not by heartbeat response with config details, see README.
message FetchConfigRequest {
bytes request_id = 1;
bytes instance_id = 2; // Agent's unique identification
repeated ConfigInfo pipeline_configs = 3; // Information about the current PIPELINE_CONFIG held by the Agent
repeated ConfigInfo instance_configs = 4; // Information about the current AGENT_CONFIG held by the Agent
repeated CommandInfo custom_commands = 5; // Information about command history
bytes request_id = 1;
bytes instance_id = 2; // Agent's unique identification
repeated ConfigInfo continuous_pipeline_configs = 3; // Information about the current continuous pipeline configs held by the Agent
repeated ConfigInfo instance_configs = 4; // Information about the current instance configs held by the Agent
repeated ConfigInfo onetime_pipeline_configs = 5; // Information about onetime pipeline configs history
}

// ConfigServer response to Agent's config fetching request
message FetchConfigResponse {
bytes request_id = 1;
CommonResponse commonResponse = 2;
repeated ConfigDetail pipeline_config_updates = 3; // Agent's pipeline config with details
repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details
repeated CommandDetail custom_command_updates = 5; // Agent's commands details
bytes request_id = 1;
CommonResponse common_response = 2;
repeated ConfigDetail continuous_pipeline_config_updates = 3; // Agent's continuous pipeline config with details
repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details
repeated CommandDetail onetime_pipeline_config_updates = 5; // Agent's onetime pipeline config details
}

// API: /Agent/ReportStatus
// optional api for report config status, but not wait util next heartbeat, see README.
// if HB server and Status server are different service, this api may be help.
message ReportStatusRequest {
bytes request_id = 1;
bytes instance_id = 2; // Agent's unique identification
repeated ConfigInfo pipeline_configs = 3; // status about the current PIPELINE_CONFIG held by the Agent
repeated ConfigInfo instance_configs = 4; // status about the current AGENT_CONFIG held by the Agent
repeated CommandInfo custom_commands = 5; // status about command history
bytes instance_id = 2; // Agent's unique identification
repeated ConfigInfo continuous_pipeline_configs = 3; // status about the current continuous pipeline configs held by the Agent
repeated ConfigInfo instance_configs = 4; // status about the current instance configs held by the Agent
repeated ConfigInfo onetime_pipeline_configs = 5; // status about onetime pipeline configs history
}

// ConfigServer response to Agent's report status request
message ReportStatusResponse {
bytes request_id = 1;
CommonResponse commonResponse = 2;
CommonResponse common_response = 2;
}

message CommonResponse
{
message CommonResponse {
int32 status = 1;
bytes errorMessage = 2;
}
4 changes: 2 additions & 2 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,11 @@ string GetFileTagsDir() {
}
}

string GetPipelineConfigDir() {
string GetContinuousPipelineConfigDir() {
if (BOOL_FLAG(logtail_mode)) {
return "config";
} else {
return "pipeline_config";
return "continuous_pipeline_config";
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ std::string GetObserverEbpfHostPath();
std::string GetSendBufferFileNamePrefix();
std::string GetLegacyUserLocalConfigFilePath();
std::string GetExactlyOnceCheckpoint();
std::string GetContinuousPipelineConfigDir();
std::string GetPipelineConfigDir();
std::string GetPluginLogName();
std::string GetVersionTag();
Expand Down Expand Up @@ -304,7 +305,7 @@ class AppConfig {

public:
AppConfig();
~AppConfig() {};
~AppConfig(){};

void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&);

Expand Down
6 changes: 3 additions & 3 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ void Application::Start() { // GCOVR_EXCL_START

{
// add local config dir
filesystem::path localConfigPath
= filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetPipelineConfigDir() / "local";
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
/ GetContinuousPipelineConfigDir() / "local";
error_code ec;
filesystem::create_directories(localConfigPath, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to create dir for local pipeline_config",
("failed to create dir for local continuous_pipeline_config",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
}
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
Expand Down
Loading

0 comments on commit ae008fc

Please sign in to comment.