Skip to content

Commit

Permalink
Merge branch 'main' into upgrade-go
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ authored Jul 24, 2023
2 parents 6a3a410 + 2429af1 commit e27c147
Show file tree
Hide file tree
Showing 23 changed files with 438 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add benchmarking to HTTPJSON input testing. {pull}35138[35138]
- Allow non-AWS endpoints for testing Filebeat awss3 input. {issue}35496[35496] {pull}35520[35520]
- Add AUTH (username) and SSL/TLS support for Redis module {pull}35240[35240]
- Temporarily pin PyYAML version to 5.3.1 to avoid CI errors {pull}36091[36091]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix recovering from invalid output configuration when running under Elastic-Agent {pull}36016[36016]
- Improve StreamBuf append to improve performance when reading long lines from files. {pull}35928[35928]
- Eliminate cloning of event in deepUpdate {pull}35945[35945]
- Fix ndjson parser to store JSON fields correctly under `target` {issue}29395[29395]
- Support build of projects outside of beats directory {pull}36126[36126]

*Auditbeat*

Expand Down Expand Up @@ -149,6 +151,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improve error reporting and fix IPv6 handling of TCP and UDP metric collection. {pull}35996[35996]
- Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
- Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
- Fix handling of region name configuration in awss3 input {pull}36034[36034]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]

*Heartbeat*

Expand Down Expand Up @@ -347,9 +351,11 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add device support for Azure AD entity analytics. {pull}35807[35807]
- Improve CEL input performance. {pull}35915[35915]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Added support for min/max template functions in httpjson input. {issue}36094[36094] {pull}36036[36036]
- Add `clean_session` configuration setting for MQTT input. {pull}35806[35806]
- Add fingerprint mode for the filestream scanner and new file identity based on it {issue}34419[34419] {pull}35734[35734]
- Add file system metadata to events ingested via filestream {issue}35801[35801] {pull}36065[36065]
- Allow parsing bytes in and bytes out as long integer in CEF processor. {issue}36100[36100] {pull}36108[36108]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
Expand Down
12 changes: 12 additions & 0 deletions dev-tools/mage/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ func GolangCrossBuild(params BuildArgs) error {
return err
}

// Support projects outside of the beats directory.
repoInfo, err := GetProjectRepoInfo()
if err != nil {
return err
}

// TODO: Support custom build dir/subdir
projectMountPoint := filepath.ToSlash(filepath.Join("/go", "src", repoInfo.CanonicalRootImportPath))
if err := sh.Run("git", "config", "--global", "--add", "safe.directory", projectMountPoint); err != nil {
return err
}

return Build(params)
}

Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {

// remaining files in newFiles are newly created files
for path, fd := range newFilesByName {
// no need to react on empty new files
if fd.Info.Size() == 0 {
w.log.Warnf("file %q has no content yet, skipping", fd.Filename)
delete(paths, path)
continue
}
select {
case <-ctx.Done():
return
Expand Down
61 changes: 57 additions & 4 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestFileWatcher(t *testing.T) {
Expand Down Expand Up @@ -219,10 +220,10 @@ scanner:
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 100ms
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down Expand Up @@ -252,16 +253,68 @@ scanner:
require.Equal(t, loginp.OpDone, e.Op)
})

t.Run("does not emit events for empty files", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

fw := createWatcherWithConfig(t, paths, cfgStr)
go fw.Run(ctx)

basename := "created.log"
filename := filepath.Join(dir, basename)
err = os.WriteFile(filename, nil, 0777)
require.NoError(t, err)

t.Run("issues a warning in logs", func(t *testing.T) {
var lastWarning string
expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename)
require.Eventually(t, func() bool {
logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
if len(logs) == 0 {
return false
}
lastWarning = logs[len(logs)-1].Message
return strings.Contains(lastWarning, expLogMsg)
}, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning)
})

t.Run("emits a create event once something is written to the empty file", func(t *testing.T) {
err = os.WriteFile(filename, []byte("hello"), 0777)
require.NoError(t, err)

e := fw.Event()
expEvent := loginp.FSEvent{
NewPath: filename,
Op: loginp.OpCreate,
Descriptor: loginp.FileDescriptor{
Filename: filename,
Info: testFileInfo{name: basename, size: 5}, // +5 bytes appended
},
}
requireEqualEvents(t, expEvent, e)
})
})

