Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support go to cpp PipelineEventGroup transfer #1771

Merged
merged 23 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ macro(link_protobuf target_name)
endmacro()
# Absolute path to the protoc compiler shipped with the dependency bundle.
logtail_define(protobuf_BIN "Absolute path to protoc" "${DEPS_BINARY_ROOT}/protoc")
# Directory containing the SLS .proto definitions.
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls")
# All .proto files to compile; pipeline_event.proto is the newly added schema
# for transferring PipelineEventGroup data from the Go plugin to the C++ core.
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto ${PROTO_FILE_PATH}/pipeline_event.proto)
# Generate the C++ sources in place at configure time.
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})

# re2
Expand Down
70 changes: 70 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "protobuf/sls/pipeline_event.pb.h"
#include "provider/Provider.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
Expand All @@ -42,6 +43,8 @@ using namespace logtail;

LogtailPlugin* LogtailPlugin::s_instance = NULL;

std::optional<logtail::PipelineEventGroup> TransferPBToPipelineEventGroup(const sls_logs::PipelineEventGroup& src);

LogtailPlugin::LogtailPlugin() {
mPluginAdapterPtr = NULL;
mPluginBasePtr = NULL;
Expand Down Expand Up @@ -563,3 +566,70 @@ K8sContainerMeta LogtailPlugin::GetContainerMeta(const string& containerID) {
}
return K8sContainerMeta();
}

// Converts a protobuf PipelineEventGroup (produced by the Go plugin) into a
// native logtail::PipelineEventGroup.
// Returns std::nullopt when the group's declared type is unsupported (e.g.
// SPAN) or the group carries no events of its declared type.
std::optional<logtail::PipelineEventGroup> TransferPBToPipelineEventGroup(const sls_logs::PipelineEventGroup& src) {
    logtail::PipelineEventGroup dst(std::make_shared<SourceBuffer>());
    switch (src.type()) {
        case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_LOG: {
            if (src.logs_size() == 0) {
                return std::nullopt;
            }
            dst.MutableEvents().reserve(src.logs_size());
            for (const auto& logSrc : src.logs()) {
                auto logDst = dst.CreateLogEvent();
                std::optional<uint32_t> ns;
                if (logSrc.has_time_ns()) {
                    ns = logSrc.time_ns();
                }
                logDst->SetTimestamp(static_cast<time_t>(logSrc.time()), ns);
                for (const auto& content : logSrc.contents()) {
                    logDst->SetContent(content.key(), content.value());
                }
                dst.MutableEvents().emplace_back(std::move(logDst));
            }
            break;
        }
        case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC: {
            if (src.metrics_size() == 0) {
                return std::nullopt;
            }
            dst.MutableEvents().reserve(src.metrics_size());
            for (const auto& metricSrc : src.metrics()) {
                // Only single-value metrics are supported. Skip multi-value
                // entries entirely instead of appending an event without a
                // value (the previous code still emplaced such events).
                if (metricSrc.type() != sls_logs::MetricEvent::MetricValueType::MetricEvent_MetricValueType_SINGLE) {
                    LOG_ERROR(sLogger, ("metric value type multivalue unsupported", ""));
                    continue;
                }
                auto metricDst = dst.CreateMetricEvent();
                std::optional<uint32_t> ns;
                if (metricSrc.has_time_ns()) {
                    ns = metricSrc.time_ns();
                }
                metricDst->SetTimestamp(metricSrc.time(), ns);
                metricDst->SetName(metricSrc.name());
                metricDst->SetValue(UntypedSingleValue{metricSrc.singlevalue()});
                for (const auto& tag : metricSrc.tags()) {
                    metricDst->SetTag(tag.first, tag.second);
                }
                dst.MutableEvents().emplace_back(std::move(metricDst));
            }
            break;
        }
        default:
            // SPAN (and any future event type) is not handled yet.
            return std::nullopt;
    }

    for (const auto& tag : src.tags()) {
        dst.SetTag(tag.first, tag.second);
    }

    // Only the "source" metadata key is mapped onto a native metadata field;
    // all other metadata entries are intentionally dropped.
    for (const auto& metaData : src.metadata()) {
        if (metaData.first == "source") {
            dst.SetMetadata(logtail::EventGroupMetaKey::SOURCE_ID, metaData.second);
        }
    }

    return dst;
}
85 changes: 85 additions & 0 deletions core/protobuf/sls/pipeline_event.proto
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Schema for transferring PipelineEventGroup objects from the Go plugin to
// the C++ core.
syntax = "proto2";
package sls_logs;

