Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support go to cpp PipelineEventGroup transfer #1771

Merged
merged 23 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ _deps
/build/
core/build/
core/protobuf/sls/*.pb.*
core/protobuf/models/*.pb.*
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
core/common/Version.cpp
!/Makefile
# Enterprise
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ set(SUB_DIRECTORIES_LIST
config config/watcher
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/sls
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
ebpf ebpf/observer ebpf/security ebpf/handler
Expand Down
3 changes: 3 additions & 0 deletions core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ logtail_define(protobuf_BIN "Absolute path to protoc" "${DEPS_BINARY_ROOT}/proto
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls")
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/models")
set(PROTO_FILES ${PROTO_FILE_PATH}/log_event.proto ${PROTO_FILE_PATH}/metric_event.proto ${PROTO_FILE_PATH}/span_event.proto ${PROTO_FILE_PATH}/pipeline_event_group.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})

# re2
macro(link_re2 target_name)
Expand Down
1 change: 1 addition & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,4 @@ K8sContainerMeta LogtailPlugin::GetContainerMeta(const string& containerID) {
}
return K8sContainerMeta();
}

11 changes: 8 additions & 3 deletions core/models/LogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ void LogEvent::SetContentNoCopy(const StringBuffer& key, const StringBuffer& val
}

void LogEvent::SetContentNoCopy(StringView key, StringView val) {
auto it = mIndex.find(key);
if (it != mIndex.end()) {
auto rst = mIndex.insert(make_pair(key, mContents.size()));
if (!rst.second) {
auto& it = rst.first;
auto& field = mContents[it->second].first;
mAllocatedContentSize += key.size() + val.size() - field.first.size() - field.second.size();
field = make_pair(key, val);
} else {
mAllocatedContentSize += key.size() + val.size();
mContents.emplace_back(make_pair(key, val), true);
mIndex[key] = mContents.size() - 1;
}
}

Expand All @@ -78,6 +78,11 @@ void LogEvent::DelContent(StringView key) {
}
}

void LogEvent::SetLevel(const std::string& level) {
const StringBuffer& b = GetSourceBuffer()->CopyString(level);
mLevel = StringView(b.data, b.size);
}

LogEvent::ContentIterator LogEvent::FindContent(StringView key) {
auto it = mIndex.find(key);
if (it != mIndex.end()) {
Expand Down
12 changes: 8 additions & 4 deletions core/models/LogEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ class LogEvent : public PipelineEvent {
void SetContentNoCopy(StringView key, StringView val);
void DelContent(StringView key);

void SetPosition(uint32_t offset, uint32_t size) {
void SetPosition(uint64_t offset, uint64_t size) {
mFileOffset = offset;
mRawSize = size;
}
std::pair<uint32_t, uint32_t> GetPosition() const { return {mFileOffset, mRawSize}; }
std::pair<uint64_t, uint64_t> GetPosition() const { return {mFileOffset, mRawSize}; }

StringView GetLevel() const { return mLevel; }
void SetLevel(const std::string& level);

bool Empty() const { return mIndex.empty(); }
size_t Size() const { return mIndex.size(); }
Expand Down Expand Up @@ -117,8 +120,9 @@ class LogEvent : public PipelineEvent {
ContentsContainer mContents;
size_t mAllocatedContentSize = 0;
std::map<StringView, size_t> mIndex;
uint32_t mFileOffset = 0;
uint32_t mRawSize = 0;
uint64_t mFileOffset = 0;
uint64_t mRawSize = 0;
StringView mLevel;
};

} // namespace logtail
12 changes: 12 additions & 0 deletions core/models/SpanEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class SpanEvent : public PipelineEvent {
void SetTagNoCopy(const StringBuffer& key, const StringBuffer& val);
void SetTagNoCopy(StringView key, StringView val);
void DelTag(StringView key);
std::map<StringView, StringView>::const_iterator TagsBegin() const { return mTags.mInner.begin(); }
std::map<StringView, StringView>::const_iterator TagsEnd() const { return mTags.mInner.end(); }
size_t TagsSize() const { return mTags.mInner.size(); }

std::shared_ptr<SourceBuffer>& GetSourceBuffer();

Expand Down Expand Up @@ -95,6 +98,9 @@ class SpanEvent : public PipelineEvent {
void SetTagNoCopy(const StringBuffer& key, const StringBuffer& val);
void SetTagNoCopy(StringView key, StringView val);
void DelTag(StringView key);
std::map<StringView, StringView>::const_iterator TagsBegin() const { return mTags.mInner.begin(); }
std::map<StringView, StringView>::const_iterator TagsEnd() const { return mTags.mInner.end(); }
size_t TagsSize() const { return mTags.mInner.size(); }

std::shared_ptr<SourceBuffer>& GetSourceBuffer();

Expand Down Expand Up @@ -153,6 +159,9 @@ class SpanEvent : public PipelineEvent {
void SetTagNoCopy(const StringBuffer& key, const StringBuffer& val);
void SetTagNoCopy(StringView key, StringView val);
void DelTag(StringView key);
std::map<StringView, StringView>::const_iterator TagsBegin() const { return mTags.mInner.begin(); }
std::map<StringView, StringView>::const_iterator TagsEnd() const { return mTags.mInner.end(); }
size_t TagsSize() const { return mTags.mInner.size(); }

const std::vector<InnerEvent>& GetEvents() const { return mEvents; }
InnerEvent* AddEvent();
Expand All @@ -170,6 +179,9 @@ class SpanEvent : public PipelineEvent {
void SetScopeTagNoCopy(const StringBuffer& key, const StringBuffer& val);
void SetScopeTagNoCopy(StringView key, StringView val);
void DelScopeTag(StringView key);
std::map<StringView, StringView>::const_iterator ScopeTagsBegin() const { return mScopeTags.mInner.begin(); }
std::map<StringView, StringView>::const_iterator ScopeTagsEnd() const { return mScopeTags.mInner.end(); }
size_t ScopeTagsSize() const { return mScopeTags.mInner.size(); }

size_t DataSize() const override;

Expand Down
Loading
Loading