Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat][reader] - Adding support for parquet reader #35183

Merged
merged 56 commits into from
May 23, 2023

Conversation

ShourieG
Copy link
Contributor

@ShourieG ShourieG commented Apr 24, 2023

Type of change

  • Enhancement

What does this PR do?

This PR adds support for reading and parsing apache parquet files.

Why is it important?

This change enables us to support future amazon security lake integrations/solutions.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
    - [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc .

Author's Checklist

  • Code style
  • Code syntax

Related issues

Benchmark

command : go test -v -cpu 1,2,4,8,10 -benchmem -run=^$ -bench . - used on personal macbook 10 core cpu
Output :

Benchmark Format : BenchmarkName-(ConcurrencyLevel)    Iterations    TimePerIteration    MemoryAllocatedPerIteration    AllocsPerIteration

Descriptions: 
BenchmarkName: The name of the function being benchmarked.
(ConcurrencyLevel): The number of parallel processes used for the benchmark.
Iterations: The number of times the function was executed during the benchmark.
TimePerIteration: The average execution time for the function, measured in nanoseconds (ns) per operation.
MemoryAllocatedPerIteration: The average amount of memory allocated per operation, measured in bytes (B) per operation.
AllocsPerIteration: The average number of memory allocations per operation.


Some Benchmark Results : 

Serial Benchmarks: 
File: testdata/taxi_2023_1.parquet, records: 1533383 
Memory consumption:  762919806488 Bytes approx (x_x)
File Size : 47.7 MB
BatchSize : 1
Batches: 1533383

BenchmarkReadParquetSerial-10                  1        498926780292 ns/op      762919806488 B/op       3337097252 allocs/op
PASS
ok      github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet       499.246s

Serial Benchmarks: 
File: testdata/taxi_2023_1.parquet, records: 1533383 
Memory consumption:  7.1 GB approx
File Size : 47.7 MB
BatchSize : 10000
Batches: 154 

BenchmarkReadParquetSingleSerialBatch_10000
BenchmarkReadParquetSingleSerialBatch_10000                    1        7136202750 ns/op        7161997624 B/op 40874004 allocs/op
BenchmarkReadParquetSingleSerialBatch_10000-2                  1        7249478666 ns/op        7161029488 B/op 40870153 allocs/op
BenchmarkReadParquetSingleSerialBatch_10000-4                  1        7015922458 ns/op        7161377352 B/op 40871251 allocs/op
BenchmarkReadParquetSingleSerialBatch_10000-8                  1        7101368125 ns/op        7161955840 B/op 40872282 allocs/op
BenchmarkReadParquetSingleSerialBatch_10000-10                 1        7113368875 ns/op        7162300232 B/op 40872797 allocs/op
PASS
ok      github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet       36.133s

Explanation:
For example, the first benchmark result shows that when the function was executed with a concurrency level of 1, 
it took an average of 7.136202750 seconds to complete one iteration of the function (TimePerIteration), allocated 
an average of 7.161997624 GB of memory (MemoryAllocatedPerIteration), and made an average of 40,874,004 memory allocations (AllocsPerIteration). 

Serial Benchmarks: 
File: testdata/vpc_flow.parquet, records: 652
Memory consumption:  15 mb (approx)
File Size : 33 KB
BatchSize : 1000
Batches: 1
BenchmarkReadParquetSingleVPCSerialBatch_1000

BenchmarkReadParquetSingleVPCSerialBatch_1000       	     100	  11252157 ns/op	15247132 B/op	   55042 allocs/op
BenchmarkReadParquetSingleVPCSerialBatch_1000-2     	     148	   8014055 ns/op	15260143 B/op	   55069 allocs/op
BenchmarkReadParquetSingleVPCSerialBatch_1000-4     	     169	   7139663 ns/op	15266732 B/op	   55042 allocs/op
BenchmarkReadParquetSingleVPCSerialBatch_1000-8     	     132	   8927298 ns/op	15280284 B/op	   55071 allocs/op
BenchmarkReadParquetSingleVPCSerialBatch_1000-10    	     124	   9600573 ns/op	15282426 B/op	   55067 allocs/op

More Benchmarks : benchmarkResults.txt

Sample Log

{
    "@timestamp": "2023-04-24T08:04:11.717Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "8.8.0",
        "_id": "2fd1e2fbf5-000000000000"
    },
    "message": "{\"activity_id\":3,\"activity_name\":\"Operational\",\"api\":{\"operation\":\"GetBucketAcl\",\"request\":{\"uid\":\"5CQ7E6RQPH8MX989\"},\"response\":{\"error\":null,\"message\":null},\"service\":{\"name\":\"s3.amazonaws.com\"},\"version\":null},\"category_name\":\"Cloud Activity\",\"category_uid\":5,\"class_name\":\"Cloud API\",\"class_uid\":5001,\"cloud\":{\"provider\":\"AWS\",\"region\":\"us-east-1\"},\"http_request\":{\"user_agent\":\"cloudtrail.amazonaws.com\"},\"identity\":{\"idp\":{\"name\":null},\"invoked_by\":\"cloudtrail.amazonaws.com\",\"session\":{\"created_time\":null,\"issuer\":null,\"mfa\":null},\"user\":{\"account_uid\":null,\"credential_uid\":null,\"name\":null,\"type\":\"AWSService\",\"uid\":null,\"uuid\":null}},\"metadata\":{\"product\":{\"feature\":{\"name\":\"Management, Data, and Insights\"},\"name\":\"CloudTrail\",\"vendor_name\":\"AWS\",\"version\":\"1.08\"},\"profiles\":[\"cloud\"],\"version\":\"0.26.1\"},\"ref_event_uid\":\"f7a441e2-c283-4ce3-88e8-811abe3020a4\",\"resources\":[\"arn:aws:s3:::cloudtrail-awslogs-422354213072-innkkddg-isengard-do-not-delete\"],\"severity\":\"Unknown\",\"severity_id\":0,\"src_endpoint\":{\"domain\":\"cloudtrail.amazonaws.com\",\"ip\":null,\"uid\":null},\"time\":1680138206000,\"type_name\":\"Cloud API: Operational\",\"type_uid\":500103,\"unmapped\":[{\"key\":\"eventCategory\",\"value\":\"Management\"},{\"key\":\"sharedEventID\",\"value\":\"db2c11c9-14ce-4a6c-8c68-61484391855b\"},{\"key\":\"requestParameters\",\"value\":\"{\\\"bucketName\\\":\\\"cloudtrail-awslogs-422354213072-innkkddg-isengard-do-not-delete\\\",\\\"Host\\\":\\\"cloudtrail-awslogs-422354213072-innkkddg-isengard-do-not-delete.s3.us-east-1.amazonaws.com\\\",\\\"acl\\\":\\\"\\\"}\"},{\"key\":\"recipientAccountId\",\"value\":\"422354213072\"},{\"key\":\"readOnly\",\"value\":\"true\"},{\"key\":\"eventType\",\"value\":\"AwsApiCall\"},{\"key\":\"managementEvent\",\"value\":\"true\"},{\"key\":\"additionalEventData\",\"value\":\"{\\\"SignatureVersion\\\":\\\"SigV4\\\",\\\"CipherSuite\\\":\\\"ECDHE-RSA-AES128-GCM-SHA256\\\",\\\"bytesTransferredIn\\\":0,\\\"AuthenticationMethod\\\":\\\"AuthHeader\\\",\\\"x-amz-id-2\\\":\\\"YxIU+BXMAq9DAg4pVtxXCWa+42UKP18/B9zrp+LBdFYzjEGmIel619E44tjCig45WJTQhyT1EIE=\\\",\\\"bytesTransferredOut\\\":546}\"}]}",
    "log": {
        "offset": 0,
        "file": {
            "path": "https://securitylakeaws.s3.us-east-1.amazonaws.com/s3_elastic_security_lake/aws/CLOUD_TRAIL/region%3Dus-east-1/accountId%3D422354213072/eventHour%3D2023033001/5ec39807962c1fd7804b70acb94b4abe.gz.parquet"
        }
    },
    "aws": {
        "s3": {
            "bucket": {
                "name": "securitylakeaws",
                "arn": "arn:aws:s3:::securitylakeaws"
            },
            "object": {
                "key": "s3_elastic_security_lake/aws/CLOUD_TRAIL/region=us-east-1/accountId=422354213072/eventHour=2023033001/5ec39807962c1fd7804b70acb94b4abe.gz.parquet"
            }
        }
    },
    "cloud": {
        "provider": "aws",
        "region": "us-east-1"
    },
    "input": {
        "type": "aws-s3"
    },
    "agent": {
        "type": "filebeat",
        "version": "8.8.0",
        "ephemeral_id": "53f26411-5e07-4de5-963c-c7e7de9c3d27",
        "id": "fc0c94b0-ad0e-486e-9f55-c643cf7ef542",
        "name": "Shouries-MacBook-Pro.local"
    },
    "ecs": {
        "version": "8.0.0"
    }
}