message LogEvent {
    required uint32 Time = 1; // UNIX timestamp, seconds
    optional fixed32 Time_ns = 2; // nanosecond component of the timestamp
    message Content {
        required string Key = 1;
        required bytes Value = 2;
    }
    repeated Content Contents = 3;
    repeated string Values = 4;
    optional uint64 Offset = 5;
    optional string Level = 6;
}

message MetricEvent {
    required uint32 Time = 1; // UNIX timestamp, seconds
    optional fixed32 Time_ns = 2; // nanosecond component of the timestamp
    required string Name = 3;
    enum MetricValueType {
        SINGLE = 0;
        MULTI = 1;
    }
    required MetricValueType Type = 4;
    optional double SingleValue = 5; // populated when Type == SINGLE
    map<string, double> MultiValue = 6; // populated when Type == MULTI
    map<string, string> Tags = 7;
}

message SpanEvent {
    required uint32 Time = 1; // UNIX timestamp, seconds
    optional fixed32 Time_ns = 2;
    required string TraceID = 3;
    required string SpanID = 4;
    optional string TraceState = 5;
    optional string ParentSpanID = 6;
    optional string Name = 7;
    enum SpanKind {
        UNSPECIFIED = 0;
        INTERVAL = 1;
        SERVER = 2;
        CLIENT = 3;
        PRODUCER = 4;
        CONSUMER = 5;
    }
    optional SpanKind Kind = 8;
    required uint64 StartTimeNs = 9;
    required uint64 EndTimeNs = 10;
    map<string, string> Tags = 11;
    message InnerEvent {
        required uint64 TimestampNs = 1;
        required string Name = 2;
        map<string, string> Tags = 3;
    }
    repeated InnerEvent Events = 12;
    message SpanLink {
        required string TraceID = 1;
        required string SpanID = 2;
        optional string TraceState = 3;
        map<string, string> Tags = 4;
    }
    repeated SpanLink Links = 13;
    enum StatusCode {
        Unset = 0;
        Ok = 1;
        Error = 2;
    }
    optional StatusCode Status = 14;
    map<string, string> ScopeTags = 15;
}

// A batch of homogeneous events; Type selects which repeated field carries
// the payload (Logs, Metrics, or Spans).
message PipelineEventGroup {
    map<string, bytes> Metadata = 1;
    map<string, bytes> Tags = 2;
    enum EventType {
        LOG = 0;
        METRIC = 1;
        SPAN = 2;
    }
    required EventType Type = 3;
    repeated LogEvent Logs = 4;
    repeated MetricEvent Metrics = 5;
    repeated SpanEvent Spans = 6;
}
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
184 changes: 184 additions & 0 deletions pkg/helper/pipeline_event_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package helper

import (
"fmt"
"sync"
"time"

"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/pkg/util"
)

// LogEventPool recycles protocol.LogEvent objects across the Create* helpers
// below to reduce allocation pressure on hot logging paths. Callers reusing a
// pooled event must overwrite every field that may carry a stale value.
var LogEventPool = sync.Pool{
	New: func() interface{} {
		return new(protocol.LogEvent)
	},
}

