Skip to content

Commit

Permalink
Update Redis protocol to use ECS fields (#10126)
Browse files Browse the repository at this point in the history
Here's a summary of what fields changed.

Part of #7968

Changed

- bytes_in -> source.bytes
- bytes_out -> destination.bytes
- responsetime -> event.duration (unit are now nanoseconds)

Added

- source
- destination
- event.dataset = redis
- event.end
- event.start
- network.community_id
- network.transport = tcp
- network.protocol = redis
- network.bytes
- network.type

Unchanged Packetbeat Fields

- method
- resource
- path
- query
- status
- type = redis (we might remove this since we have event.dataset)
  • Loading branch information
andrewkroh authored Jan 18, 2019
1 parent 2697d46 commit 4cb81a5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Changed ICMP protocol fields to align with ECS. {pull}10062[10062]
- Changed DHCPv4 protocol fields to align with ECS. {pull}10089[10089]
- Changed AMQP protocol fields to align with ECS. {pull}10090[10090]
- Changed Redis protocol fields to align with ECS. {pull}10126[10126]
- Changed HTTP protocol fields to align with ECS. {pull}9976[9976]
- Changed Thrift protocol fields to align with ECS. {pull}10125[10125]
- Changed Cassandra protocol fields to align with ECS. {pull}10093[10093]
Expand Down
61 changes: 26 additions & 35 deletions packetbeat/protos/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/beats/packetbeat/pb"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/applayer"
Expand Down Expand Up @@ -275,55 +276,45 @@ func (redis *redisPlugin) correlate(conn *redisConnectionData) {
}

func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) beat.Event {
error := common.OK_STATUS
if resp.isError {
error = common.ERROR_STATUS
}

var returnValue map[string]common.NetString
if resp.isError {
returnValue = map[string]common.NetString{
"error": resp.message,
}
} else {
returnValue = map[string]common.NetString{
"return_value": resp.message,
}
}

source, destination := common.MakeEndpointPair(requ.tcpTuple.BaseTuple, requ.cmdlineTuple)
src, dst := &source, &destination
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
}

// resp_time in milliseconds
responseTime := int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)

fields := common.MapStr{
"type": "redis",
"status": error,
"responsetime": responseTime,
"redis": returnValue,
"method": common.NetString(bytes.ToUpper(requ.method)),
"resource": requ.path,
"query": requ.message,
"bytes_in": uint64(requ.size),
"bytes_out": uint64(resp.size),
"src": src,
"dst": dst,
evt, pbf := pb.NewBeatEvent(requ.ts)
pbf.SetSource(src)
pbf.SetDestination(dst)
pbf.Source.Bytes = int64(requ.size)
pbf.Destination.Bytes = int64(resp.size)
pbf.Event.Dataset = "redis"
pbf.Event.Start = requ.ts
pbf.Event.End = resp.ts
pbf.Network.Transport = "tcp"
pbf.Network.Protocol = pbf.Event.Dataset

fields := evt.Fields
fields["type"] = pbf.Event.Dataset
fields["method"] = common.NetString(bytes.ToUpper(requ.method))
fields["resource"] = requ.path
fields["query"] = requ.message

if resp.isError {
evt.PutValue("status", common.ERROR_STATUS)
evt.PutValue("redis.error", resp.message)
} else {
evt.PutValue("status", common.OK_STATUS)
evt.PutValue("redis.return_value", resp.message)
}

if redis.sendRequest {
fields["request"] = requ.message
}
if redis.sendResponse {
fields["response"] = resp.message
}

return beat.Event{
Timestamp: requ.ts,
Fields: fields,
}
return evt
}

func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
Expand Down
13 changes: 7 additions & 6 deletions packetbeat/tests/system/test_0013_redis_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_redis_session(self):

objs = self.read_output()
assert all([o["type"] == "redis" for o in objs])
assert all([o["event.dataset"] == "redis" for o in objs])

assert objs[0]["method"] == "SET"
assert objs[0]["resource"] == "key3"
Expand All @@ -45,8 +46,8 @@ def test_redis_session(self):
assert all([isinstance(o["resource"], six.string_types) for o in objs[3:]])
assert all([isinstance(o["query"], six.string_types) for o in objs[3:]])

assert all(["bytes_in" in o for o in objs])
assert all(["bytes_out" in o for o in objs])
assert all(["source.bytes" in o for o in objs])
assert all(["destination.bytes" in o for o in objs])

def test_byteout_bytein(self):
"""
Expand All @@ -60,7 +61,7 @@ def test_byteout_bytein(self):
objs = self.read_output()
assert all([o["type"] == "redis" for o in objs])

assert all([isinstance(o["bytes_out"], int) for o in objs])
assert all([isinstance(o["bytes_in"], int) for o in objs])
assert all([o["bytes_out"] > 0 for o in objs])
assert all([o["bytes_in"] > 0 for o in objs])
assert all([isinstance(o["source.bytes"], int) for o in objs])
assert all([isinstance(o["destination.bytes"], int) for o in objs])
assert all([o["source.bytes"] > 0 for o in objs])
assert all([o["destination.bytes"] > 0 for o in objs])

0 comments on commit 4cb81a5

Please sign in to comment.