diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b0585fa593a..356a067fb67 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -399,6 +399,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - The s3 input can now automatically detect gzipped objects. {issue}18283[18283] {pull}18764[18764] - Add geoip AS lookup & improve ECS categorization in aws cloudtrail fileset. {issue}18644[18644] {pull}18958[18958] - Improved performance of PANW sample dashboards. {issue}19031[19031] {pull}19032[19032] +- Add support for v1 consumer API in Cloud Foundry input, use it by default. {pull}19125[19125] *Heartbeat* diff --git a/NOTICE.txt b/NOTICE.txt index baeaa60356f..5a1c642335d 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -988,6 +988,31 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------- +Dependency: github.com/cloudfoundry/noaa +Version: v2.1.0 +License type (autodetected): Apache-2.0 +./vendor/github.com/cloudfoundry/noaa/LICENSE: +-------------------------------------------------------------------- +Apache License 2.0 + +-------NOTICE----- +noaa + +Copyright (c) 2015-Present CloudFoundry.org Foundation, Inc. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + -------------------------------------------------------------------- Dependency: github.com/cloudfoundry/sonde-go Revision: b33733203bb4 diff --git a/deploy/cloudfoundry/filebeat/filebeat.yml b/deploy/cloudfoundry/filebeat/filebeat.yml index ad3ca5ff92c..f4c4943d4bb 100644 --- a/deploy/cloudfoundry/filebeat/filebeat.yml +++ b/deploy/cloudfoundry/filebeat/filebeat.yml @@ -12,6 +12,7 @@ filebeat.inputs: #uaa_address: ${UAA_ADDRESS} #rlp_address: ${RLP_ADDRESS} #shard_id: ${SHARD_ID} + #version: v1 #================================ Outputs ===================================== diff --git a/go.mod b/go.mod index ff8759afeea..a476ff09f25 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/akavel/rsrc v0.8.0 // indirect github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 + github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 // indirect github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-lambda-go v1.6.0 github.com/aws/aws-sdk-go-v2 v0.9.0 @@ -36,6 +37,7 @@ require ( github.com/cavaliercoder/go-rpm v0.0.0-20190131055624-7a9c54e3d83e github.com/cespare/xxhash/v2 v2.1.1 github.com/cloudfoundry-community/go-cfclient v0.0.0-20190808214049-35bcce23fc5f + github.com/cloudfoundry/noaa v2.1.0+incompatible github.com/cloudfoundry/sonde-go v0.0.0-20171206171820-b33733203bb4 github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c github.com/coreos/bbolt v1.3.1-coreos.6.0.20180318001526-af9db2027c98 @@ -123,6 +125,7 @@ require ( github.com/pierrre/gotestcover v0.0.0-20160113212533-7b94f124d338 github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 + github.com/poy/eachers v0.0.0-20181020210610-23942921fe77 // indirect github.com/prometheus/client_golang v1.1.1-0.20190913103102-20428fa0bffc // indirect github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.7.0 diff --git a/go.sum b/go.sum index 9f55b113025..f4882202a84 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y= +github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 h1:afT88tB6u9JCKQZVAAaa9ICz/uGn5Uw9ekn6P22mYKM= +github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77/go.mod h1:bXvGk6IkT1Agy7qzJ+DjIw/SJ1AaB3AvAuMDVV+Vkoo= github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= @@ -146,6 +148,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudfoundry-community/go-cfclient v0.0.0-20190808214049-35bcce23fc5f h1:fK3ikA1s77arBhpDwFuyO0hUZ2Aa8O6o2Uzy8Q6iLbs= github.com/cloudfoundry-community/go-cfclient v0.0.0-20190808214049-35bcce23fc5f/go.mod h1:RtIewdO+K/czvxvIFCMbPyx7jdxSLL1RZ+DA/Vk8Lwg= +github.com/cloudfoundry/noaa v2.1.0+incompatible h1:hr6VnM5VlYRN3YD+NmAedQLW8686sUMknOSe0mFS2vo= +github.com/cloudfoundry/noaa v2.1.0+incompatible/go.mod h1:5LmacnptvxzrTvMfL9+EJhgkUfIgcwI61BVSTh47ECo= github.com/cloudfoundry/sonde-go v0.0.0-20171206171820-b33733203bb4 h1:cWfya7mo/zbnwYVio6eWGsFJHqYw4/k/uhwIJ1eqRPI= github.com/cloudfoundry/sonde-go v0.0.0-20171206171820-b33733203bb4/go.mod h1:GS0pCHd7onIsewbw8Ue9qa9pZPv2V88cUZDttK6KzgI= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -566,6 +570,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/poy/eachers v0.0.0-20181020210610-23942921fe77 h1:SNdqPRvRsVmYR0gKqFvrUKhFizPJ6yDiGQ++VAJIoDg= +github.com/poy/eachers v0.0.0-20181020210610-23942921fe77/go.mod h1:x1vqpbcMW9T/KRcQ4b48diSiSVtYgvwQ5xzDByEg4WE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= diff --git a/vendor/github.com/cloudfoundry/noaa/.gitignore b/vendor/github.com/cloudfoundry/noaa/.gitignore new file mode 100644 index 00000000000..8f6229ff4be --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/.gitignore @@ -0,0 +1,9 @@ +*.coverprofile +src/ +pkg/ +.idea +*.iml + +# compiled binaries +bin/ +!bin/test diff --git a/vendor/github.com/cloudfoundry/noaa/.travis.yml b/vendor/github.com/cloudfoundry/noaa/.travis.yml new file mode 100644 index 00000000000..4a0a9fb3481 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/.travis.yml @@ -0,0 +1,28 @@ +language: go +notifications: + email: + - cf-lamb@pivotallabs.com +before_install: +- go get github.com/mattn/goveralls +- go get github.com/onsi/ginkgo/ginkgo +after_success: +- 'echo "mode: set" > all.coverprofile' +- 'find . -name "*.coverprofile" -exec grep -v mode: {} >> all.coverprofile \;' +- PATH=$HOME/gopath/bin:$PATH goveralls -coverprofile=all.coverprofile -repotoken=$COVERALLS_TOKEN +install: +- go get -d -v -t ./... + +script: PATH=$HOME/gopath/bin:$PATH bin/test + +go: +- 1.6 +- 1.7 +- tip + +matrix: + allow_failures: + - go: tip + +env: + global: + secure: wulSvmmwbaIe8APoYwTjN6zLFdIXYrazmBTOuFpyui0BUpxmKdXJ/hEMVVI0p3BvehYkKU+xVrjjBc3/IZgUXFybM9MwYQ+CH4wtsMSp0ndHnzkYGaxut1kUXb+e5edjJ5bOi9Xy9qGxeH9rqpl/F1z4piGnujd2jJjVTlwVXGM= diff --git a/vendor/github.com/cloudfoundry/noaa/LICENSE b/vendor/github.com/cloudfoundry/noaa/LICENSE new file mode 100644 index 00000000000..e06d2081865 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/vendor/github.com/cloudfoundry/noaa/NOTICE b/vendor/github.com/cloudfoundry/noaa/NOTICE new file mode 100644 index 00000000000..53aae89d36c --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/NOTICE @@ -0,0 +1,15 @@ +noaa + +Copyright (c) 2015-Present CloudFoundry.org Foundation, Inc. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/vendor/github.com/cloudfoundry/noaa/README.md b/vendor/github.com/cloudfoundry/noaa/README.md new file mode 100644 index 00000000000..2e9d87934ac --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/README.md @@ -0,0 +1,131 @@ +# NOAA [![slack.cloudfoundry.org][slack-badge]][loggregator-slack] + +[![Concourse Status](https://loggregator.ci.cf-app.com/api/v1/pipelines/submodules/jobs/noaa-unit-tests/badge)](https://loggregator.ci.cf-app.com/teams/main/pipelines/submodules/jobs/noaa-unit-tests) +[![Coverage Status](https://coveralls.io/repos/cloudfoundry/noaa/badge.png)](https://coveralls.io/r/cloudfoundry/noaa) +[![GoDoc](https://godoc.org/github.com/cloudfoundry/noaa?status.png)](https://godoc.org/github.com/cloudfoundry/noaa) + +noaa is a client library to consume metric and log messages from Doppler. + +## Get the Code + +This Go project is designed to be imported into `$GOPATH`, rather than being cloned into any working directory. There are two ways to do this. + +- The easiest way with with `go get`. This will import the project, along with all dependencies, into your `$ GOPATH`. + ``` + $ echo $GOPATH + /Users/myuser/go + + $ go get github.com/cloudfoundry/noaa + + $ ls ~/go/src/github.com/cloudfoundry/ + noaa/ sonde-go/ + ``` + +- You can also manually clone the repo into your `$GOPATH`, but you then have to manually import dependencies. + ``` + $ echo $GOPATH + /Users/myuser/go + + $ cd /Users/myuser/go/src/github.com/cloudfoundry + $ git clone git@github.com:cloudfoundry/noaa.git + $ cd noaa + $ go get ./... + ``` + +## Updates + +### Reconnecting to Traffic Controller + +noaa has recently updated its reconnect strategy from trying to reconnect five +times in quick succession to a back-off strategy. The back-off strategy can be +configured by setting the [SetMinRetryDelay()](https://godoc.org/github.com/cloudfoundry/noaa/consumer#Consumer.SetMinRetryDelay) +and the [SetMaxRetryDelay()](https://godoc.org/github.com/cloudfoundry/noaa/consumer#Consumer.SetMaxRetryDelay). + +During reconnection, noaa will wait initially at the `MinRetryDelay` interval +and double until it reaches `MaxRetryDelay` where it will try reconnecting +indefinitely at the `MaxRetryDelay` interval. + +This behavior will affect functions like `consumer.Firehose()`, `consumer.Stream()` +and `consumer.TailingLogs()`. + +## Sample Applications + +### Prerequisites + +In order to use the sample applications below, you will have to export the +following environment variables: + +* `CF_ACCESS_TOKEN` - You can get this value by executing (`$ cf oauth-token`). + Example: + +```bash +export CF_ACCESS_TOKEN="bearer eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI3YmM2MzllOC0wZGM0LTQ4YzItYTAzYS0xYjkyYzRhMWFlZTIiLCJzdWIiOiI5YTc5MTVkOS04MDc1LTQ3OTUtOTBmOS02MGM0MTU0YTJlMDkiLCJzY29wZSI6WyJzY2ltLnJlYWQiLCJjbG91ZF9jb250cm9sbGVyLmFkbWluIiwicGFzc3dvcmQud3JpdGUiLCJzY2ltLndyaXRlIiwib3BlbmlkIiwiY2xvdWRfY29udHJvbGxlci53cml0ZSIsImNsb3VkX2NvbnRyb2xsZXIucmVhZCJdLCJjbGllbnRfaWQiOiJjZiIsImNpZCI6ImNmIiwiZ3JhbnRfdHlwZSI6InBhc3N3b3JkIiwidXNlcl9pZCI6IjlhNzkxNWQ5LTgwNzUtNDc5NS05MGY5LTYwYzQxNTRhMmUwOSIsInVzZXJfbmFtZSI6ImFkbWluIiwiZW1haWwiOiJhZG1pbiIsImlhdCI6MTQwNDg0NzU3NywiZXhwIjoxNDA0ODQ4MTc3LCJpc3MiOiJodHRwczovL3VhYS4xMC4yNDQuMC4zNC54aXAuaW8vb2F1dGgvdG9rZW4iLCJhdWQiOlsic2NpbSIsIm9wZW5pZCIsImNsb3VkX2NvbnRyb2xsZXIiLCJwYXNzd29yZCJdfQ.mAaOJthCotW763lf9fysygqdES_Mz1KFQ3HneKbwY4VJx-ARuxxiLh8l_8Srx7NJBwGlyEtfYOCBcIdvyeDCiQ0wT78Zw7ZJYFjnJ5-ZkDy5NbMqHbImDFkHRnPzKFjJHip39jyjAZpkFcrZ8_pUD8XxZraqJ4zEf6LFdAHKFBM" +``` + +* `DOPPLER_ADDR` - It is based on your environment. Example: + +```bash +export DOPPLER_ADDR="wss://doppler.10.244.0.34.xip.io:4443" +``` + + +### Application logs + +The `samples/app_logs/main.go` application streams logs for a particular app. +The following environment variable needs to be set: + +* `APP_GUID` - You can get this value from running `$ cf app APP --guid`. + Example: + +``` +export APP_GUID=55fdb274-d6c9-4b8c-9b1f-9b7e7f3a346c +``` + +Then you can run the sample app like this: + +``` +go build -o bin/app_logs samples/app_logs/main.go +bin/app_logs +``` + +### Logs and metrics firehose + +The `samples/firehose/main.go` application streams metrics data and logs for +all apps. + +You can run the firehose sample app like this: + +``` +go build -o bin/firehose samples/firehose/main.go +bin/firehose +``` + +Multiple subscribers may connect to the firehose endpoint, each with a unique +subscription_id (configurable in `main.go`). Each subscriber (in practice, a +pool of clients with a common subscription_id) receives the entire stream. For +each subscription_id, all data will be distributed evenly among that +subscriber's client pool. + +### Container metrics + +The `samples/container_metrics/consumer/main.go` application streams container +metrics for the specified appId. + +You can run the container metrics sample app like this: + +``` +go build -o bin/container_metrics samples/container_metrics/consumer/main.go +bin/container_metrics +``` + +For more information to setup a test environment in order to pull container +metrics look at the README.md in the container_metrics sample. + +## Development + +Use `go get -d -v -t ./... && ginkgo --race --randomizeAllSpecs --failOnPending --skipMeasurements --cover` to +run the tests. + + +[slack-badge]: https://slack.cloudfoundry.org/badge.svg +[loggregator-slack]: https://cloudfoundry.slack.com/archives/loggregator diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/async.go b/vendor/github.com/cloudfoundry/noaa/consumer/async.go new file mode 100644 index 00000000000..698f1209bc5 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/async.go @@ -0,0 +1,512 @@ +package consumer + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + + noaa_errors "github.com/cloudfoundry/noaa/errors" + "github.com/cloudfoundry/sonde-go/events" + "github.com/gogo/protobuf/proto" + "github.com/gorilla/websocket" +) + +const ( + DefaultMinRetryDelay = 500 * time.Millisecond + DefaultMaxRetryDelay = time.Minute + DefaultMaxRetryCount = 1000 +) + +// SetMinRetryDelay sets the duration that automatically reconnecting methods +// on c (e.g. Firehose, Stream, TailingLogs) will sleep for after receiving +// an error from the traffic controller. +// +// Successive errors will double the sleep time, up to c's max retry delay, +// set by c.SetMaxRetryDelay. +// +// Defaults to DefaultMinRetryDelay. +func (c *Consumer) SetMinRetryDelay(d time.Duration) { + atomic.StoreInt64(&c.minRetryDelay, int64(d)) +} + +// SetMaxRetryDelay sets the maximum duration that automatically reconnecting +// methods on c (e.g. Firehose, Stream, TailingLogs) will sleep for after +// receiving many successive errors from the traffic controller. +// +// Defaults to DefaultMaxRetryDelay. +func (c *Consumer) SetMaxRetryDelay(d time.Duration) { + atomic.StoreInt64(&c.maxRetryDelay, int64(d)) +} + +// SetMaxRetryCount sets the maximum number of reconnnection attemps that +// methods on c (e.g. Firehose, Stream, TailingLogs) will make before failing. +// +// Defaults to DefaultMaxRetryCount. +func (c *Consumer) SetMaxRetryCount(count int) { + atomic.StoreInt64(&c.maxRetryCount, int64(count)) +} + +// TailingLogs listens indefinitely for log messages only; other event types +// are dropped. +// Whenever an error is encountered, the error will be sent down the error +// channel and TailingLogs will attempt to reconnect up to 5 times. After +// five failed reconnection attempts, TailingLogs will give up and close the +// error and LogMessage channels. +// +// If c is closed, the returned channels will both be closed. +// +// Errors must be drained from the returned error channel for it to continue +// retrying; if they are not drained, the connection attempts will hang. +func (c *Consumer) TailingLogs(appGuid, authToken string) (<-chan *events.LogMessage, <-chan error) { + return c.tailingLogs(appGuid, authToken, true) +} + +// TailingLogsWithoutReconnect functions identically to TailingLogs but without +// any reconnect attempts when errors occur. +func (c *Consumer) TailingLogsWithoutReconnect(appGuid string, authToken string) (<-chan *events.LogMessage, <-chan error) { + return c.tailingLogs(appGuid, authToken, false) +} + +// Stream listens indefinitely for all log and event messages. +// +// Messages are presented in the order received from the loggregator server. +// Chronological or other ordering is not guaranteed. It is the responsibility +// of the consumer of these channels to provide any desired sorting mechanism. +// +// Whenever an error is encountered, the error will be sent down the error +// channel and Stream will attempt to reconnect indefinitely. +func (c *Consumer) Stream(appGuid string, authToken string) (outputChan <-chan *events.Envelope, errorChan <-chan error) { + return c.runStream(appGuid, authToken, true) +} + +// StreamWithoutReconnect functions identically to Stream but without any +// reconnect attempts when errors occur. +func (c *Consumer) StreamWithoutReconnect(appGuid string, authToken string) (<-chan *events.Envelope, <-chan error) { + return c.runStream(appGuid, authToken, false) +} + +// Firehose streams all data. All clients with the same subscriptionId will +// receive a proportionate share of the message stream. Each pool of clients +// will receive the entire stream. +// +// Messages are presented in the order received from the loggregator server. +// Chronological or other ordering is not guaranteed. It is the responsibility +// of the consumer of these channels to provide any desired sorting mechanism. +// +// Whenever an error is encountered, the error will be sent down the error +// channel and Firehose will attempt to reconnect indefinitely. +func (c *Consumer) Firehose( + subscriptionId string, + authToken string, +) (<-chan *events.Envelope, <-chan error) { + return c.firehose(newFirehose( + subscriptionId, + authToken, + )) +} + +// FirehoseWithoutReconnect functions identically to Firehose but without any +// reconnect attempts when errors occur. +func (c *Consumer) FirehoseWithoutReconnect( + subscriptionId string, + authToken string, +) (<-chan *events.Envelope, <-chan error) { + return c.firehose(newFirehose( + subscriptionId, + authToken, + WithRetry(false), + )) +} + +// FilteredFirehose streams a filtered set of envelopes. It has functionality +// similar to Firehose. +func (c *Consumer) FilteredFirehose( + subscriptionId string, + authToken string, + filter EnvelopeFilter, +) (<-chan *events.Envelope, <-chan error) { + return c.firehose(newFirehose( + subscriptionId, + authToken, + WithEnvelopeFilter(filter), + )) +} + +// SetDebugPrinter sets the websocket connection to write debug information to +// debugPrinter. +func (c *Consumer) SetDebugPrinter(debugPrinter DebugPrinter) { + c.debugPrinter = debugPrinter +} + +// SetOnConnectCallback sets a callback function to be called with the +// websocket connection is established. +func (c *Consumer) SetOnConnectCallback(cb func()) { + c.callbackLock.Lock() + defer c.callbackLock.Unlock() + c.callback = cb +} + +// Close terminates all previously opened websocket connections to the traffic +// controller. It will return an error if there are no open connections, or +// if it has problems closing any connection. +func (c *Consumer) Close() error { + c.connsLock.Lock() + defer c.connsLock.Unlock() + if len(c.conns) == 0 { + return errors.New("connection does not exist") + } + for len(c.conns) > 0 { + if err := c.conns[0].close(); err != nil { + return err + } + c.conns = c.conns[1:] + } + return nil +} + +func (c *Consumer) SetIdleTimeout(idleTimeout time.Duration) { + c.idleTimeout = idleTimeout +} + +func (c *Consumer) onConnectCallback() func() { + c.callbackLock.RLock() + defer c.callbackLock.RUnlock() + return c.callback +} + +func (c *Consumer) tailingLogs(appGuid, authToken string, retry bool) (<-chan *events.LogMessage, <-chan error) { + outputs := make(chan *events.LogMessage) + errors := make(chan error, 1) + callback := func(env *events.Envelope) { + if env.GetEventType() == events.Envelope_LogMessage { + outputs <- env.GetLogMessage() + } + } + + conn := c.newConn() + go func() { + defer close(errors) + defer close(outputs) + c.streamAppDataTo(conn, appGuid, authToken, callback, errors, retry) + }() + return outputs, errors +} + +func (c *Consumer) runStream(appGuid, authToken string, retry bool) (<-chan *events.Envelope, <-chan error) { + outputs := make(chan *events.Envelope) + errors := make(chan error, 1) + + callback := func(env *events.Envelope) { + outputs <- env + } + + conn := c.newConn() + go func() { + defer close(errors) + defer close(outputs) + c.streamAppDataTo(conn, appGuid, authToken, callback, errors, retry) + }() + return outputs, errors +} + +func (c *Consumer) streamAppDataTo(conn *connection, appGuid, authToken string, callback func(*events.Envelope), errors chan<- error, retry bool) { + streamPath := fmt.Sprintf("/apps/%s/stream", appGuid) + if retry { + c.retryAction(c.listenAction(conn, streamPath, authToken, callback), errors) + return + } + err, _ := c.listenAction(conn, streamPath, authToken, callback)() + errors <- err +} + +func (c *Consumer) firehose(options *firehose) (<-chan *events.Envelope, <-chan error) { + outputs := make(chan *events.Envelope) + errors := make(chan error, 1) + callback := func(env *events.Envelope) { + outputs <- env + } + + conn := c.newConn() + go func() { + defer close(errors) + defer close(outputs) + if options.retry { + c.retryAction(c.listenAction(conn, options.streamPath(), options.authToken, callback), errors) + return + } + err, _ := c.listenAction(conn, options.streamPath(), options.authToken, callback)() + errors <- err + }() + return outputs, errors +} + +func (c *Consumer) listenForMessages(conn *connection, callback func(*events.Envelope)) error { + if conn.closed() { + return nil + } + ws := conn.websocket() + for { + if c.idleTimeout != 0 { + ws.SetReadDeadline(time.Now().Add(c.idleTimeout)) + } + _, data, err := ws.ReadMessage() + + // If the connection was closed (i.e. if conn.Close() was called), we + // will have a non-nil error, but we want to return a nil error. + if conn.closed() { + return nil + } + + if c.isTimeoutErr(err) { + return noaa_errors.NewRetryError(err) + } + + if err != nil { + return err + } + + envelope := &events.Envelope{} + err = proto.Unmarshal(data, envelope) + if err != nil { + continue + } + + callback(envelope) + } +} + +func (c *Consumer) listenAction(conn *connection, streamPath, authToken string, callback func(*events.Envelope)) func() (err error, done bool) { + return func() (error, bool) { + if conn.closed() { + return nil, true + } + ws, err := c.establishWebsocketConnection(streamPath, authToken) + if err != nil { + return err, false + } + conn.setWebsocket(ws) + return c.listenForMessages(conn, callback), false + } +} + +func (c *Consumer) retryAction(action func() (err error, done bool), errors chan<- error) { + oldConnectCallback := c.onConnectCallback() + defer c.SetOnConnectCallback(oldConnectCallback) + + context := retryContext{ + sleep: atomic.LoadInt64(&c.minRetryDelay), + count: 0, + } + + c.SetOnConnectCallback(func() { + atomic.StoreInt64(&context.sleep, atomic.LoadInt64(&c.minRetryDelay)) + atomic.StoreInt64(&context.count, 0) + if oldConnectCallback != nil { + oldConnectCallback() + } + }) + + for { + err, done := action() + if done { + return + } + + if _, ok := err.(noaa_errors.NonRetryError); ok { + c.debugPrinter.Print("WEBSOCKET ERROR", err.Error()) + errors <- err + return + } + + retryCount := atomic.LoadInt64(&context.count) + maxRetryCount := atomic.LoadInt64(&c.maxRetryCount) + if retryCount >= maxRetryCount { + c.debugPrinter.Print("WEBSOCKET ERROR", fmt.Sprintf("Maximum number of retries %d reached", maxRetryCount)) + errors <- ErrMaxRetriesReached + return + } + atomic.StoreInt64(&context.count, retryCount+1) + + if err != nil { + c.debugPrinter.Print("WEBSOCKET ERROR", fmt.Sprintf("%s. Retrying...", err.Error())) + err = noaa_errors.NewRetryError(err) + } + + errors <- err + + ns := atomic.LoadInt64(&context.sleep) + time.Sleep(time.Duration(ns)) + ns = atomic.AddInt64(&context.sleep, ns) + max := atomic.LoadInt64(&c.maxRetryDelay) + if ns > max { + atomic.StoreInt64(&context.sleep, max) + } + } +} + +func (c *Consumer) isTimeoutErr(err error) bool { + if err == nil { + return false + } + + // This is an unfortunate way to validate this, + // however the error type is `*websocket.netError` + // which is not exported + return strings.Contains(err.Error(), "i/o timeout") +} + +func (c *Consumer) newConn() *connection { + conn := &connection{} + c.connsLock.Lock() + defer c.connsLock.Unlock() + c.conns = append(c.conns, conn) + return conn +} + +func (c *Consumer) websocketConn(path, authToken string) (*websocket.Conn, error) { + if authToken == "" && c.refreshTokens { + return c.websocketConnNewToken(path) + } + + URL, err := url.Parse(c.trafficControllerUrl + path) + if err != nil { + return nil, noaa_errors.NewNonRetryError(err) + } + + if URL.Scheme != "wss" && URL.Scheme != "ws" { + return nil, noaa_errors.NewNonRetryError(fmt.Errorf("Invalid scheme '%s'", URL.Scheme)) + } + + ws, httpErr := c.tryWebsocketConnection(path, authToken) + if httpErr != nil { + err = httpErr.error + if httpErr.statusCode == http.StatusUnauthorized && c.refreshTokens { + ws, err = c.websocketConnNewToken(path) + } + } + return ws, err +} + +func (c *Consumer) websocketConnNewToken(path string) (*websocket.Conn, error) { + token, err := c.getToken() + if err != nil { + return nil, err + } + ws, httpErr := c.tryWebsocketConnection(path, token) + if httpErr != nil { + return nil, httpErr.error + } + return ws, nil +} + +func (c *Consumer) establishWebsocketConnection(path, authToken string) (*websocket.Conn, error) { + ws, err := c.websocketConn(path, authToken) + if err != nil { + return nil, err + } + + callback := c.onConnectCallback() + if err == nil && callback != nil { + callback() + } + + return ws, nil +} + +func (c *Consumer) tryWebsocketConnection(path, token string) (*websocket.Conn, *httpError) { + header := http.Header{"Origin": []string{c.trafficControllerUrl}, "Authorization": []string{token}} + url := c.trafficControllerUrl + path + + c.debugPrinter.Print("WEBSOCKET REQUEST", + "GET "+path+" HTTP/1.1\n"+ + "Host: "+c.trafficControllerUrl+"\n"+ + "Upgrade: websocket\nConnection: Upgrade\nSec-WebSocket-Version: 13\nSec-WebSocket-Key: [HIDDEN]\n"+ + headersString(header)) + + ws, resp, err := c.dialer.Dial(url, header) + if resp != nil { + c.debugPrinter.Print("WEBSOCKET RESPONSE", + resp.Proto+" "+resp.Status+"\n"+ + headersString(resp.Header)) + } + + httpErr := &httpError{} + if resp != nil { + if resp.StatusCode == http.StatusUnauthorized { + bodyData, _ := ioutil.ReadAll(resp.Body) + err = noaa_errors.NewUnauthorizedError(string(bodyData)) + } + httpErr.statusCode = resp.StatusCode + } + if err != nil { + errMsg := "Error dialing trafficcontroller server: %s.\n" + + "Please ask your Cloud Foundry Operator to check the platform configuration (trafficcontroller is %s)." + httpErr.error = fmt.Errorf(errMsg, err.Error(), c.trafficControllerUrl) + return nil, httpErr + } + return ws, nil +} + +func headersString(header http.Header) string { + var result string + for name, values := range header { + result += name + ": " + strings.Join(values, ", ") + "\n" + } + return result +} + +type connection struct { + ws *websocket.Conn + isClosed bool + lock sync.Mutex +} + +func (c *connection) websocket() *websocket.Conn { + c.lock.Lock() + defer c.lock.Unlock() + return c.ws +} + +func (c *connection) setWebsocket(ws *websocket.Conn) { + c.lock.Lock() + defer c.lock.Unlock() + if c.isClosed { + return + } + c.ws = ws +} + +func (c *connection) close() error { + c.lock.Lock() + defer c.lock.Unlock() + c.isClosed = true + if c.ws == nil { + return nil + } + err := c.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{}) + if err != nil { + return err + } + return c.ws.Close() +} + +func (c *connection) closed() bool { + c.lock.Lock() + defer c.lock.Unlock() + return c.isClosed +} + +// retryContext is a struct to keep track of a retryAction call's context. We +// use it primarily to guarantee 64-bit byte alignment on 32-bit systems. +// https://golang.org/src/sync/atomic/doc.go?#L50 +type retryContext struct { + // sleep and count must be the first words within this struct to ensure + // 64-bit byte alignment. + sleep, count int64 +} diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/consumer.go b/vendor/github.com/cloudfoundry/noaa/consumer/consumer.go new file mode 100644 index 00000000000..e6b2079f96d --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/consumer.go @@ -0,0 +1,126 @@ +package consumer + +import ( + "crypto/tls" + "errors" + "io/ioutil" + "net/http" + "net/url" + "regexp" + "sync" + "time" + + "github.com/cloudfoundry/noaa/consumer/internal" + + noaa_errors "github.com/cloudfoundry/noaa/errors" + "github.com/gorilla/websocket" +) + +var ( + // KeepAlive sets the interval between keep-alive messages sent by the client to loggregator. + KeepAlive = 25 * time.Second + + boundaryRegexp = regexp.MustCompile("boundary=(.*)") + ErrNotOK = errors.New("unknown issue when making HTTP request to Loggregator") + ErrNotFound = ErrNotOK // NotFound isn't an accurate description of how this is used; please use ErrNotOK instead + ErrBadResponse = errors.New("bad server response") + ErrBadRequest = errors.New("bad client request") + ErrLostConnection = errors.New("remote server terminated connection unexpectedly") + ErrMaxRetriesReached = errors.New("maximum number of connection retries reached") +) + +//go:generate hel --type DebugPrinter --output mock_debug_printer_test.go + +// DebugPrinter is a type which handles printing debug information. +type DebugPrinter interface { + Print(title, dump string) +} + +type nullDebugPrinter struct { +} + +func (nullDebugPrinter) Print(title, body string) { +} + +// Consumer represents the actions that can be performed against trafficcontroller. +// See sync.go and async.go for trafficcontroller access methods. +type Consumer struct { + // minRetryDelay, maxRetryDelay, and maxRetryCount must be the first words in + // this struct in order to be used atomically by 32-bit systems. + // https://golang.org/src/sync/atomic/doc.go?#L50 + minRetryDelay, maxRetryDelay, maxRetryCount int64 + + trafficControllerUrl string + idleTimeout time.Duration + callback func() + callbackLock sync.RWMutex + debugPrinter DebugPrinter + client *http.Client + dialer websocket.Dialer + + conns []*connection + connsLock sync.Mutex + + refreshTokens bool + refresherMutex sync.RWMutex + tokenRefresher TokenRefresher +} + +// New creates a new consumer to a trafficcontroller. +func New(trafficControllerUrl string, tlsConfig *tls.Config, proxy func(*http.Request) (*url.URL, error)) *Consumer { + if proxy == nil { + proxy = http.ProxyFromEnvironment + } + + return &Consumer{ + trafficControllerUrl: trafficControllerUrl, + debugPrinter: nullDebugPrinter{}, + client: &http.Client{ + Transport: &http.Transport{ + Proxy: proxy, + TLSClientConfig: tlsConfig, + TLSHandshakeTimeout: internal.Timeout, + DisableKeepAlives: true, + }, + Timeout: internal.Timeout, + }, + minRetryDelay: int64(DefaultMinRetryDelay), + maxRetryDelay: int64(DefaultMaxRetryDelay), + maxRetryCount: int64(DefaultMaxRetryCount), + dialer: websocket.Dialer{ + HandshakeTimeout: internal.Timeout, + Proxy: proxy, + TLSClientConfig: tlsConfig, + }, + } +} + +type httpError struct { + statusCode int + error error +} + +func checkForErrors(resp *http.Response) *httpError { + if resp.StatusCode == http.StatusUnauthorized { + data, _ := ioutil.ReadAll(resp.Body) + return &httpError{ + statusCode: resp.StatusCode, + error: noaa_errors.NewUnauthorizedError(string(data)), + } + } + + if resp.StatusCode == http.StatusBadRequest { + return &httpError{ + statusCode: resp.StatusCode, + error: ErrBadRequest, + } + } + + if resp.StatusCode != http.StatusOK { + return &httpError{ + statusCode: resp.StatusCode, + error: ErrNotOK, + } + } + return nil +} diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/filter.go b/vendor/github.com/cloudfoundry/noaa/consumer/filter.go new file mode 100644 index 00000000000..8a247298a73 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/filter.go @@ -0,0 +1,20 @@ +package consumer + +type EnvelopeFilter int + +const ( + LogMessages EnvelopeFilter = iota + Metrics + allEnvelopes +) + +func (f EnvelopeFilter) queryStringParam() string { + switch f { + case LogMessages: + return "filter-type=logs" + case Metrics: + return "filter-type=metrics" + default: + return "" + } +} diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/firehose.go b/vendor/github.com/cloudfoundry/noaa/consumer/firehose.go new file mode 100644 index 00000000000..57e93509582 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/firehose.go @@ -0,0 +1,44 @@ +package consumer + +type firehose struct { + subscriptionID string + authToken string + retry bool + envelopeFilter EnvelopeFilter +} + +type FirehoseOption func(*firehose) + +func WithRetry(retry bool) FirehoseOption { + return func(f *firehose) { + f.retry = retry + } +} +func WithEnvelopeFilter(filter EnvelopeFilter) FirehoseOption { + return func(f *firehose) { + f.envelopeFilter = filter + } +} + +func newFirehose( + subID string, + authToken string, + opts ...FirehoseOption, +) *firehose { + f := &firehose{ + subscriptionID: subID, + authToken: authToken, + retry: true, + envelopeFilter: allEnvelopes, + } + + for _, o := range opts { + o(f) + } + + return f +} + +func (f *firehose) streamPath() string { + return "/firehose/" + f.subscriptionID + "?" + f.envelopeFilter.queryStringParam() +} diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/internal/timeout.go b/vendor/github.com/cloudfoundry/noaa/consumer/internal/timeout.go new file mode 100644 index 00000000000..c8a56b598ca --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/internal/timeout.go @@ -0,0 +1,8 @@ +package internal + +import "time" + +var ( + // DO NOT USE + Timeout = 10 * time.Second +) diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/sync.go b/vendor/github.com/cloudfoundry/noaa/consumer/sync.go new file mode 100644 index 00000000000..7665adaa4bc --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/sync.go @@ -0,0 +1,171 @@ +package consumer + +import ( + "bytes" + "errors" + "fmt" + "mime/multipart" + "net/http" + "net/url" + "strings" + + "github.com/cloudfoundry/noaa" + "github.com/cloudfoundry/sonde-go/events" + "github.com/gogo/protobuf/proto" +) + +// RecentLogs connects to trafficcontroller via its 'recentlogs' http(s) +// endpoint and returns a slice of recent messages. It does not guarantee any +// order of the messages; they are in the order returned by trafficcontroller. +// +// The noaa.SortRecent function is provided to sort the data returned by +// this method. +func (c *Consumer) RecentLogs(appGuid string, authToken string) ([]*events.LogMessage, error) { + envelopes, err := c.readTC(appGuid, authToken, "recentlogs") + if err != nil { + return nil, err + } + messages := make([]*events.LogMessage, 0, 200) + for _, env := range envelopes { + messages = append(messages, env.GetLogMessage()) + } + return messages, nil +} + +// ContainerMetrics is deprecated in favor of ContainerEnvelopes, since +// returning the ContainerMetric type directly hides important +// information, like the timestamp. +// +// The returned values will be the same as ContainerEnvelopes, just with +// the Envelope stripped out. +func (c *Consumer) ContainerMetrics(appGuid string, authToken string) ([]*events.ContainerMetric, error) { + envelopes, err := c.ContainerEnvelopes(appGuid, authToken) + if err != nil { + return nil, err + } + messages := make([]*events.ContainerMetric, 0, len(envelopes)) + for _, env := range envelopes { + messages = append(messages, env.GetContainerMetric()) + } + noaa.SortContainerMetrics(messages) + return messages, nil +} + +// ContainerEnvelopes connects to trafficcontroller via its 'containermetrics' +// http(s) endpoint and returns the most recent dropsonde envelopes for an app. +func (c *Consumer) ContainerEnvelopes(appGuid, authToken string) ([]*events.Envelope, error) { + envelopes, err := c.readTC(appGuid, authToken, "containermetrics") + if err != nil { + return nil, err + } + for _, env := range envelopes { + if env.GetEventType() == events.Envelope_LogMessage { + return nil, errors.New(fmt.Sprintf("Upstream error: %s", env.GetLogMessage().GetMessage())) + } + } + return envelopes, nil +} + +func (c *Consumer) readTC(appGuid string, authToken string, endpoint string) ([]*events.Envelope, error) { + trafficControllerUrl, err := url.ParseRequestURI(c.trafficControllerUrl) + if err != nil { + return nil, err + } + + scheme := "https" + if trafficControllerUrl.Scheme == "ws" { + scheme = "http" + } + + recentPath := fmt.Sprintf("%s://%s/apps/%s/%s", scheme, trafficControllerUrl.Host, appGuid, endpoint) + + resp, err := c.requestTC(recentPath, authToken) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + reader, err := getMultipartReader(resp) + if err != nil { + return nil, err + } + + var buffer bytes.Buffer + + var envelopes []*events.Envelope + for part, loopErr := reader.NextPart(); loopErr == nil; part, loopErr = reader.NextPart() { + buffer.Reset() + + _, err = buffer.ReadFrom(part) + if err != nil { + break + } + + envelope := new(events.Envelope) + proto.Unmarshal(buffer.Bytes(), envelope) + + envelopes = append(envelopes, envelope) + } + + return envelopes, nil +} + +func (c *Consumer) requestTC(path, authToken string) (*http.Response, error) { + if authToken == "" && c.refreshTokens { + return c.requestTCNewToken(path) + } + var err error + resp, httpErr := c.tryTCConnection(path, authToken) + if httpErr != nil { + err = httpErr.error + if httpErr.statusCode == http.StatusUnauthorized && c.refreshTokens { + resp, err = c.requestTCNewToken(path) + } + } + return resp, err +} + +func (c *Consumer) requestTCNewToken(path string) (*http.Response, error) { + token, err := c.getToken() + if err != nil { + return nil, err + } + conn, httpErr := c.tryTCConnection(path, token) + if httpErr != nil { + return nil, httpErr.error + } + return conn, nil +} + +func (c *Consumer) tryTCConnection(recentPath, token string) (*http.Response, *httpError) { + req, _ := http.NewRequest("GET", recentPath, nil) + req.Header.Set("Authorization", token) + + resp, err := c.client.Do(req) + if err != nil { + message := `Error dialing trafficcontroller server: %s. +Please ask your Cloud Foundry Operator to check the platform configuration (trafficcontroller endpoint is %s).` + return nil, &httpError{ + statusCode: -1, + error: errors.New(fmt.Sprintf(message, err, c.trafficControllerUrl)), + } + } + + return resp, checkForErrors(resp) +} + +func getMultipartReader(resp *http.Response) (*multipart.Reader, error) { + contentType := resp.Header.Get("Content-Type") + + if len(strings.TrimSpace(contentType)) == 0 { + return nil, ErrBadResponse + } + + matches := boundaryRegexp.FindStringSubmatch(contentType) + + if len(matches) != 2 || len(strings.TrimSpace(matches[1])) == 0 { + return nil, ErrBadResponse + } + reader := multipart.NewReader(resp.Body, matches[1]) + return reader, nil +} diff --git a/vendor/github.com/cloudfoundry/noaa/consumer/token_refresher.go b/vendor/github.com/cloudfoundry/noaa/consumer/token_refresher.go new file mode 100644 index 00000000000..b8ebef726b6 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/consumer/token_refresher.go @@ -0,0 +1,22 @@ +package consumer + +//go:generate hel --type TokenRefresher --output mock_token_refresher_test.go + +type TokenRefresher interface { + RefreshAuthToken() (token string, authError error) +} + +func (c *Consumer) RefreshTokenFrom(tr TokenRefresher) { + c.refresherMutex.Lock() + defer c.refresherMutex.Unlock() + + c.refreshTokens = true + c.tokenRefresher = tr +} + +func (c *Consumer) getToken() (string, error) { + c.refresherMutex.RLock() + defer c.refresherMutex.RUnlock() + + return c.tokenRefresher.RefreshAuthToken() +} diff --git a/vendor/github.com/cloudfoundry/noaa/errors/error_codes.go b/vendor/github.com/cloudfoundry/noaa/errors/error_codes.go new file mode 100644 index 00000000000..24c6337eb2a --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/errors/error_codes.go @@ -0,0 +1,4 @@ +package errors + +const ERR_LOST_CONNECTION = int32(1) +const ERR_DIAL = int32(2) diff --git a/vendor/github.com/cloudfoundry/noaa/errors/non_retry_error.go b/vendor/github.com/cloudfoundry/noaa/errors/non_retry_error.go new file mode 100644 index 00000000000..c4473ca52fe --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/errors/non_retry_error.go @@ -0,0 +1,22 @@ +package errors + +import "fmt" + +// NonRetryError is a type that noaa uses when it encountered an error, +// and is not going to retry the operation. When errors of this type +// are encountered, they should result in a closed connection. +type NonRetryError struct { + Err error +} + +// NewNonRetryError constructs a NonRetryError from any error. +func NewNonRetryError(err error) NonRetryError { + return NonRetryError{ + Err: err, + } +} + +// Error implements error. +func (e NonRetryError) Error() string { + return fmt.Sprintf("Please ask your Cloud Foundry Operator to check the platform configuration: %s", e.Err.Error()) +} diff --git a/vendor/github.com/cloudfoundry/noaa/errors/retry_error.go b/vendor/github.com/cloudfoundry/noaa/errors/retry_error.go new file mode 100644 index 00000000000..46ef3916c4f --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/errors/retry_error.go @@ -0,0 +1,20 @@ +package errors + +// RetryError is a type that noaa uses when it encountered an error, +// but is going to retry the operation. When errors of this type +// are encountered, they should not result in a closed connection. +type RetryError struct { + Err error +} + +// NewRetryError constructs a RetryError from any error. +func NewRetryError(err error) RetryError { + return RetryError{ + Err: err, + } +} + +// Error implements error. +func (e RetryError) Error() string { + return e.Err.Error() +} diff --git a/vendor/github.com/cloudfoundry/noaa/errors/unauthorized_error.go b/vendor/github.com/cloudfoundry/noaa/errors/unauthorized_error.go new file mode 100644 index 00000000000..3f8a2e08d0b --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/errors/unauthorized_error.go @@ -0,0 +1,13 @@ +package errors + +type UnauthorizedError struct { + description string +} + +func NewUnauthorizedError(description string) error { + return &UnauthorizedError{description: description} +} + +func (err *UnauthorizedError) Error() string { + return "Unauthorized error: " + err.description +} diff --git a/vendor/github.com/cloudfoundry/noaa/sort_container_metrics.go b/vendor/github.com/cloudfoundry/noaa/sort_container_metrics.go new file mode 100644 index 00000000000..25e76b36568 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/sort_container_metrics.go @@ -0,0 +1,29 @@ +package noaa + +import ( + "sort" + + "github.com/cloudfoundry/sonde-go/events" +) + +// SortContainerMetrics sorts a slice of containerMetrics by InstanceIndex. +// +// The input slice is sorted; the return value is simply a pointer to the same slice. +func SortContainerMetrics(messages []*events.ContainerMetric) []*events.ContainerMetric { + sort.Sort(containerMetricSlice(messages)) + return messages +} + +type containerMetricSlice []*events.ContainerMetric + +func (lms containerMetricSlice) Len() int { + return len(lms) +} + +func (lms containerMetricSlice) Less(i, j int) bool { + return (*(lms[i])).GetInstanceIndex() < (*(lms[j])).GetInstanceIndex() +} + +func (lms containerMetricSlice) Swap(i, j int) { + lms[i], lms[j] = lms[j], lms[i] +} diff --git a/vendor/github.com/cloudfoundry/noaa/sort_recent.go b/vendor/github.com/cloudfoundry/noaa/sort_recent.go new file mode 100644 index 00000000000..f66097488c5 --- /dev/null +++ b/vendor/github.com/cloudfoundry/noaa/sort_recent.go @@ -0,0 +1,30 @@ +package noaa + +import ( + "sort" + + "github.com/cloudfoundry/sonde-go/events" +) + +// SortRecent sorts a slice of LogMessages by timestamp. The sort is stable, so messages with the same timestamp are sorted +// in the order that they are received. +// +// The input slice is sorted; the return value is simply a pointer to the same slice. +func SortRecent(messages []*events.LogMessage) []*events.LogMessage { + sort.Stable(logMessageSlice(messages)) + return messages +} + +type logMessageSlice []*events.LogMessage + +func (lms logMessageSlice) Len() int { + return len(lms) +} + +func (lms logMessageSlice) Less(i, j int) bool { + return *(lms[i]).Timestamp < *(lms[j]).Timestamp +} + +func (lms logMessageSlice) Swap(i, j int) { + lms[i], lms[j] = lms[j], lms[i] +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 86eee7cb174..758eb151cf1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -295,6 +295,11 @@ github.com/cavaliercoder/go-rpm/version github.com/cespare/xxhash/v2 # github.com/cloudfoundry-community/go-cfclient v0.0.0-20190808214049-35bcce23fc5f github.com/cloudfoundry-community/go-cfclient +# github.com/cloudfoundry/noaa v2.1.0+incompatible +github.com/cloudfoundry/noaa +github.com/cloudfoundry/noaa/consumer +github.com/cloudfoundry/noaa/consumer/internal +github.com/cloudfoundry/noaa/errors # github.com/cloudfoundry/sonde-go v0.0.0-20171206171820-b33733203bb4 github.com/cloudfoundry/sonde-go/events # github.com/containerd/containerd v1.3.3 diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index c4428adb44c..f8d5dd51015 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -64,7 +64,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] ==== `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] ==== `client_id` @@ -79,8 +79,13 @@ Client Secret to authenticate with Cloud Foundry. Default: "". [float] ==== `shard_id` -Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events -from the RLP Gateway. Default: "(generated UUID)". +Shard ID for the connection with Cloud Foundry. Use the same ID across multiple {beatname_lc} to shard the load of events. Default: "(generated UUID)". + +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". [float] ==== `ssl` diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go index 65755d1f2c1..82bf9d66807 100644 --- a/x-pack/filebeat/input/cloudfoundry/input.go +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -5,17 +5,16 @@ package cloudfoundry import ( - "context" - "sync" - - "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "fmt" "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" ) func init() { @@ -25,21 +24,14 @@ func init() { } } -// Input defines a udp input to receive event on a specific host:port. -type Input struct { - sync.Mutex - listener *cloudfoundry.RlpListener - started bool - log *logp.Logger - outlet channel.Outleter -} - // NewInput creates a new udp input func NewInput( cfg *common.Config, outlet channel.Connector, context input.Context, ) (input.Input, error) { + cfgwarn.Beta("The cloudfoundry input is beta") + log := logp.NewLogger("cloudfoundry") out, err := outlet.Connect(cfg) @@ -52,65 +44,12 @@ func NewInput( return nil, err } - hub := cloudfoundry.NewHub(&conf, "filebeat", log) - forwarder := harvester.NewForwarder(out) - callbacks := cloudfoundry.RlpListenerCallbacks{ - HttpAccess: func(evt *cloudfoundry.EventHttpAccess) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) - }, - Log: func(evt *cloudfoundry.EventLog) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) - }, - Error: func(evt *cloudfoundry.EventError) { - forwarder.Send(beat.Event{ - Timestamp: evt.Timestamp(), - Fields: evt.ToFields(), - }) - }, - } - - listener, err := hub.RlpListener(callbacks) - if err != nil { - return nil, err - } - return &Input{ - outlet: out, - listener: listener, - started: false, - log: log, - }, nil -} - -// Run starts and start the UDP server and read events from the socket -func (p *Input) Run() { - p.Lock() - defer p.Unlock() - - if !p.started { - p.log.Info("starting cloudfoundry input") - p.listener.Start(context.TODO()) - p.started = true + switch conf.Version { + case cloudfoundry.ConsumerVersionV1: + return newInputV1(log, conf, out, context) + case cloudfoundry.ConsumerVersionV2: + return newInputV2(log, conf, out, context) + default: + return nil, fmt.Errorf("not supported consumer version: %s", conf.Version) } } - -// Stop stops the UDP input -func (p *Input) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - p.log.Info("stopping cloudfoundry input") - p.listener.Stop() - p.started = false -} - -// Wait suspends the UDP input -func (p *Input) Wait() { - p.Stop() -} diff --git a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go index ac31a07132b..736acf778fd 100644 --- a/x-pack/filebeat/input/cloudfoundry/input_integration_test.go +++ b/x-pack/filebeat/input/cloudfoundry/input_integration_test.go @@ -8,6 +8,9 @@ package cloudfoundry import ( + "context" + "crypto/tls" + "net/http" "testing" "time" @@ -17,11 +20,34 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test" ) func TestInput(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testInput(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testInput(t, "v2") + }) +} + +func testInput(t *testing.T, version string) { config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)) + config.SetString("version", -1, version) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + apiAddress, err := config.String("api_address", -1) + require.NoError(t, err) + + // Ensure that there is something happening in the firehose + go makeApiRequests(t, ctx, apiAddress) events := make(chan beat.Event) connector := channel.ConnectorFunc(func(*common.Config, beat.ClientConfig) (channel.Outleter, error) { @@ -73,3 +99,27 @@ func (o *outleter) OnEvent(e beat.Event) bool { return false } } + +func makeApiRequests(t *testing.T, ctx context.Context, address string) { + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + + for { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, address, nil) + require.NoError(t, err) + resp, err := client.Do(req) + require.NoError(t, err) + resp.Body.Close() + + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): + return + } + } +} diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go new file mode 100644 index 00000000000..c1248545369 --- /dev/null +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -0,0 +1,87 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "sync" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/harvester" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +// InputV1 defines a udp input to receive event on a specific host:port. +type InputV1 struct { + sync.Mutex + consumer *cloudfoundry.DopplerConsumer + started bool + log *logp.Logger + outlet channel.Outleter +} + +func newInputV1(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV1, error) { + hub := cloudfoundry.NewHub(&conf, "filebeat", log) + forwarder := harvester.NewForwarder(out) + + callbacks := cloudfoundry.DopplerCallbacks{ + Log: func(evt cloudfoundry.Event) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + Error: func(evt cloudfoundry.EventError) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + } + + consumer, err := hub.DopplerConsumer(callbacks) + if err != nil { + return nil, errors.Wrapf(err, "initializing doppler consumer") + } + return &InputV1{ + outlet: out, + consumer: consumer, + started: false, + log: log, + }, nil +} + +// Run starts the consumer of cloudfoundry events +func (p *InputV1) Run() { + p.Lock() + defer p.Unlock() + + if !p.started { + p.log.Info("starting cloudfoundry input") + p.consumer.Run() + p.started = true + } +} + +// Stop stops cloudfoundry doppler consumer +func (p *InputV1) Stop() { + defer p.outlet.Close() + p.Lock() + defer p.Unlock() + + p.log.Info("stopping cloudfoundry input") + p.consumer.Stop() + p.started = false +} + +// Wait waits for the input to finalize, and stops it +func (p *InputV1) Wait() { + p.Stop() + p.consumer.Wait() +} diff --git a/x-pack/filebeat/input/cloudfoundry/v2.go b/x-pack/filebeat/input/cloudfoundry/v2.go new file mode 100644 index 00000000000..0d21d361f9c --- /dev/null +++ b/x-pack/filebeat/input/cloudfoundry/v2.go @@ -0,0 +1,90 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/harvester" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +// InputV2 defines a Cloudfoundry input that uses the consumer V2 API +type InputV2 struct { + sync.Mutex + listener *cloudfoundry.RlpListener + started bool + log *logp.Logger + outlet channel.Outleter +} + +func newInputV2(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV2, error) { + hub := cloudfoundry.NewHub(&conf, "filebeat", log) + forwarder := harvester.NewForwarder(out) + callbacks := cloudfoundry.RlpListenerCallbacks{ + HttpAccess: func(evt *cloudfoundry.EventHttpAccess) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + Log: func(evt *cloudfoundry.EventLog) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + Error: func(evt *cloudfoundry.EventError) { + forwarder.Send(beat.Event{ + Timestamp: evt.Timestamp(), + Fields: evt.ToFields(), + }) + }, + } + + listener, err := hub.RlpListener(callbacks) + if err != nil { + return nil, err + } + return &InputV2{ + outlet: out, + listener: listener, + started: false, + log: log, + }, nil +} + +// Run starts the listener of cloudfoundry events +func (p *InputV2) Run() { + p.Lock() + defer p.Unlock() + + if !p.started { + p.log.Info("starting cloudfoundry input") + p.listener.Start(context.TODO()) + p.started = true + } +} + +// Stop stops cloudfoundry listener +func (p *InputV2) Stop() { + defer p.outlet.Close() + p.Lock() + defer p.Unlock() + + p.log.Info("stopping cloudfoundry input") + p.listener.Stop() + p.started = false +} + +// Wait waits for the input to finalize, and stops it +func (p *InputV2) Wait() { + p.Stop() +} diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index e634f45fd36..8f1139bd7a2 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -6,14 +6,24 @@ package cloudfoundry import ( "crypto/tls" + "fmt" + "strings" "time" + "github.com/gofrs/uuid" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) - "github.com/gofrs/uuid" +const ( + ConsumerVersionV1 = "v1" + ConsumerVersionV2 = "v2" ) type Config struct { + // Version of the consumer to use, it can be v1 or v2, defaults to v1 + Version string `config:"version"` + // CloudFoundry credentials for retrieving OAuth tokens ClientID string `config:"client_id" validate:"required"` ClientSecret string `config:"client_secret" validate:"required"` @@ -45,6 +55,15 @@ func (c *Config) InitDefaults() { } c.ShardID = uuid.String() c.CacheDuration = 120 * time.Second + c.Version = ConsumerVersionV1 +} + +func (c *Config) Validate() error { + supportedVersions := []string{ConsumerVersionV1, ConsumerVersionV2} + if !anyOf(supportedVersions, c.Version) { + return fmt.Errorf("not supported version %v, expected one of %s", c.Version, strings.Join(supportedVersions, ", ")) + } + return nil } // TLSConfig returns the TLS configuration. @@ -55,3 +74,12 @@ func (c *Config) TLSConfig() (*tls.Config, error) { } return tls.ToConfig(), nil } + +func anyOf(elems []string, s string) bool { + for _, elem := range elems { + if s == elem { + return true + } + } + return false +} diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go new file mode 100644 index 00000000000..8d068a4fd84 --- /dev/null +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -0,0 +1,204 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "fmt" + "net/http" + "regexp" + "sync" + + "github.com/cloudfoundry-community/go-cfclient" + "github.com/cloudfoundry/noaa/consumer" + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +type DopplerCallbacks struct { + Log func(evt Event) + Metric func(evt Event) + Error func(evt EventError) +} + +type DopplerConsumer struct { + sync.Mutex + + subscriptionID string + callbacks DopplerCallbacks + consumer *consumer.Consumer + tokenRefresher consumer.TokenRefresher + + log *logp.Logger + wg sync.WaitGroup + stop chan struct{} + started bool +} + +func newDopplerConsumer(address string, id string, log *logp.Logger, client *http.Client, tr *TokenRefresher, callbacks DopplerCallbacks) (*DopplerConsumer, error) { + transport, ok := client.Transport.(*http.Transport) + if !ok { + return nil, fmt.Errorf("expected http transport on client") + } + + c := consumer.New(address, transport.TLSClientConfig, transport.Proxy) + c.RefreshTokenFrom(tr) + c.SetDebugPrinter(newLogpDebugPrinter(log)) + + return &DopplerConsumer{ + subscriptionID: id, + consumer: c, + tokenRefresher: tr, + callbacks: callbacks, + log: log, + }, nil +} + +func (c *DopplerConsumer) Run() { + c.Lock() + defer c.Unlock() + if c.started { + return + } + c.stop = make(chan struct{}) + + if c.callbacks.Log != nil { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.logsFirehose() + }() + } + + if c.callbacks.Metric != nil { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.metricsFirehose() + }() + } + + c.started = true +} + +func (c *DopplerConsumer) logsFirehose() { + c.firehose(c.callbacks.Log, consumer.LogMessages) +} + +func (c *DopplerConsumer) metricsFirehose() { + c.firehose(c.callbacks.Metric, consumer.Metrics) +} + +func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeFilter) { + var msgChan <-chan *events.Envelope + var errChan <-chan error + filterFn := filterNoFilter + if filter == consumer.LogMessages { + // We are interested in more envelopes than the ones obtained when filtering + // by log messages, retrieve them all and filter later. + // If this causes performance or other problems, we will have to investigate + // if it is possible to pass different filters to the firehose url. + filterFn = filterLogs + msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "") + } else { + msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter) + } + for { + select { + case env := <-msgChan: + if !filterFn(env) { + continue + } + event := envelopeToEvent(env) + if event == nil { + c.log.Debugf("Envelope couldn't be converted to event: %+v", env) + continue + } + if evtError, ok := event.(*EventError); ok { + c.reportError(*evtError) + continue + } + cb(event) + case err := <-errChan: + // This error is an error on the connection, not a cloud foundry + // error envelope. Firehose should be able to reconnect, so just log it. + c.log.Infof("Error received on firehose: %v", err) + case <-c.stop: + return + } + } +} + +func filterNoFilter(*events.Envelope) bool { return true } +func filterLogs(e *events.Envelope) bool { + if e == nil || e.EventType == nil { + return false + } + switch *e.EventType { + case events.Envelope_HttpStartStop, events.Envelope_LogMessage, events.Envelope_Error: + return true + } + return false +} + +func (c *DopplerConsumer) reportError(e EventError) { + if c.callbacks.Error == nil { + c.log.Debugf("No callback for errors, error received: %s", e) + return + } + c.callbacks.Error(e) +} + +func (c *DopplerConsumer) Stop() { + c.Lock() + defer c.Unlock() + if !c.started { + return + } + + close(c.stop) + err := c.consumer.Close() + if err != nil { + c.log.Errorf("Error while closing doppler consumer: %v", err) + } + + c.started = false +} + +func (c *DopplerConsumer) Wait() { + c.Stop() + c.wg.Wait() +} + +type TokenRefresher struct { + client *cfclient.Client +} + +func TokenRefresherFromCfClient(c *cfclient.Client) *TokenRefresher { + return &TokenRefresher{client: c} +} + +func (tr *TokenRefresher) RefreshAuthToken() (token string, authError error) { + return tr.client.GetToken() +} + +type LogpDebugPrinter struct { + log *logp.Logger +} + +func newLogpDebugPrinter(log *logp.Logger) *LogpDebugPrinter { + return &LogpDebugPrinter{log: log} +} + +var authorizationHeaderRE = regexp.MustCompile("Authorization: .*\n") + +func (p *LogpDebugPrinter) Print(title, dump string) { + if !p.log.IsDebug() { + return + } + // Avoid printing out authorization tokens, Sec-WebSocket-Key is already hidden by the library. + dump = authorizationHeaderRE.ReplaceAllString(dump, "Authorization: [HIDDEN]\n") + p.log.Debugf("%s: %s", title, dump) +} diff --git a/x-pack/libbeat/common/cloudfoundry/events.go b/x-pack/libbeat/common/cloudfoundry/events.go index e946499c49e..f134585ac65 100644 --- a/x-pack/libbeat/common/cloudfoundry/events.go +++ b/x-pack/libbeat/common/cloudfoundry/events.go @@ -378,7 +378,7 @@ func newEventBase(env *events.Envelope) eventBase { func newEventHttpAccess(env *events.Envelope) *EventHttpAccess { msg := env.GetHttpStartStop() - return &EventHttpAccess{ + e := EventHttpAccess{ eventAppBase: eventAppBase{ eventBase: newEventBase(env), appGuid: formatUUID(msg.ApplicationId), @@ -393,9 +393,12 @@ func newEventHttpAccess(env *events.Envelope) *EventHttpAccess { userAgent: *msg.UserAgent, statusCode: *msg.StatusCode, contentLength: *msg.ContentLength, - instanceIndex: *msg.InstanceIndex, forwarded: msg.Forwarded, } + if msg.InstanceIndex != nil { + e.instanceIndex = *msg.InstanceIndex + } + return &e } func newEventLog(env *events.Envelope) *EventLog { diff --git a/x-pack/libbeat/common/cloudfoundry/hub.go b/x-pack/libbeat/common/cloudfoundry/hub.go index 5f92e0db359..823087ea959 100644 --- a/x-pack/libbeat/common/cloudfoundry/hub.go +++ b/x-pack/libbeat/common/cloudfoundry/hub.go @@ -99,23 +99,55 @@ func (h *Hub) RlpListenerFromClient(client Client, callbacks RlpListenerCallback return newRlpListener(rlpAddress, doer, h.cfg.ShardID, callbacks, h.log), nil } -// doerFromClient returns an auth token doer using uaa. -func (h *Hub) doerFromClient(client Client) (*authTokenDoer, error) { - httpClient, _, err := h.httpClient() +func (h *Hub) DopplerConsumer(callbacks DopplerCallbacks) (*DopplerConsumer, error) { + client, err := h.Client() if err != nil { return nil, err } + return h.DopplerConsumerFromClient(client, callbacks) +} + +func (h *Hub) DopplerConsumerFromClient(client Client, callbacks DopplerCallbacks) (*DopplerConsumer, error) { + dopplerAddress := h.cfg.DopplerAddress + if dopplerAddress == "" { + endpoint, err := cfEndpoint(client) + if err != nil { + return nil, errors.Wrap(err, "getting endpoints from client") + } + dopplerAddress = endpoint.DopplerEndpoint + } + httpClient, _, err := h.httpClient() + if err != nil { + return nil, errors.Wrap(err, "getting http client") + } + + // TODO: Refactor Client so it is easier to access the cfclient ccw, ok := client.(*clientCacheWrap) if !ok { - return nil, fmt.Errorf("must pass client returned from hub.Client") + return nil, fmt.Errorf("client without cache wrap") } - cfClient, ok := ccw.client.(*cfclient.Client) + cfc, ok := ccw.client.(*cfclient.Client) if !ok { - return nil, fmt.Errorf("must pass client returned from hub.Client") + return nil, fmt.Errorf("client is not a cloud foundry client") } - url := cfClient.Endpoint.AuthEndpoint - if h.cfg.UaaAddress != "" { - url = h.cfg.UaaAddress + + tr := TokenRefresherFromCfClient(cfc) + return newDopplerConsumer(dopplerAddress, h.cfg.ShardID, h.log, httpClient, tr, callbacks) +} + +// doerFromClient returns an auth token doer using uaa. +func (h *Hub) doerFromClient(client Client) (*authTokenDoer, error) { + httpClient, _, err := h.httpClient() + if err != nil { + return nil, err + } + url := h.cfg.UaaAddress + if url == "" { + endpoint, err := cfEndpoint(client) + if err != nil { + return nil, errors.Wrap(err, "getting endpoints from client") + } + url = endpoint.AuthEndpoint } return newAuthTokenDoer(url, h.cfg.ClientID, h.cfg.ClientSecret, httpClient, h.log), nil } @@ -133,6 +165,18 @@ func (h *Hub) httpClient() (*http.Client, bool, error) { return httpClient, tls.InsecureSkipVerify, nil } +func cfEndpoint(client Client) (cfclient.Endpoint, error) { + ccw, ok := client.(*clientCacheWrap) + if !ok { + return cfclient.Endpoint{}, fmt.Errorf("client without cache wrap") + } + cfc, ok := ccw.client.(*cfclient.Client) + if !ok { + return cfclient.Endpoint{}, fmt.Errorf("client is not a cloud foundry client") + } + return cfc.Endpoint, nil +} + // defaultTransport returns a new http.Transport for http.Client func defaultTransport() *http.Transport { defaultTransport := http.DefaultTransport.(*http.Transport) diff --git a/x-pack/libbeat/common/cloudfoundry/test/config.go b/x-pack/libbeat/common/cloudfoundry/test/config.go index c4024caa5ae..f7b9cd18ffb 100644 --- a/x-pack/libbeat/common/cloudfoundry/test/config.go +++ b/x-pack/libbeat/common/cloudfoundry/test/config.go @@ -23,6 +23,7 @@ func GetConfigFromEnv(t *testing.T) map[string]interface{} { optionalConfig(config, "uaa_address", "CLOUDFOUNDRY_UAA_ADDRESS") optionalConfig(config, "rlp_address", "CLOUDFOUNDRY_RLP_ADDRESS") optionalConfig(config, "doppler_address", "CLOUDFOUNDRY_DOPPLER_ADDRESS") + optionalConfig(config, "shard_id", "CLOUDFOUNDRY_SHARD_ID") if t.Failed() { t.FailNow()