func CreateLogEvent(t time.Time, enableTimestampNano bool, fields map[string]string) (*protocol.LogEvent, error) {
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
logEvent := LogEventPool.Get().(*protocol.LogEvent)
if len(logEvent.Contents) < len(fields) {
slice := make([]*protocol.LogEvent_Content, len(logEvent.Contents), len(fields))
copy(slice, logEvent.Contents)
logEvent.Contents = slice
} else {
logEvent.Contents = logEvent.Contents[:len(fields)]
}
i := 0
for key, val := range fields {
if i >= len(logEvent.Contents) {
logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{})
}
logEvent.Contents[i].Key = key
logEvent.Contents[i].Value = util.String2Bytes(val)
i++
}
logEvent.Time = uint32(t.Unix())
if enableTimestampNano {
logEvent.TimeNs = uint32(t.Nanosecond())
}
return logEvent, nil
}

// CreateLogEventByArray builds a protocol.LogEvent from parallel column/value
// slices. Returns an error when the two slices differ in length.
func CreateLogEventByArray(t time.Time, enableTimestampNano bool, columns []string, values []string) (*protocol.LogEvent, error) {
	// Validate before taking an event from the pool and allocating Contents,
	// so nothing is wasted on the error path.
	if len(columns) != len(values) {
		return nil, fmt.Errorf("columns and values not equal")
	}
	logEvent := LogEventPool.Get().(*protocol.LogEvent)
	logEvent.Contents = make([]*protocol.LogEvent_Content, 0, len(columns))
	for index := range columns {
		logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{
			Key:   columns[index],
			Value: util.String2Bytes(values[index]),
		})
	}

	logEvent.Time = uint32(t.Unix())
	if enableTimestampNano {
		logEvent.TimeNs = uint32(t.Nanosecond())
	} else {
		// Clear stale nanoseconds that a pooled event may still carry.
		logEvent.TimeNs = 0
	}
	return logEvent, nil
}

// CreateLogEventByRawLogV1 converts a v1 protocol.Log into a protocol.LogEvent,
// copying all contents and the (optional) nanosecond timestamp.
func CreateLogEventByRawLogV1(log *protocol.Log) (*protocol.LogEvent, error) {
	logEvent := LogEventPool.Get().(*protocol.LogEvent)
	logEvent.Contents = make([]*protocol.LogEvent_Content, 0, len(log.Contents))
	for _, logC := range log.Contents {
		logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{
			Key:   logC.Key,
			Value: util.String2Bytes(logC.Value),
		})
	}
	logEvent.Time = log.Time
	// log.TimeNs is an optional pointer: dereferencing it unconditionally
	// (as the original code did) panics for logs without a nanosecond part.
	if log.TimeNs != nil {
		logEvent.TimeNs = *log.TimeNs
	} else {
		logEvent.TimeNs = 0
	}
	return logEvent, nil
}

// CreateLogEventByRawLogV2 converts a v2 models.Log into a protocol.LogEvent.
// Only string-typed content values are copied; v2 logs carry no separate
// nanosecond timestamp here.
func CreateLogEventByRawLogV2(log *models.Log) (*protocol.LogEvent, error) {
	logEvent := LogEventPool.Get().(*protocol.LogEvent)
	logEvent.Contents = make([]*protocol.LogEvent_Content, 0, log.Contents.Len())
	for k, v := range log.Contents.Iterator() {
		// Guard the type assertion: a non-string value would previously
		// panic the pipeline; skip such entries instead.
		s, ok := v.(string)
		if !ok {
			continue
		}
		logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{
			Key:   k,
			Value: util.String2Bytes(s),
		})
	}
	logEvent.Time = uint32(log.GetTimestamp())
	// Clear stale nanoseconds that a pooled event may still carry.
	logEvent.TimeNs = 0
	return logEvent, nil
}