@ShourieG ShourieG requested review from a team as code owners April 24, 2023 08:09
@ShourieG ShourieG requested review from rdner and cmacknz and removed request for a team April 24, 2023 08:09
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Apr 24, 2023
@ShourieG ShourieG requested review from a team and andrewkroh April 24, 2023 08:09
@mergify
Copy link
Contributor

mergify bot commented Apr 24, 2023

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @ShourieG? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Apr 24, 2023
@ShourieG ShourieG added needs_team Indicates that the issue/PR needs a Team:* label 8.9-candidate labels Apr 24, 2023
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Apr 24, 2023
@ShourieG ShourieG requested a review from efd6 April 24, 2023 08:35
@elasticmachine
Copy link
Collaborator

elasticmachine commented Apr 24, 2023

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2023-05-23T06:29:18.693+0000

  • Duration: 98 min 23 sec

Test stats 🧪

Test Results
Failed 0
Passed 26399
Skipped 1975
Total 28374

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

Expand to view the GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

Copy link
Member

@andrewkroh andrewkroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am keen to see some tests and some benchmarks. The areas I'm interested in are how much memory this requires for a given file size and what are the processing costs of converting arrow -> JSON -> beat.Event. We know this reader will be used with VPC flow data from Security Lake, and VPC flows are usually high volume so we want to ensure that this is up to that task (and ideally be faster / more efficient than its text based format equivalent which requires parsing).

