-
Notifications
You must be signed in to change notification settings - Fork 255
Transformers
Transformer是Parser的一个补充,它针对字段进行数据变换。
在大多数场景下,你可能用Parser就解决了全部问题,但是有些场合,你使用的Parser可能比较简单,如json parser,此时你发现数据里面有一部分字段你希望做一些扩展,比如有个ip字符串,你希望将其扩展为 ip 对应的区域、城市、省份、运营商等信息,此时你就可以配置一个Transformer,对IP字段进行转换。
再比如,当你希望做一些字符串替换的时候,你又不想写一个Parser,只希望对某个字段做一个字符串替换处理,那就可以写配置一个replace transformer。
Transform既可以在Parser前调用,也可以在Parser后调用。同样,也可以连着调用,调用顺序就是你在配置文件中配置的顺序。
一份带有 Transformer的配置如下:
{
"name":"test2.csv",
"reader":{
"log_path":"./tests/logdir",
"mode":"dir"
},
"parser":{
"name":"jsonps",
"type":"json"
},
"transforms":[{
"type":"replace",
"stage":"before_parser",
"old":"\\x",
"new":"\\\\x"
}],
"senders":[{
"name":"file_sender",
"sender_type":"file",
"file_send_path":"./test2/test2_csv_file.txt"
}]
}
可以看到,在reader、parser 和 sender 同级别下面,增加一个 transforms 字段,对应的就是不同transformer对应的列表。
type
字段是固定的,每个Transformer都有,表示Transformer的类型名称,其他字段则根据Transformer表达的转换行为各不相同。可以参见每个Transformer的介绍页。
目前支持的Transformer有:
- replace transformer : 针对字段做字符串替换。
- IP transformer: 针对ip做运营商信息字段扩展。
- date transformer: 将字段解析为时间并做一些转换。
- discard transformer: 将指定字段的数据抛弃,其他保留,与pick Transformer相反。
- pick transformer: 将指定字段的数据保留,其他抛弃,与discard Transformer相反。
- split transformer: 将指定字段的数据切分为字符串数组。
- convert transformer: 按照dsl将数据进行格式转换。
- urlparam transformer: 将指定字段的数据按照url参数的格式转换为键值对。
- arrayexpand transformer: 将指定字段的数组展开并转换为键值对。
- rename transformer: 将字段名称重命名,解决不同下游系统对字段名称中特殊字符不支持的问题。
- label transformer: 添加一个带有固定值的字段到数据中,相当于加个标签。
- json transformer: 将一个符合json格式的字符串字段,反序列化为对应结构体类型。
- script transformer: 执行脚本文件并记录脚本执行结果。
- clocktrail transformer: 针对 AWS ClockTrail的数据做格式转换的Transformer,可以将ClockTrail的json格式中的 Records逐条变为数据。
- pandora_key_convert transformer: 将不符合pandora 字段类型的key字符转化为下划线。
- UserAgent transformer: 浏览器中的user agent信息解析,可以解析出包括设备、操作系统、版本号在内的多种信息。
-
xml transformer: 将一个符合xml格式的字符串字段,反序列化为对应
map[string]interface{}
结构体类型。 - mapreplace transformer: 将字符串的值替换到对应映射的内容。
- k8stag transformer: 对于运行在Kubernetes上的logkit,可以通过k8stag Transformer获得容器的具体信息,如podname, container id 等。
目前logkit的Transformer接口如下:
// 注意: transform的规则是,出错要把数据原样返回
type Transformer interface {
Description() string
SampleConfig() string
ConfigOptions() []utils.Option
Type() string
Transform([]sender.Data) ([]sender.Data, error)
RawTransform([]string) ([]string, error)
Stage() string
Stats() utils.StatsInfo
}
-
Description() string
: 描述并返回Transformer作用的字符串 -
SampleConfig() string
: 描述并返回Transformer示例配置的字符串 -
ConfigOptions() []utils.Option
: 描述并返回Transform配置选项的方法。 -
Type() string
: 描述Transform 类型的方法 -
Transform([]sender.Data) ([]sender.Data, error)
: 转换Transform数据,该方法转换的是Parser后的数据,接受的是Data数组。 -
RawTransform([]string) ([]string, error)
: 转换Transform数据,该方法转换的是Parser前的数据,接受的是string数组 -
Stage() string
: 表示Transform是用于Parser前还是Parser后 -
Stats() utils.StatsInfo
: 统计Transform成功失败的数据
如下所示,每个Transformer需要在init方法中注册自己,这样只需import对应的包就能注册,无需额外的代码侵入。
func init() {
transforms.Add("discard", func() transforms.Transformer {
return &Discarder{}
})
}
注册时返回的实际上是一个create方法,create的就是对应Transformer的结构体。
Transformer中最有趣的是如何将各种不同的配置统一的定义到Transformer中呢? 答案就是利用json协议,将配置的json字符串反序列化到生成Transformer结构体中,即实现了每个具体Transformer的赋值。
creater, _ := transforms.Transformers[TransformType] //从注册中获得对应类型的Transformer creator
trans := creater() //调用Transformer creator创建Transformer对象
err = json.Unmarshal(bts, trans) // 将配置的json字符串反序列化到对象的结构体中
通过上述过程,用户的不同配置就能让不同Transformer感知了。
所以在定义Transformer时需要注意描述好对应的json格式key名称,并且保证变量名是大写的,如下所示为一个表示字符串替换的Transformer定义:
type Replacer struct {
StageTime string `json:"stage"`
Key string `json:"key"`
Old string `json:"old"`
New string `json:"new"`
}
Transformer可以在Parser前使用也可以在Parser后使用,其中 Stage()
方法表示的就是这个位置。
Parser前使用即Reader刚刚获取的数据就做一次Transform转换,针对的就是reader中读取的字符串,这里你可以做一些诸如字符串替换、字符串切割的Transformer,以便直接使用诸如json parser
这样通用的Parser,省的再去编写复杂的解析过程。
Parser后使用的Transform就多了,因为经过了Parser,带有了schema信息,可以知道数据的字段名称和类型,所以可以针对某些字段做转换,比如字符串操作、IP扩展等等。
多个Transformer可以按顺序串联起来依次执行,写配置文件的时候按顺序填写在json数组中即可。
为了使logkit可以在前端更好的展示,Transformer中还需要写ConfigOptions()
接口,该接口返回的就是每个Transformer的配置对应的字段名称、数据类型以及作用说明。
package mutate
import (
"errors"
"github.com/qiniu/logkit/sender"
"github.com/qiniu/logkit/transforms"
"github.com/qiniu/logkit/utils"
)
type Discarder struct {
Key string `json:"key"`
stats utils.StatsInfo
}
func (g *Discarder) RawTransform(datas []string) ([]string, error) {
return datas, errors.New("discard transformer not support rawTransform")
}
func (g *Discarder) Transform(datas []sender.Data) ([]sender.Data, error) {
var ferr error
errnums := 0
for i := range datas {
delete(datas[i], g.Key)
}
g.stats.Errors += int64(errnums)
g.stats.Success += int64(len(datas) - errnums)
return datas, ferr
}
func (g *Discarder) Description() string {
return "discard onefield from data"
}
func (g *Discarder) Type() string {
return "discard"
}
func (g *Discarder) SampleConfig() string {
return `{
"type":"discard",
"key":"DiscardFieldKey"
}`
}
func (g *Discarder) ConfigOptions() []utils.Option {
return []utils.Option{
transforms.KeyStageAfterOnly,
transforms.KeyFieldName,
}
}
func (g *Discarder) Stage() string {
return transforms.StageAfterParser
}
func (g *Discarder) Stats() utils.StatsInfo {
return g.stats
}
func init() {
transforms.Add("discard", func() transforms.Transformer {
return &Discarder{}
})
}
快速开始 | Pandora | Readers | Parsers | Senders | Download | 七牛智能日志管理平台 | logkit-pro专业版