func CreateMetricEventByRawMetricV2(metric *models.Metric) (*protocol.MetricEvent, error) {
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
var metricEvent protocol.MetricEvent
metricEvent.Name = metric.Name
if metric.GetValue().IsSingleValue() {
metricEvent.Type = protocol.MetricEvent_SINGLE
metricEvent.SingleValue = metric.GetValue().GetSingleValue()
} else {
metricEvent.Type = protocol.MetricEvent_MULTI
metricEvent.MultiValue = make(map[string]float64, metric.GetValue().GetMultiValues().Len())
for k, v := range metric.GetValue().GetMultiValues().Iterator() {
metricEvent.MultiValue[k] = v
}
}
metricEvent.Tags = make(map[string]string, metric.GetTags().Len())
for k, v := range metric.GetTags().Iterator() {
metricEvent.Tags[k] = v
}
metricEvent.Time = uint32(metric.GetTimestamp())
return &metricEvent, nil
}

// CreatePipelineEventGroupV1 wraps already-converted log events together with
// config-level and log-level tags into a LOG-typed PipelineEventGroup.
// When ctx contains a string "source" entry it is stored as group metadata.
func CreatePipelineEventGroupV1(logEvents []*protocol.LogEvent, configTag map[string]string, logTags map[string]string, ctx map[string]interface{}) (*protocol.PipelineEventGroup, error) {
	group := &protocol.PipelineEventGroup{
		Type: protocol.PipelineEventGroup_LOG,
		Logs: logEvents,
		Tags: make(map[string][]byte, len(configTag)+len(logTags)),
	}
	// Log-level tags are applied second, so they win on key collisions.
	for _, tagSet := range []map[string]string{configTag, logTags} {
		for k, v := range tagSet {
			group.Tags[k] = util.String2Bytes(v)
		}
	}
	if ctx != nil {
		if source, ok := ctx["source"].(string); ok {
			group.Metadata = map[string][]byte{"source": util.String2Bytes(source)}
		}
	}
	return group, nil
}

// CreatePipelineEventGroupV2 converts a homogeneous batch of v2 pipeline
// events plus its group info into a protocol.PipelineEventGroup.
// It errors when the batch is empty or mixes event types. Span groups are
// typed but their events are not converted yet (spans are dropped).
func CreatePipelineEventGroupV2(groupInfo *models.GroupInfo, events []models.PipelineEvent) (*protocol.PipelineEventGroup, error) {
	if len(events) == 0 {
		return nil, fmt.Errorf("events is empty")
	}
	group := &protocol.PipelineEventGroup{}

	// The first event determines the group type; preallocate accordingly.
	firstType := events[0].GetType()
	switch firstType {
	case models.EventTypeLogging:
		group.Type = protocol.PipelineEventGroup_LOG
		group.Logs = make([]*protocol.LogEvent, 0, len(events))
	case models.EventTypeMetric:
		group.Type = protocol.PipelineEventGroup_METRIC
		group.Metrics = make([]*protocol.MetricEvent, 0, len(events))
	case models.EventTypeSpan:
		group.Type = protocol.PipelineEventGroup_SPAN
		group.Spans = make([]*protocol.SpanEvent, 0, len(events))
	}

	for _, event := range events {
		if event.GetType() != firstType {
			return nil, fmt.Errorf("event type is not same")
		}
		switch firstType {
		case models.EventTypeLogging:
			if src, ok := event.(*models.Log); ok {
				converted, _ := CreateLogEventByRawLogV2(src)
				group.Logs = append(group.Logs, converted)
			}
		case models.EventTypeMetric:
			if src, ok := event.(*models.Metric); ok {
				converted, _ := CreateMetricEventByRawMetricV2(src)
				group.Metrics = append(group.Metrics, converted)
			}
		case models.EventTypeSpan:
			// Span conversion is not implemented; events are intentionally
			// skipped here.
		}
	}

	group.Tags = make(map[string][]byte, groupInfo.Tags.Len())
	for k, v := range groupInfo.Tags.Iterator() {
		group.Tags[k] = util.String2Bytes(v)
	}
	group.Metadata = make(map[string][]byte, groupInfo.Metadata.Len())
	for k, v := range groupInfo.Metadata.Iterator() {
		group.Metadata[k] = util.String2Bytes(v)
	}
	return group, nil
}
Loading
Loading