I didn't leave any code level comments at this time. I kind of think that this could have its own package somewhere because the aws-s3 input is already complex. Then that package would be used from this input to apply parquet decoding to a stream.


All of the current parsers are stream based. The input can read a chuck of the stream, extract an event, and continue. This has modest memory requirements because it only needs to buffer enough data to uncompress and parse a single chunk.

In contrast, because the Parquet reader needs an io.ReadSeeker this implementation allocates memory (via io.ReadAll) to hold the entire uncompressed object in a buffer. Users are pretty sensitive to high memory usage. Imagine the input downloads a 100 MiB gzip compressed parquet file that has an 80% compression ratio. That file will require at least 800 MiB to be allocated. Now imagine the user has configured max_number_of_messages to 5 so they are trying to process five files in parallel. If all their Parquet files are uniform then they need close 5 GiB of memory just for Filebeat.

If we cannot process the parquet file in chunks directly from S3 then we should consider some means of implementing a reader that has less demands on memory. One method would be to switch over to using disk instead of memory if the content is larger than a few MiB.

dev-tools/notice/rules.json Outdated Show resolved Hide resolved
@andrewkroh
Copy link
Member

Please update the PR title and description to reflect latest the contents.

@ShourieG ShourieG changed the title [filebeat][aws-s3] - Adding support for parquet files [libbeat][reader] - Adding support for parquet reader May 18, 2023
@ShourieG
Copy link
Contributor Author

@andrewkroh, have resolved the suggestions and updated the PR.

@mergify
Copy link
Contributor

mergify bot commented May 19, 2023

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b awss3/parquet upstream/awss3/parquet
git merge upstream/main
git push upstream awss3/parquet

@ShourieG
Copy link
Contributor Author

@efd6, @andrewkroh if all looks good atm could you approve the PR ?

x-pack/libbeat/reader/parquet/parquet.go Outdated Show resolved Hide resolved
@ShourieG ShourieG dismissed shmsr’s stale review May 23, 2023 06:00

already approved

Copy link
Member

@rdner rdner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need do add this to any external documentation or it's just for internal implementation only?

@ShourieG
Copy link
Contributor Author

Do we need do add this to any external documentation or it's just for internal implementation only?

@rdner For now it's an internal implementation. We will add documentation to the inputs which will leverage this reader for reading parquet files.

@ShourieG ShourieG merged commit 90e370b into elastic:main May 23, 2023
@ShourieG ShourieG deleted the awss3/parquet branch May 23, 2023 09:35
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
* initial commit for s3 parquet support

* updated changelog

* added license updates

* updated notice and go mod/sum

* removed libgering panic

* added parquet benchmark tests

* updated osquery package due to update in dependant thrift package

* added parquet reader with benchmark tests and implemented that reader in awss3 package

* addressed linting errors

* refactored parquet reader, added tests and benchmarks and addressed pr comments

* addressed pr comments

* resolved merged conflicts

* updated notice

* added more parquet file tests with json comparisons, addressed pr comments

* removed commented codeS

* removed bad imports & cleaned up tests

* updated notice

* added graceful closures with err checks in test

* added graceful closures with err checks in test

* removed s3 parquet implementation from this PR

* removed s3 parquet implementation from this PR

* Update filebeat.yml

* Update filebeat.yml

* updated notice

* addressed PR suggestions

* addressed PR comments

* updated godoc comment

* addressed PR comments, switched path with filebath

* updated CODEOWNERS and addressed PR comments

* addressed PR comments, added a rand seeding process

* fixed test seed value to 1

* updated comments

* removed defers in loops

* updated notice

* updated godoc comments as suggested

* updated changelog

* Update x-pack/libbeat/reader/parquet/parquet.go

Co-authored-by: subham sarkar <[email protected]>

---------

Co-authored-by: subham sarkar <[email protected]>
cmacknz added a commit that referenced this pull request Nov 8, 2023
Manual backport of this change from
#35183
cmacknz added a commit that referenced this pull request Nov 9, 2023
Manual backport of the same upgrade in
#35183 to ensure we are compatible
with the updated thrift dependency.
cmacknz added a commit that referenced this pull request Nov 9, 2023
* Update go grpc version to 1.58.3 (#36904)

(cherry picked from commit 09823f3)

# Conflicts:
#	NOTICE.txt
#	go.mod
#	go.sum

* Resolve conflicts.

* Add CC0-1.0 License to notice rules.

Manual backport of this change from
#35183

* Update notice.

* Update gRPC to the expected 1.58.3 version

* Upgrade osquery-go to fix broken build.

Manual backport of the same upgrade in
#35183 to ensure we are compatible
with the updated thrift dependency.

---------

Co-authored-by: Michal Pristas <[email protected]>
Co-authored-by: Craig MacKenzie <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants