Skip to content

Commit

Permalink
refa error & proto file load
Browse files Browse the repository at this point in the history
  • Loading branch information
ychensha committed Aug 9, 2022
1 parent b9ee01c commit 7b9e2d7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 84 deletions.
9 changes: 7 additions & 2 deletions apisix/plugins/tencent-cloud-cls.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local cls_sdk = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
local plugin = require("apisix.plugin")
local math = math
local ngx = ngx
local pairs = pairs
Expand Down Expand Up @@ -127,8 +128,12 @@ function _M.log(conf, ctx)
end

local process = function(entries)
return cls_sdk.send_to_cls(conf.secret_id, conf.secret_key,
conf.cls_host, conf.cls_topic, entries)
local sdk, err = cls_sdk.new(conf.cls_host, conf.cls_topic, conf.secret_id, conf.secret_key)
if err then
core.log.error("init sdk failed err:", err)
return false, err
end
return sdk:send_to_cls(entries)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
Expand Down
109 changes: 97 additions & 12 deletions apisix/plugins/tencent-cloud-cls/cls-sdk.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
--

local pb = require "pb"
local assert = assert
assert(pb.loadfile("apisix/plugins/tencent-cloud-cls/cls.pb"))
local protoc = require("protoc").new()
local http = require("resty.http")
local socket = require("socket")
local str_util = require("resty.string")
Expand All @@ -40,6 +39,8 @@ local ipairs = ipairs
local pairs = pairs
local type = type
local tostring = tostring
local setmetatable = setmetatable
local pcall = pcall

local MAX_SINGLE_VALUE_SIZE = 1 * 1024 * 1024
local MAX_LOG_GROUP_VALUE_SIZE = 5 * 1024 * 1024 -- 5MB
Expand Down Expand Up @@ -112,7 +113,13 @@ local function sign(secret_id, secret_key)
end


local function send_cls_request(host, topic, secret_id, secret_key, pb_data)
local function send_cls_request(host, topic, secret_id, secret_key, pb_obj)
local ok, pb_data = pcall(pb.encode, "cls.LogGroupList", pb_obj)
if not ok or not pb_data then
core.log.error("failed to encode LogGroupList, err: ", pb_data)
return false, pb_data
end

local http_new = http:new()
http_new:set_timeouts(cls_conn_timeout, cls_send_timeout, cls_read_timeout)

Expand Down Expand Up @@ -178,13 +185,88 @@ local function normalize_log(log)
end


local function send_to_cls(secret_id, secret_key, host, topic_id, logs)
local _M = { version = 0.1 }
local mt = { __index = _M }

local pb_state
local function init_pb_state()
pb.state(nil)
protoc.reload()
local cls_sdk_protoc = protoc.new()
if not cls_sdk_protoc.loaded["tencent-cloud-cls/cls.proto"] then
-- https://www.tencentcloud.com/document/product/614/42787
local ok, err = pcall(cls_sdk_protoc.load, cls_sdk_protoc, [[
package cls;
message Log
{
message Content
{
required string key = 1; // Key of each field group
required string value = 2; // Value of each field group
}
required int64 time = 1; // Unix timestamp
repeated Content contents = 2; // Multiple key-value pairs in one log
}
message LogTag
{
required string key = 1;
required string value = 2;
}
message LogGroup
{
repeated Log logs = 1; // Log array consisting of multiple logs
optional string contextFlow = 2; // This parameter does not take effect currently
optional string filename = 3; // Log filename
optional string source = 4; // Log source, which is generally the machine IP
repeated LogTag logTags = 5;
}
message LogGroupList
{
repeated LogGroup logGroupList = 1; // Log group list
}
]], "tencent-cloud-cls/cls.proto")
if not ok then
cls_sdk_protoc:reset()
return "failed to load cls.proto: ".. err
end
end
pb_state = pb.state(nil)
end


function _M.new(host, topic, secret_id, secret_key)
if not pb_state then
local err = init_pb_state()
if err then
return nil, err
end
end
local self = {
host = host,
topic = topic,
secret_id = secret_id,
secret_key = secret_key,
}
return setmetatable(self, mt)
end


function _M.send_to_cls(self, logs)
clear_tab(log_group_list)
local now = ngx_now() * 1000

-- recovery of stored pb_store
pb.state(pb_state)

local total_size = 0
local format_logs = new_tab(#logs, 0)
-- sums of all value in a LogGroup should be no more than 5MB
-- sums of all value in all LogGroup should be no more than 5MB
-- so send whenever size exceed max size
local group_list_start = 1
for i = 1, #logs, 1 do
local contents, log_size = normalize_log(logs[i])
if log_size > MAX_LOG_GROUP_VALUE_SIZE then
Expand All @@ -197,10 +279,14 @@ local function send_to_cls(secret_id, secret_key, host, topic_id, logs)
logs = format_logs,
source = host_ip,
})
local ok, err = send_cls_request(self.host, self.topic,
self.secret_id, self.secret_key, log_group_list_pb)
if not ok then
return false, err, group_list_start
end
group_list_start = i
format_logs = new_tab(#logs - i, 0)
total_size = 0
local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb))
send_cls_request(host, topic_id, secret_id, secret_key, data)
clear_tab(log_group_list)
end
insert_tab(format_logs, {
Expand All @@ -214,10 +300,9 @@ local function send_to_cls(secret_id, secret_key, host, topic_id, logs)
logs = format_logs,
source = host_ip,
})
local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb))
return send_cls_request(host, topic_id, secret_id, secret_key, data)
local ok, err = send_cls_request(self.host, self.topic, self.secret_id,
self.secret_key, log_group_list_pb)
return ok, err, group_list_start
end

return {
send_to_cls = send_to_cls
}
return _M
20 changes: 0 additions & 20 deletions apisix/plugins/tencent-cloud-cls/cls.pb

This file was deleted.

50 changes: 0 additions & 50 deletions apisix/plugins/tencent-cloud-cls/cls.proto

This file was deleted.

0 comments on commit 7b9e2d7

Please sign in to comment.