t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 100ms
check_interval: 10ms
fingerprint.enabled: true
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) {
Tags: []types.TagDescription{},
}, nil
},
processorOverwrite: false,
processorOverwrite: true,
previousEvent: mapstr.M{
"cloud.provider": "aws",
},
Expand Down Expand Up @@ -349,6 +349,7 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) {

config, err := conf.NewConfigFrom(map[string]interface{}{
"overwrite": tc.processorOverwrite,
"providers": []string{"aws"},
})
if err != nil {
t.Fatalf("error creating config from map: %s", err.Error())
Expand Down
48 changes: 48 additions & 0 deletions libbeat/reader/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,54 @@ func TestJSONParsersWithFields(t *testing.T) {
},
},
},
"JSON post processor with dotted target key": {
message: reader.Message{
Content: []byte("{\"key\":\"value\"}"),
Fields: mapstr.M{},
},
config: map[string]interface{}{
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"target": "kubernetes.audit",
},
},
},
},
expectedMessage: reader.Message{
Content: []byte(""),
Fields: mapstr.M{
"kubernetes": mapstr.M{
"audit": mapstr.M{
"key": "value",
},
},
},
},
},
"JSON post processor with non-dotted target key": {
message: reader.Message{
Content: []byte("{\"key\":\"value\"}"),
Fields: mapstr.M{},
},
config: map[string]interface{}{
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"target": "kubernetes",
},
},
},
},
expectedMessage: reader.Message{
Content: []byte(""),
Fields: mapstr.M{
"kubernetes": mapstr.M{
"key": "value",
},
},
},
},
"JSON post processor with document ID": {
message: reader.Message{
Content: []byte("{\"key\":\"value\", \"my-id-field\":\"my-id\"}"),
Expand Down
4 changes: 3 additions & 1 deletion libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ func (p *JSONParser) Next() (reader.Message, error) {
message.Fields = event.Fields
message.Meta = event.Meta
} else {
message.AddFields(mapstr.M{p.target: jsonFields})
fields := mapstr.M{}
fields.Put(p.target, jsonFields)
message.AddFields(fields)
}

return message, err
Expand Down
2 changes: 1 addition & 1 deletion libbeat/tests/system/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pyrsistent==0.16.0
pytest==7.3.2
pytest-rerunfailures==9.1.1
pytest-timeout==1.4.2
PyYAML==5.4.1
PyYAML==5.3.1
redis==4.4.4
requests==2.31.0
semver==2.8.1
Expand Down
2 changes: 1 addition & 1 deletion libbeat/tests/system/requirements_aix.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pyrsistent==0.16.0
pytest==7.3.2
pytest-rerunfailures==9.1.1
pytest-timeout==1.4.2
PyYAML==5.4.1
PyYAML==5.3.1
redis==4.4.4
requests==2.31.0
semver==2.8.1
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN pip3 install --upgrade pip==20.1.1
RUN pip3 install --upgrade docker-compose==1.23.2
RUN pip3 install --upgrade setuptools==47.3.2
RUN pip3 install --upgrade PyYAML==6.0.0
RUN pip3 install --upgrade PyYAML==5.3.1

# Oracle instant client
RUN cd /usr/lib \
Expand Down
8 changes: 6 additions & 2 deletions metricbeat/docs/running-on-docker.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ docker run \
--mount type=bind,source=/proc,target=/hostfs/proc,readonly \ <1>
--mount type=bind,source=/sys/fs/cgroup,target=/hostfs/sys/fs/cgroup,readonly \ <2>
--mount type=bind,source=/,target=/hostfs,readonly \ <3>
--net=host \ <4>
--mount type=bind,source=/var/run/dbus/system_bus_socket,target=/hostfs/var/run/dbus/system_bus_socket,readonly \ <4>
--env DBUS_SYSTEM_BUS_ADDRESS='unix:path=/hostfs/var/run/dbus/system_bus_socket' \ <4>
--net=host \ <5>
{dockerimage} -e -system.hostfs=/hostfs
----

Expand All @@ -36,7 +38,9 @@ mounted inside the directory specified by the `hostfs` config value.
<3> If you want to be able to monitor filesystems from the host by using the
<<metricbeat-metricset-system-filesystem,system filesystem metricset>>, then those filesystems need to be mounted inside
of the container. They can be mounted at any location.
<4> The <<metricbeat-metricset-system-network,system network metricset>> uses data from `/proc/net/dev`, or
<4> The <<metricbeat-metricset-system-users,system users metricset>> and <<metricbeat-metricset-system-service,system service metricset>>
both require access to dbus. Mount the dbus socket and set the `DBUS_SYSTEM_BUS_ADDRESS` environment variable to the mounted system socket path.
<5> The <<metricbeat-metricset-system-network,system network metricset>> uses data from `/proc/net/dev`, or
`/hostfs/proc/net/dev` when using `hostfs=/hostfs`. The only way
to make this file contain the host's network devices is to use the `--net=host`
flag. This is due to Linux namespacing; simply bind mounting the host's `/proc`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ docutils==0.15.2
jmespath==0.9.5
pyasn1==0.4.8
python-dateutil==2.8.1
PyYAML==5.4.1
PyYAML==5.3.1
rsa==4.7.2
s3transfer==0.3.3
six==1.14.0
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ Some built-in helper functions are provided to work with the input state inside
- `hmacBase64`: calculates the hmac signature of a list of strings concatenated together. Returns a base64 encoded signature. Supports sha1 or sha256. Example `[[hmac "sha256" "secret" "string1" "string2" (formatDate (now) "RFC1123")]]`
- `hmac`: calculates the hmac signature of a list of strings concatenated together. Returns a hex encoded signature. Supports sha1 or sha256. Example `[[hmac "sha256" "secret" "string1" "string2" (formatDate (now) "RFC1123")]]`
- `join`: joins a list using the specified separator. Example: `[[join .body.arr ","]]`
- `max`: returns the maximum of two values.
- `min`: returns the minimum of two values.
- `mul`: multiplies two integers.
- `now`: returns the current `time.Time` object in UTC. Optionally, it can receive a `time.Duration` as a parameter. Example: `[[now (parseDuration "-1h")]]` returns the time at 1 hour before now.
- `parseDate`: parses a date string and returns a `time.Time` in UTC. By default the expected layout is `RFC3339` but optionally can accept any of the Golang predefined layouts or a custom one. Example: `[[ parseDate "2020-11-05T12:25:32Z" ]]`, `[[ parseDate "2020-11-05T12:25:32.1234567Z" "RFC3339Nano" ]]`, `[[ (parseDate "Thu Nov 5 12:25:32 +0000 2020" "Mon Jan _2 15:04:05 -0700 2006").UTC ]]`.
Expand Down
25 changes: 19 additions & 6 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
}
if regionName != in.config.RegionName {
inputContext.Logger.Warnf("configured region disagrees with queue_url region: %q != %q: using %[1]q",
in.config.RegionName, regionName)
regionName = in.config.RegionName
var warn regionMismatchError
if errors.As(err, &warn) {
// Warn of mismatch, but go ahead with configured region name.
inputContext.Logger.Warnf("%v: using %q", err, regionName)
}
in.awsConfig.Region = regionName

Expand Down Expand Up @@ -306,7 +306,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (string, error) {
func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
// get region from queueURL
// Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs
u, err := url.Parse(queueURL)
Expand All @@ -317,7 +317,11 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (str
queueHostSplit := strings.SplitN(u.Host, ".", 3)
if len(queueHostSplit) == 3 {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
return queueHostSplit[1], nil
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}
} else if defaultRegion != "" {
return defaultRegion, nil
Expand All @@ -326,6 +330,15 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (str
return "", errBadQueueURL
}

// regionMismatchError reports that the region parsed from the queue URL host
// differs from the non-empty default region passed to getRegionFromQueueURL.
type regionMismatchError struct {
	queueURLRegion string // region taken from the queue URL host
	defaultRegion  string // region supplied by the caller as the default
}

// Error implements the error interface.
//
// The two regions are printed in the order the message names them: the
// configured (default) region first, then the queue_url region.
func (e regionMismatchError) Error() string {
	return fmt.Sprintf("configured region disagrees with queue_url region: %q != %q", e.defaultRegion, e.queueURLRegion)
}

func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
getBucketLocationOutput, err := s3Client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{
Bucket: awssdk.String(bucketName),
Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ func TestGetRegionFromQueueURL(t *testing.T) {
endpoint: "googlecloud.com",
wantErr: errBadQueueURL,
},
{
name: "mismatch_regions_no_default",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
deflt: "",
want: "us-east-1",
},
{
name: "mismatch_regions",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
deflt: "ap-west-1",
want: "ap-west-1",
wantErr: regionMismatchError{queueURLRegion: "us-east-1", defaultRegion: "ap-west-1"},
},
{
name: "localstack",
queueURL: "http://localhost:4566/000000000000/filebeat-s3-integtest-d9clk9",
Expand Down
Loading

0 comments on commit e27c147

Please sign in to comment.