diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index af97c40c0ac..525dee11d47 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -182,6 +182,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Prevent complete loss of long request trace data. {issue}37826[37826] {pull}37836[37836]
- Added experimental version of the Websocket Input. {pull}37774[37774]
- Add support for PEM-based Okta auth in CEL. {pull}37813[37813]
+- Add Salesforce input. {pull}37331[37331]
- Add ETW input. {pull}36915[36915]
- Update CEL mito extensions to v1.9.0 to add keys/values helper. {pull}37971[37971]
- Add logging for cache processor file reads and writes. {pull}38052[38052]
diff --git a/NOTICE.txt b/NOTICE.txt
index eb8588c7783..f846edfc9ec 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -16447,6 +16447,37 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+--------------------------------------------------------------------------------
+Dependency : github.com/elastic/go-sfdc
+Version: v0.0.0-20201201191151-3190c381b3e1
+Licence type (autodetected): MIT
+--------------------------------------------------------------------------------
+
+Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sfdc@v0.0.0-20201201191151-3190c381b3e1/LICENSE.txt:
+
+MIT License
+
+Copyright (c) 2019 Robert Sean Justice
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
+
--------------------------------------------------------------------------------
Dependency : github.com/go-ldap/ldap/v3
Version: v3.4.6
@@ -17400,6 +17431,25 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+--------------------------------------------------------------------------------
+Dependency : github.com/golang-jwt/jwt
+Version: v3.2.1+incompatible
+Licence type (autodetected): MIT
+--------------------------------------------------------------------------------
+
+Contents of probable licence file $GOMODCACHE/github.com/golang-jwt/jwt@v3.2.1+incompatible/LICENSE:
+
+Copyright (c) 2012 Dave Grijalva
+Copyright (c) 2021 golang-jwt maintainers
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+
--------------------------------------------------------------------------------
Dependency : github.com/golang/mock
Version: v1.6.0
@@ -25058,6 +25108,43 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+--------------------------------------------------------------------------------
+Dependency : golang.org/x/exp
+Version: v0.0.0-20231127185646-65229373498e
+Licence type (autodetected): BSD-3-Clause
+--------------------------------------------------------------------------------
+
+Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20231127185646-65229373498e/LICENSE:
+
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
--------------------------------------------------------------------------------
Dependency : golang.org/x/lint
Version: v0.0.0-20210508222113-6edffad5e616
@@ -37123,6 +37210,24 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/ristretto@v0.1.
END OF TERMS AND CONDITIONS
+--------------------------------------------------------------------------------
+Dependency : github.com/dgrijalva/jwt-go
+Version: v3.2.0+incompatible
+Licence type (autodetected): MIT
+--------------------------------------------------------------------------------
+
+Contents of probable licence file $GOMODCACHE/github.com/dgrijalva/jwt-go@v3.2.0+incompatible/LICENSE:
+
+Copyright (c) 2012 Dave Grijalva
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+
--------------------------------------------------------------------------------
Dependency : github.com/dgryski/go-farm
Version: v0.0.0-20190423205320-6a90982ecee2
@@ -53865,43 +53970,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
---------------------------------------------------------------------------------
-Dependency : golang.org/x/exp
-Version: v0.0.0-20231127185646-65229373498e
-Licence type (autodetected): BSD-3-Clause
---------------------------------------------------------------------------------
-
-Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20231127185646-65229373498e/LICENSE:
-
-Copyright (c) 2009 The Go Authors. All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
- * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
- * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
--------------------------------------------------------------------------------
Dependency : golang.org/x/term
Version: v0.18.0
diff --git a/go.mod b/go.mod
index 9e8c91fc742..a080de26a98 100644
--- a/go.mod
+++ b/go.mod
@@ -211,7 +211,9 @@ require (
github.com/elastic/tk-btf v0.1.0
github.com/elastic/toutoumomoma v0.0.0-20221026030040-594ef30cb640
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15
+ github.com/g8rswimmer/go-sfdc v0.0.0-00010101000000-000000000000
github.com/go-ldap/ldap/v3 v3.4.6
+ github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/google/cel-go v0.19.0
github.com/googleapis/gax-go/v2 v2.12.0
github.com/gorilla/handlers v1.5.1
@@ -229,6 +231,7 @@ require (
go.elastic.co/apm/module/apmhttp/v2 v2.4.8
go.elastic.co/apm/v2 v2.4.8
go.mongodb.org/mongo-driver v1.5.1
+ golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/tools/go/vcs v0.1.0-deprecated
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb
gopkg.in/natefinch/lumberjack.v2 v2.0.0
@@ -277,6 +280,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.0 // indirect
+ github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
@@ -375,7 +379,6 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
- golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
@@ -415,6 +418,7 @@ replace (
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c
+ github.com/g8rswimmer/go-sfdc => github.com/elastic/go-sfdc v0.0.0-20201201191151-3190c381b3e1
github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change
github.com/golang/glog => github.com/elastic/glog v1.0.1-0.20210831205241-7d8b5c89dfc4
github.com/google/gopacket => github.com/elastic/gopacket v1.1.20-0.20211202005954-d412fca7f83a
diff --git a/go.sum b/go.sum
index d839df6a98e..7900b675836 100644
--- a/go.sum
+++ b/go.sum
@@ -598,6 +598,7 @@ github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dF
github.com/dgraph-io/badger/v3 v3.2103.1 h1:zaX53IRg7ycxVlkd5pYdCeFp1FynD6qBGQoQql3R3Hk=
github.com/dgraph-io/badger/v3 v3.2103.1/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ=
@@ -706,6 +707,8 @@ github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f h1:Fvsq
github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f/go.mod h1:OPGqFNdTS34kMReS5hPFtBhD9J8itmSDurs1ix2wx7c=
github.com/elastic/go-seccomp-bpf v1.4.0 h1:6y3lYrEHrLH9QzUgOiK8WDqmPaMnnB785WxibCNIOH4=
github.com/elastic/go-seccomp-bpf v1.4.0/go.mod h1:wIMxjTbKpWGQk4CV9WltlG6haB4brjSH/dvAohBPM1I=
+github.com/elastic/go-sfdc v0.0.0-20201201191151-3190c381b3e1 h1:KS+lvT/rUS8Z4++RoiM2pHOKmBv8mLERmgiX04VEgwk=
+github.com/elastic/go-sfdc v0.0.0-20201201191151-3190c381b3e1/go.mod h1:/FB/tWFyF33vmdjwIwqAKu9QMVFVEjeoWi9V6eUcQEQ=
github.com/elastic/go-structform v0.0.10 h1:oy08o/Ih2hHTkNcRY/1HhaYvIp5z6t8si8gnCJPDo1w=
github.com/elastic/go-structform v0.0.10/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
github.com/elastic/go-sysinfo v1.13.1 h1:U5Jlx6c/rLkR72O8wXXXo1abnGlWGJU/wbzNJ2AfQa4=
@@ -995,6 +998,7 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
diff --git a/x-pack/filebeat/docs/inputs/input-salesforce.asciidoc b/x-pack/filebeat/docs/inputs/input-salesforce.asciidoc
new file mode 100644
index 00000000000..3b85b8511b6
--- /dev/null
+++ b/x-pack/filebeat/docs/inputs/input-salesforce.asciidoc
@@ -0,0 +1,276 @@
+[role="xpack"]
+
+:type: salesforce
+
+[id="{beatname_lc}-input-{type}"]
+=== Salesforce input
+
+++++
+Salesforce
+++++
+
+Use the `salesforce` input to monitor Salesforce events either via the https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_eventlogfile.htm[Salesforce EventLogFile (ELF) API] or the https://developer.salesforce.com/blogs/2020/05/introduction-to-real-time-event-monitoring[Salesforce Real-time event monitoring API]. Both use REST API (to execute SOQL queries in the Salesforce instance) under the hood to query the relevant objects to fetch the events.
+
+Here the `input` have cursor state(s) that will be provided to the next execution of event monitoring to fetch the events from the last cursor state. The cursor states can be used to control the behaviour of the program.
+
+This input supports:
+
+* Auth
+** OAuth2
+*** User-Password flow
+*** JWT Bearer flow
+* Event Monitoring
+** EventLogFile (ELF) using REST API
+** REST API for objects (Used for Setup Audit Trail and for monitoring real-time events)
+
+Event Monitoring methods are highly configurable and can be used to monitor any supported object or event log file. The input can be configured to monitor multiple objects or event log files at the same time.
+
+Example configurations:
+
+["source","yaml",subs="attributes"]
+----
+filebeat.inputs:
+ - type: salesforce
+ enabled: true
+ version: 56
+ auth.oauth2:
+ user_password_flow:
+ enabled: true
+ client.id: client-id
+ client.secret: client-secret
+ token_url: https://instance-id.develop.my.salesforce.com
+ username: salesforce-instance@user.in
+ password: salesforce-instance-password
+ jwt_bearer_flow:
+ enabled: true
+ client.id: client-id
+ client.username: salesforce-instance@user.in
+ client.key_path: server_client.key
+ url: https://login.salesforce.com
+ url: https://instance-id.develop.my.salesforce.com
+ event_monitoring_method:
+ event_log_file:
+ enabled: true
+ interval: 1h
+ query:
+ default: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' ORDER BY CreatedDate ASC NULLS FIRST"
+ value: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.event_log_file.last_event_time ]] ORDER BY CreatedDate ASC NULLS FIRST"
+ cursor:
+ field: "CreatedDate"
+ object:
+ enabled: true
+ interval: 5m
+ query:
+ default: "SELECT FIELDS(STANDARD) FROM LoginEvent"
+ value: "SELECT FIELDS(STANDARD) FROM LoginEvent WHERE EventDate > [[ .cursor.object.first_event_time ]]"
+ cursor:
+ field: "EventDate"
+----
+
+
+==== Execution
+
+The `salesforce` input is a long-running program that retrieves events from a Salesforce instance and sends them to the specified output. The program executes in a loop, fetching events from the Salesforce instance at a preconfigured interval. Each event monitoring method can be configured to run separately and at different intervals. To prevent a sudden spike in memory usage, if multiple event monitoring methods are configured, they are scheduled to run one at a time. Even if the intervals overlap, only one method will be executed randomly, and the other will be executed after the first one completes.
+
+There are two methods to fetch the events from the Salesforce instance:
+
+- event_log_file: https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_eventlogfile.htm[EventLogFile] is a standard object in Salesforce and the event monitoring method uses the REST API under the hood to gather the Salesforce org's operational events from the object. There is a field EventType that helps distinguish between the types of operational events like — Login, Logout, etc. Uses Salesforce's query language SOQL to query the object.
+
+- object: This method is a general way of retrieving events from a Salesforce instance by using the REST API. It can be used for the SetupAuditTrail and for monitoring objects in real-time. In real-time event monitoring, subscribing to the events is a common practice, but the events are also stored in Salesforce org (if configured), specifically in big object tables that are preconfigured for each event type. To query the object, Salesforce's query language SOQL is used.
+
+==== Configuration options
+
+The `salesforce` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[bool]
+==== `enabled`
+
+Whether the input is enabled or not. Default: `false`.
+
+[integer]
+==== `version`
+
+The version of the Salesforce API to use. Minimum supported version is 46.
+
+[object]
+==== `auth`
+
+The authentication settings for the Salesforce instance.
+
+[object]
+==== `auth.oauth2`
+
+The OAuth2 authentication options for the Salesforce instance.
+
+There are two OAuth2 authentication flows supported:
+
+* `user_password_flow`: User-Password flow
+* `jwt_bearer_flow`: JWT Bearer flow
+
+[bool]
+==== `auth.oauth2.user_password_flow.enabled`
+
+Whether to use the user-password flow for authentication. Default: `false`.
+
+NOTE: Only one authentication flow can be enabled at a time.
+
+[string]
+==== `auth.oauth2.user_password_flow.client.id`
+
+The client ID for the user-password flow.
+
+[string]
+==== `auth.oauth2.user_password_flow.client.secret`
+
+The client secret for the user-password flow.
+
+[string]
+==== `auth.oauth2.user_password_flow.token_url`
+
+The token URL for the user-password flow.
+
+[string]
+==== `auth.oauth2.user_password_flow.username`
+
+The username for the user-password flow.
+
+[string]
+==== `auth.oauth2.user_password_flow.password`
+
+The password for the user-password flow.
+
+[bool]
+==== `auth.oauth2.jwt_bearer_flow.enabled`
+
+Whether to use the JWT bearer flow for authentication. Default: `false`.
+
+NOTE: Only one authentication flow can be enabled at a time.
+
+[string]
+==== `auth.oauth2.jwt_bearer_flow.client.id`
+
+The client ID for the JWT bearer flow.
+
+[string]
+==== `auth.oauth2.jwt_bearer_flow.client.username`
+
+The username for the JWT bearer flow.
+
+[string]
+==== `auth.oauth2.jwt_bearer_flow.client.key_path`
+
+The path to the private key file for the JWT bearer flow. The file must be PEM encoded PKCS1 or PKCS8 private key and must have the right permissions set to have read access for the user running the program.
+
+[string]
+==== `auth.oauth2.jwt_bearer_flow.url`
+
+The URL for the JWT bearer flow.
+
+[string]
+==== `url`
+
+The URL of the Salesforce instance. Required.
+
+[[resource-parameters]]
+[float]
+==== `resource.timeout`
+
+Duration before declaring that the HTTP client connection has timed out. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Default: `30s`.
+
+[float]
+==== `resource.retry.max_attempts`
+
+The maximum number of retries for the HTTP client. Default: `5`.
+
+[float]
+==== `resource.retry.wait_min`
+
+The minimum time to wait before a retry is attempted. Default: `1s`.
+
+[float]
+==== `resource.retry.wait_max`
+
+The maximum time to wait before a retry is attempted. Default: `60s`.
+
+[object]
+==== `event_monitoring_method`
+
+The event monitoring method to use. There are two event monitoring methods supported:
+
+* `event_log_file`: EventLogFile (ELF) using REST API
+
+* `object`: Real-time event monitoring using REST API (objects)
+
+[object]
+==== `event_monitoring_method.event_log_file`
+
+The event monitoring method to use — event_log_file. Uses the EventLogFile API to fetch the events from the Salesforce instance.
+
+[bool]
+==== `event_monitoring_method.event_log_file.enabled`
+
+Whether to use the EventLogFile API for event monitoring. Default: `false`.
+
+[duration]
+==== `event_monitoring_method.event_log_file.interval`
+
+The interval to collect the events from the Salesforce instance using the EventLogFile API.
+
+[string]
+==== `event_monitoring_method.event_log_file.query.default`
+
+The default query to fetch the events from the Salesforce instance using the EventLogFile API.
+
+In case the cursor state is not available, the default query will be used to fetch the events from the Salesforce instance. The default query must be a valid SOQL query. If the SOQL query in `event_monitoring_method.event_log_file.query.value` is not valid, the default query will be used to fetch the events from the Salesforce instance.
+
+[string]
+==== `event_monitoring_method.event_log_file.query.value`
+
+The SOQL query to fetch the events from the Salesforce instance using the EventLogFile API but it uses the cursor state to fetch the events from the Salesforce instance. The SOQL query must be a valid SOQL query. If the SOQL query is not valid, the default query will be used to fetch the events from the Salesforce instance.
+
+In case of restarts or subsequent executions, the cursor state will be used to fetch the events from the Salesforce instance. The cursor state is the last event time of the last event fetched from the Salesforce instance. The cursor state is taken from `event_monitoring_method.event_log_file.cursor.field` field for the last event fetched from the Salesforce instance.
+
+[string]
+==== `event_monitoring_method.event_log_file.cursor.field`
+
+The field to use to fetch the cursor state from the last event fetched from the Salesforce instance. The field must be a valid field in the SOQL query specified in `event_monitoring_method.event_log_file.query.default` and `event_monitoring_method.event_log_file.query.value` i.e., part of the selected fields in the SOQL query.
+
+[object]
+==== `event_monitoring_method.object`
+
+The event monitoring method to use — object. Uses REST API to fetch the events directly from the objects from the Salesforce instance. This method is used for Setup Audit Trail and for monitoring real-time events.
+
+[bool]
+==== `event_monitoring_method.object.enabled`
+
+Whether to use the REST API for objects for event monitoring. Default: `false`.
+
+[duration]
+==== `event_monitoring_method.object.interval`
+
+The interval to collect the events from the Salesforce instance using the REST API from objects.
+
+[string]
+==== `event_monitoring_method.object.query.default`
+
+The default SOQL query to fetch the events from the Salesforce instance using the REST API from objects.
+
+In case the cursor state is not available, the default query will be used to fetch the events from the Salesforce instance. The default query must be a valid SOQL query. If the SOQL query in `event_monitoring_method.object.query.value` is not valid, the default query will be used to fetch the events from the Salesforce instance.
+
+[string]
+==== `event_monitoring_method.object.query.value`
+
+The SOQL query to fetch the events from the Salesforce instance using the REST API from objects but it uses the cursor state to fetch the events from the Salesforce instance. The SOQL query must be a valid SOQL query. If the SOQL query is not valid, the default query will be used to fetch the events from the Salesforce instance.
+
+In case of restarts or subsequent executions, the cursor state will be used to fetch the events from the Salesforce instance. The cursor state is the last event time of the last event fetched from the Salesforce instance. The cursor state is taken from `event_monitoring_method.object.cursor.field` field for the last event fetched from the Salesforce instance.
+
+[string]
+==== `event_monitoring_method.object.cursor.field`
+
+The field to use to fetch the cursor state from the last event fetched from the Salesforce instance. The field must be a valid field in the SOQL query specified in `event_monitoring_method.object.query.default` and `event_monitoring_method.object.query.value` i.e., part of the selected fields in the SOQL query.
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
+
+:type!:
diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go
index 91d5917f261..ab682e4e001 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_other.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce"
"github.com/elastic/beats/v7/x-pack/filebeat/input/shipper"
"github.com/elastic/beats/v7/x-pack/filebeat/input/websocket"
"github.com/elastic/elastic-agent-libs/logp"
@@ -40,6 +41,7 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
awss3.Plugin(store),
awscloudwatch.Plugin(),
lumberjack.Plugin(),
+ salesforce.Plugin(log, store),
shipper.Plugin(log, store),
websocket.Plugin(log, store),
netflow.Plugin(log),
diff --git a/x-pack/filebeat/input/salesforce/config.go b/x-pack/filebeat/input/salesforce/config.go
new file mode 100644
index 00000000000..55ee1c81e83
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/config.go
@@ -0,0 +1,131 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/elastic/elastic-agent-libs/transport/httpcommon"
+)
+
+type config struct {
+ Resource *resourceConfig `config:"resource"`
+ Auth *authConfig `config:"auth"`
+ EventMonitoringMethod *eventMonitoringMethod `config:"event_monitoring_method"`
+ URL string `config:"url" validate:"required"`
+ Version int `config:"version" validate:"required"`
+}
+
+type resourceConfig struct {
+ Retry retryConfig `config:"retry"`
+ Transport httpcommon.HTTPTransportSettings `config:",inline"`
+}
+
+type retryConfig struct {
+ MaxAttempts *int `config:"max_attempts"`
+ WaitMin *time.Duration `config:"wait_min"`
+ WaitMax *time.Duration `config:"wait_max"`
+}
+
+func (c retryConfig) Validate() error {
+ switch {
+ case c.MaxAttempts != nil && *c.MaxAttempts <= 0:
+ return errors.New("max_attempts must be greater than zero")
+ case c.WaitMin != nil && *c.WaitMin <= 0:
+ return errors.New("wait_min must be greater than zero")
+ case c.WaitMax != nil && *c.WaitMax <= 0:
+ return errors.New("wait_max must be greater than zero")
+ }
+ return nil
+}
+
+func (c retryConfig) getMaxAttempts() int {
+ if c.MaxAttempts == nil {
+ return 0
+ }
+ return *c.MaxAttempts
+}
+
+func (c retryConfig) getWaitMin() time.Duration {
+ if c.WaitMin == nil {
+ return 0
+ }
+ return *c.WaitMin
+}
+
+func (c retryConfig) getWaitMax() time.Duration {
+ if c.WaitMax == nil {
+ return 0
+ }
+ return *c.WaitMax
+}
+
+type eventMonitoringMethod struct {
+ EventLogFile EventMonitoringConfig `config:"event_log_file"`
+ Object EventMonitoringConfig `config:"object"`
+}
+
+type EventMonitoringConfig struct {
+ Enabled *bool `config:"enabled"`
+ Query *QueryConfig `config:"query"`
+ Cursor *cursorConfig `config:"cursor"`
+ Interval time.Duration `config:"interval"`
+}
+
+func (e *EventMonitoringConfig) isEnabled() bool {
+ return e != nil && (e.Enabled != nil && *e.Enabled)
+}
+
+type cursorConfig struct {
+ Field string `config:"field"`
+}
+
+// Validate validates the configuration.
+func (c *config) Validate() error {
+ switch {
+ case !c.Auth.OAuth2.JWTBearerFlow.isEnabled() && !c.Auth.OAuth2.UserPasswordFlow.isEnabled():
+ return errors.New("no auth provider enabled")
+ case c.Auth.OAuth2.JWTBearerFlow.isEnabled() && c.Auth.OAuth2.UserPasswordFlow.isEnabled():
+ return errors.New("only one auth provider must be enabled")
+ case c.URL == "":
+ return errors.New("no instance url is configured")
+ case !c.EventMonitoringMethod.Object.isEnabled() && !c.EventMonitoringMethod.EventLogFile.isEnabled():
+ return errors.New(`at least one of "event_monitoring_method.event_log_file.enabled" or "event_monitoring_method.object.enabled" must be set to true`)
+ case c.EventMonitoringMethod.EventLogFile.isEnabled() && c.EventMonitoringMethod.EventLogFile.Interval == 0:
+ return fmt.Errorf("not a valid interval %d", c.EventMonitoringMethod.EventLogFile.Interval)
+ case c.EventMonitoringMethod.Object.isEnabled() && c.EventMonitoringMethod.Object.Interval == 0:
+ return fmt.Errorf("not a valid interval %d", c.EventMonitoringMethod.Object.Interval)
+
+ case c.Version < 46:
+ // - EventLogFile object is available in API version 32.0 or later
+ // - SetupAuditTrail object is available in API version 15.0 or later
+ // - Real-Time Event monitoring objects that were introduced as part of
+ // the beta release in API version 46.0
+ //
+ // To keep things simple, only one version is entertained i.e., the
+ // minimum version supported by all objects for which we have support
+ // for.
+ //
+ // minimum_vesion_supported_by_all_objects([32.0, 15.0, 46.0]) = 46.0
+ //
+ // (Objects like EventLogFile, SetupAuditTrail and Real-time monitoring
+ // objects are available in v46.0 and above)
+
+ // References:
+ // https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_eventlogfile.htm
+ // https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_setupaudittrail.htm
+ // https://developer.salesforce.com/docs/atlas.en-us.platform_events.meta/platform_events/platform_events_objects_monitoring.htm
+ return errors.New("not a valid version i.e., 46.0 or above")
+ }
+
+ return nil
+}
+
+type QueryConfig struct {
+ Default *valueTpl `config:"default"`
+ Value *valueTpl `config:"value"`
+}
diff --git a/x-pack/filebeat/input/salesforce/config_auth.go b/x-pack/filebeat/input/salesforce/config_auth.go
new file mode 100644
index 00000000000..6e0f9361def
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/config_auth.go
@@ -0,0 +1,92 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import "errors"
+
+type authConfig struct {
+ // See: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5
+ // for more information about OAuth2 flows.
+ OAuth2 *OAuth2 `config:"oauth2"`
+}
+
+type OAuth2 struct {
+ // See: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_username_password_flow.htm&type=5
+ UserPasswordFlow *UserPasswordFlow `config:"user_password_flow"`
+ // See: https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_jwt_flow.htm&type=5
+ JWTBearerFlow *JWTBearerFlow `config:"jwt_bearer_flow"`
+}
+
+type UserPasswordFlow struct {
+ Enabled *bool `config:"enabled"`
+
+ ClientID string `config:"client.id"`
+ ClientSecret string `config:"client.secret"`
+ Password string `config:"password"`
+ TokenURL string `config:"token_url"`
+ Username string `config:"username"`
+}
+
+type JWTBearerFlow struct {
+ Enabled *bool `config:"enabled"`
+
+ URL string `config:"url"`
+ ClientID string `config:"client.id"`
+ ClientUsername string `config:"client.username"`
+ ClientKeyPath string `config:"client.key_path"`
+}
+
+// isEnabled returns true if the `enable` field is set to true in the yaml.
+func (o *UserPasswordFlow) isEnabled() bool {
+ return o != nil && (o.Enabled != nil && *o.Enabled)
+}
+
+// Validate checks if User Passworld Flow config is valid.
+func (o *UserPasswordFlow) Validate() error {
+ if !o.isEnabled() {
+ return nil
+ }
+
+ switch {
+ case o.TokenURL == "":
+ return errors.New("token_url must be provided")
+ case o.ClientID == "":
+ return errors.New("client.id must be provided")
+ case o.ClientSecret == "":
+ return errors.New("client.secret must be provided")
+ case o.Username == "":
+ return errors.New("username must be provided")
+ case o.Password == "":
+ return errors.New("password must be provided")
+
+ }
+
+ return nil
+}
+
+// isEnabled returns true if the `enable` field is set to true in the yaml.
+func (o *JWTBearerFlow) isEnabled() bool {
+ return o != nil && (o.Enabled != nil && *o.Enabled)
+}
+
+// Validate checks if JWT Bearer Flow config is valid.
+func (o *JWTBearerFlow) Validate() error {
+ if !o.isEnabled() {
+ return nil
+ }
+
+ switch {
+ case o.URL == "":
+ return errors.New("url must be provided")
+ case o.ClientID == "":
+ return errors.New("client.id must be provided")
+ case o.ClientUsername == "":
+ return errors.New("client.username must be provided")
+ case o.ClientKeyPath == "":
+ return errors.New("client.key_path must be provided")
+ }
+
+ return nil
+}
diff --git a/x-pack/filebeat/input/salesforce/config_auth_test.go b/x-pack/filebeat/input/salesforce/config_auth_test.go
new file mode 100644
index 00000000000..0b378f7b47f
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/config_auth_test.go
@@ -0,0 +1,55 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestOAuth2Config(t *testing.T) {
+ tests := map[string]struct {
+ wantErr error
+ config UserPasswordFlow
+ }{
+ "auth disabled I": {config: UserPasswordFlow{}, wantErr: nil},
+ "auth disabled II": {config: UserPasswordFlow{Enabled: pointer(false)}, wantErr: nil},
+ "tokenURL missing": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: ""}, wantErr: errors.New("token_url must be provided")},
+ "clientID missing": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: "https://salesforce.com", ClientID: ""}, wantErr: errors.New("client.id must be provided")},
+ "clientSecret missing": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: "https://salesforce.com", ClientID: "xyz", ClientSecret: ""}, wantErr: errors.New("client.secret must be provided")},
+ "username missing": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: "https://salesforce.com", ClientID: "xyz", ClientSecret: "abc", Username: ""}, wantErr: errors.New("username must be provided")},
+ "password missing": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: "https://salesforce.com", ClientID: "xyz", ClientSecret: "abc", Username: "user", Password: ""}, wantErr: errors.New("password must be provided")},
+ "all present": {config: UserPasswordFlow{Enabled: pointer(true), TokenURL: "https://salesforce.com", ClientID: "xyz", ClientSecret: "abc", Username: "user", Password: "pass"}, wantErr: nil},
+ }
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ got := tc.config.Validate()
+ assert.Equal(t, tc.wantErr, got)
+ })
+ }
+}
+
+func TestJWTConfig(t *testing.T) {
+ tests := map[string]struct {
+ wantErr error
+ config JWTBearerFlow
+ }{
+ "auth disabled I": {config: JWTBearerFlow{}, wantErr: nil},
+ "auth disabled II": {config: JWTBearerFlow{Enabled: pointer(false)}, wantErr: nil},
+ "url missing": {config: JWTBearerFlow{Enabled: pointer(true), URL: ""}, wantErr: errors.New("url must be provided")},
+ "clientID missing": {config: JWTBearerFlow{Enabled: pointer(true), URL: "https://salesforce.com", ClientID: ""}, wantErr: errors.New("client.id must be provided")},
+ "clientUsername missing": {config: JWTBearerFlow{Enabled: pointer(true), URL: "https://salesforce.com", ClientID: "xyz", ClientUsername: ""}, wantErr: errors.New("client.username must be provided")},
+ "clientKeyPath missing": {config: JWTBearerFlow{Enabled: pointer(true), URL: "https://salesforce.com", ClientID: "xyz", ClientUsername: "abc", ClientKeyPath: ""}, wantErr: errors.New("client.key_path must be provided")},
+ "all present": {config: JWTBearerFlow{Enabled: pointer(true), URL: "https://salesforce.com", ClientID: "xyz", ClientUsername: "abc", ClientKeyPath: "def"}, wantErr: nil},
+ }
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ got := tc.config.Validate()
+ assert.Equal(t, tc.wantErr, got)
+ })
+ }
+}
diff --git a/x-pack/filebeat/input/salesforce/config_test.go b/x-pack/filebeat/input/salesforce/config_test.go
new file mode 100644
index 00000000000..b620d9bf9a9
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/config_test.go
@@ -0,0 +1,126 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestValidate(t *testing.T) {
+ tests := map[string]struct {
+ wantErr error
+ inputCfg config
+ }{
+ "no auth provider enabled (no password or jwt)": {
+ inputCfg: config{
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{},
+ JWTBearerFlow: &JWTBearerFlow{},
+ },
+ },
+ },
+ wantErr: errors.New("no auth provider enabled"),
+ },
+ "only one auth provider is allowed (either password or jwt)": {
+ inputCfg: config{
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ JWTBearerFlow: &JWTBearerFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: errors.New("only one auth provider must be enabled"),
+ },
+ "no instance url is configured (empty url)": {
+ inputCfg: config{
+ URL: "",
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: errors.New("no instance url is configured"),
+ },
+ "no data collection method configured": {
+ inputCfg: config{
+ EventMonitoringMethod: &eventMonitoringMethod{},
+ URL: "https://some-dummy-subdomain.salesforce.com/services/oauth2/token",
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: errors.New(`at least one of "event_monitoring_method.event_log_file.enabled" or "event_monitoring_method.object.enabled" must be set to true`),
+ },
+ "invalid elf interval (1h)": {
+ inputCfg: config{
+ EventMonitoringMethod: &eventMonitoringMethod{
+ EventLogFile: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Duration(0),
+ },
+ },
+ URL: "https://some-dummy-subdomain.salesforce.com/services/oauth2/token",
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: fmt.Errorf("not a valid interval %d", time.Duration(0)),
+ },
+ "invalid object interval (1h)": {
+ inputCfg: config{
+ EventMonitoringMethod: &eventMonitoringMethod{
+ Object: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Duration(0),
+ },
+ },
+ URL: "https://some-dummy-subdomain.salesforce.com/services/oauth2/token",
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: fmt.Errorf("not a valid interval %d", time.Duration(0)),
+ },
+ "invalid api version (v45)": {
+ inputCfg: config{
+ Version: 45,
+ EventMonitoringMethod: &eventMonitoringMethod{
+ Object: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Hour,
+ },
+ },
+ URL: "https://some-dummy-subdomain.salesforce.com/services/oauth2/token",
+ Auth: &authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{Enabled: pointer(true)},
+ },
+ },
+ },
+ wantErr: errors.New("not a valid version i.e., 46.0 or above"),
+ },
+ }
+
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ got := tc.inputCfg.Validate()
+ assert.Equal(t, tc.wantErr, got)
+ })
+ }
+}
diff --git a/x-pack/filebeat/input/salesforce/helper.go b/x-pack/filebeat/input/salesforce/helper.go
new file mode 100644
index 00000000000..8869ca9aa3f
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/helper.go
@@ -0,0 +1,33 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import "time"
+
+// timeNow wraps time.Now to mock time for tests.
+var timeNow = time.Now
+
+// mockTimeNow mocks timeNow for tests.
+func mockTimeNow(t time.Time) {
+ timeNow = func() time.Time {
+ return t
+ }
+}
+
+// resetTimeNow resets timeNow to time.Now.
+func resetTimeNow() {
+ timeNow = time.Now
+}
+
+// pointer returns a pointer to the given value.
+//
+// For example: Assigning &true to value of type *bool is not possible but
+// pointer(true) is assignable to the same value of type *bool as address operator
+// can be applied to pointer(true) as the returned value is an addressable value.
+//
+// See: https://go.dev/ref/spec#Address_operators
+func pointer[T any](d T) *T {
+ return &d
+}
diff --git a/x-pack/filebeat/input/salesforce/input.go b/x-pack/filebeat/input/salesforce/input.go
new file mode 100644
index 00000000000..1e893625ed2
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/input.go
@@ -0,0 +1,573 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "bytes"
+ "context"
+ "encoding/csv"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/g8rswimmer/go-sfdc"
+ "github.com/g8rswimmer/go-sfdc/credentials"
+ "github.com/g8rswimmer/go-sfdc/session"
+ "github.com/g8rswimmer/go-sfdc/soql"
+ "github.com/golang-jwt/jwt"
+ "github.com/hashicorp/go-retryablehttp"
+ "go.uber.org/zap"
+ "golang.org/x/exp/slices"
+
+ v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/feature"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+ "github.com/elastic/go-concert/ctxtool"
+)
+
+const (
+ inputName = "salesforce"
+ formatRFC3339Like = "2006-01-02T15:04:05.999Z"
+)
+
+type salesforceInput struct {
+ ctx context.Context
+ publisher inputcursor.Publisher
+ cancel context.CancelCauseFunc
+ cursor *state
+ srcConfig *config
+ sfdcConfig *sfdc.Configuration
+ log *logp.Logger
+ clientSession *session.Session
+ soqlr *soql.Resource
+ config
+}
+
+// // The Filebeat user-agent is provided to the program as useragent.
+// var userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String())
+
+// Plugin returns the input plugin.
+func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
+ return v2.Plugin{
+ Name: inputName,
+ Stability: feature.Stable,
+ Manager: NewInputManager(log, store),
+ }
+}
+
+func (s *salesforceInput) Name() string { return inputName }
+
+func (s *salesforceInput) Test(_ inputcursor.Source, _ v2.TestContext) error {
+ return nil
+}
+
+// Run starts the input and blocks until it ends completes. It will return on
+// context cancellation or type invalidity errors, any other error will be retried.
+func (s *salesforceInput) Run(env v2.Context, src inputcursor.Source, cursor inputcursor.Cursor, pub inputcursor.Publisher) (err error) {
+ st := &state{}
+ if !cursor.IsNew() {
+ if err = cursor.Unpack(&st); err != nil {
+ return err
+ }
+ }
+
+ if err = s.Setup(env, src, st, pub); err != nil {
+ return err
+ }
+
+ return s.run()
+}
+
+// Setup sets up the input. It will create a new SOQL resource and all other
+// necessary configurations.
+func (s *salesforceInput) Setup(env v2.Context, src inputcursor.Source, cursor *state, pub inputcursor.Publisher) (err error) {
+ cfg := src.(*source).cfg
+
+ ctx := ctxtool.FromCanceller(env.Cancelation)
+ childCtx, cancel := context.WithCancelCause(ctx)
+
+ s.srcConfig = &cfg
+ s.ctx = childCtx
+ s.cancel = cancel
+ s.publisher = pub
+ s.cursor = cursor
+ s.log = env.Logger.With("input_url", cfg.URL)
+ s.sfdcConfig, err = s.getSFDCConfig(&cfg)
+ if err != nil {
+ return fmt.Errorf("error with configuration: %w", err)
+ }
+
+ s.soqlr, err = s.SetupSFClientConnection() // create a new SOQL resource
+ if err != nil {
+ return fmt.Errorf("error setting up connection to Salesforce: %w", err)
+ }
+
+ return nil
+}
+
+// run is the main loop of the input. It will run until the context is cancelled
+// and based on the configuration, it will run the different methods -- EventLogFile
+// or Object to collect events at defined intervals.
+func (s *salesforceInput) run() error {
+ if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() {
+ err := s.RunEventLogFile()
+ if err != nil {
+ s.log.Errorf("Problem running EventLogFile collection: %s", err)
+ }
+ }
+
+ if s.srcConfig.EventMonitoringMethod.Object.isEnabled() {
+ err := s.RunObject()
+ if err != nil {
+ s.log.Errorf("Problem running Object collection: %s", err)
+ }
+ }
+
+ eventLogFileTicker, objectMethodTicker := &time.Ticker{}, &time.Ticker{}
+ eventLogFileTicker.C, objectMethodTicker.C = nil, nil
+
+ if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() {
+ eventLogFileTicker = time.NewTicker(s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)
+ defer eventLogFileTicker.Stop()
+ }
+
+ if s.srcConfig.EventMonitoringMethod.Object.isEnabled() {
+ objectMethodTicker = time.NewTicker(s.srcConfig.EventMonitoringMethod.Object.Interval)
+ defer objectMethodTicker.Stop()
+ }
+
+ for {
+ // Always check for cancel first, to not accidentally trigger another
+ // run if the context is already cancelled, but we have already received
+ // another ticker making the channel ready.
+ select {
+ case <-s.ctx.Done():
+ return s.isError(s.ctx.Err())
+ default:
+ }
+
+ select {
+ case <-s.ctx.Done():
+ return s.isError(s.ctx.Err())
+ case <-eventLogFileTicker.C:
+ if err := s.RunEventLogFile(); err != nil {
+ s.log.Errorf("Problem running EventLogFile collection: %s", err)
+ }
+ case <-objectMethodTicker.C:
+ if err := s.RunObject(); err != nil {
+ s.log.Errorf("Problem running Object collection: %s", err)
+ }
+ }
+ }
+}
+
+func (s *salesforceInput) isError(err error) error {
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ s.log.Infof("input stopped because context was cancelled with: %v", err)
+ return nil
+ }
+
+ return err
+}
+
+func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) {
+ if s.sfdcConfig == nil {
+ return nil, errors.New("internal error: salesforce configuration is not set properly")
+ }
+
+ // Open creates a session using the configuration.
+ session, err := session.Open(*s.sfdcConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ // Set clientSession for re-use.
+ s.clientSession = session
+
+ // Create a new SOQL resource using the session.
+ soqlr, err := soql.NewResource(session)
+ if err != nil {
+ return nil, fmt.Errorf("error setting up salesforce SOQL resource: %w", err)
+ }
+ return soqlr, nil
+}
+
+// FormQueryWithCursor takes a queryConfig and a cursor and returns a querier.
+func (s *salesforceInput) FormQueryWithCursor(queryConfig *QueryConfig, cursor mapstr.M) (*querier, error) {
+ qr, err := parseCursor(queryConfig, cursor, s.log)
+ if err != nil {
+ return nil, err
+ }
+
+ s.log.Infof("Salesforce query: %s", qr)
+
+ return &querier{Query: qr}, err
+}
+
+// isZero checks if the given value v is the zero value for its type.
+// It compares v to the zero value obtained by new(T).
+func isZero[T comparable](v T) bool {
+ return v == *new(T)
+}
+
+// RunObject runs the Object method of the Event Monitoring API to collect events.
+func (s *salesforceInput) RunObject() error {
+ s.log.Debugf("scrape object(s) every %s", s.srcConfig.EventMonitoringMethod.Object.Interval)
+
+ var cursor mapstr.M
+ if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) {
+ object := make(mapstr.M)
+ if !isZero(s.cursor.Object.FirstEventTime) {
+ object.Put("first_event_time", s.cursor.Object.FirstEventTime)
+ }
+ if !isZero(s.cursor.Object.LastEventTime) {
+ object.Put("last_event_time", s.cursor.Object.LastEventTime)
+ }
+ cursor = mapstr.M{"object": object}
+ }
+
+ query, err := s.FormQueryWithCursor(s.config.EventMonitoringMethod.Object.Query, cursor)
+ if err != nil {
+ return fmt.Errorf("error forming query based on cursor: %w", err)
+ }
+
+ res, err := s.soqlr.Query(query, false)
+ if err != nil {
+ return err
+ }
+
+ totalEvents := 0
+ firstEvent := true
+
+ for res.TotalSize() > 0 {
+ for _, rec := range res.Records() {
+ val := rec.Record().Fields()
+
+ jsonStrEvent, err := json.Marshal(val)
+ if err != nil {
+ return err
+ }
+
+ if timestamp, ok := val[s.config.EventMonitoringMethod.Object.Cursor.Field].(string); ok {
+ if firstEvent {
+ s.cursor.Object.FirstEventTime = timestamp
+ }
+ s.cursor.Object.LastEventTime = timestamp
+ }
+
+ err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "Object")
+ if err != nil {
+ return err
+ }
+ firstEvent = false
+ totalEvents++
+ }
+
+ if !res.MoreRecords() { // returns true if there are more records.
+ break
+ }
+
+ res, err = res.Next()
+ if err != nil {
+ return err
+ }
+ }
+ s.log.Debugf("Total events: %d", totalEvents)
+
+ return nil
+}
+
+// RunEventLogFile runs the EventLogFile method of the Event Monitoring API to
+// collect events.
+func (s *salesforceInput) RunEventLogFile() error {
+ s.log.Debugf("scrape eventLogFile(s) every %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval)
+
+ var cursor mapstr.M
+ if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) {
+ eventLogFile := make(mapstr.M)
+ if !isZero(s.cursor.Object.FirstEventTime) {
+ eventLogFile.Put("first_event_time", s.cursor.EventLogFile.FirstEventTime)
+ }
+ if !isZero(s.cursor.Object.LastEventTime) {
+ eventLogFile.Put("last_event_time", s.cursor.EventLogFile.LastEventTime)
+ }
+ cursor = mapstr.M{"event_log_file": eventLogFile}
+ }
+
+ query, err := s.FormQueryWithCursor(s.config.EventMonitoringMethod.EventLogFile.Query, cursor)
+ if err != nil {
+ return fmt.Errorf("error forming query based on cursor: %w", err)
+ }
+
+ res, err := s.soqlr.Query(query, false)
+ if err != nil {
+ return err
+ }
+
+ // NOTE: This is a failsafe check because the HTTP client is always set.
+ // This check allows unit tests to verify correct behavior when the HTTP
+ // client is nil.
+ if s.sfdcConfig.Client == nil {
+ return errors.New("internal error: salesforce configuration is not set properly")
+ }
+
+ totalEvents, firstEvent := 0, true
+ for res.TotalSize() > 0 {
+ for _, rec := range res.Records() {
+ req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+rec.Record().Fields()["LogFile"].(string), nil)
+ if err != nil {
+ return err
+ }
+
+ s.clientSession.AuthorizationHeader(req)
+
+ // NOTE: If we ever see a production issue relaated to this, then only
+ // we should consider adding the header: "X-PrettyPrint:1"
+ //
+ // // NOTE: X-PrettyPrint:1 is for formatted response and ideally we do
+ // // not need it. But see:
+ // // https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/dome_event_log_file_download.htm?q=X-PrettyPrint%3A1
+ // req.Header.Add("X-PrettyPrint", "1")
+
+ resp, err := s.sfdcConfig.Client.Do(req)
+ if err != nil {
+ return err
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ resp.Body.Close()
+ return err
+ }
+ resp.Body.Close()
+
+ recs, err := decodeAsCSV(body)
+ if err != nil {
+ return err
+ }
+
+ if timestamp, ok := rec.Record().Fields()[s.config.EventMonitoringMethod.EventLogFile.Cursor.Field].(string); ok {
+ if firstEvent {
+ s.cursor.EventLogFile.FirstEventTime = timestamp
+ }
+ s.cursor.EventLogFile.LastEventTime = timestamp
+ }
+
+ for _, val := range recs {
+ jsonStrEvent, err := json.Marshal(val)
+ if err != nil {
+ return err
+ }
+
+ err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile")
+ if err != nil {
+ return err
+ }
+ totalEvents++
+ }
+ firstEvent = false
+ }
+
+ if !res.MoreRecords() {
+ break
+ }
+
+ res, err = res.Next()
+ if err != nil {
+ return err
+ }
+ }
+ s.log.Debugf("Total events: %d", totalEvents)
+
+ return nil
+}
+
+// getSFDCConfig returns a new Salesforce configuration based on the configuration.
+func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error) {
+ var (
+ creds *credentials.Credentials
+ err error
+ )
+
+ if cfg.Auth == nil {
+ return nil, errors.New("no auth provider enabled")
+ }
+
+ switch {
+ case cfg.Auth.OAuth2.JWTBearerFlow != nil && cfg.Auth.OAuth2.JWTBearerFlow.isEnabled():
+ pemBytes, err := os.ReadFile(cfg.Auth.OAuth2.JWTBearerFlow.ClientKeyPath)
+ if err != nil {
+ return nil, fmt.Errorf("problem with client key path for JWT auth: %w", err)
+ }
+
+ signKey, err := jwt.ParseRSAPrivateKeyFromPEM(pemBytes)
+ if err != nil {
+ return nil, fmt.Errorf("problem with client key for JWT auth: %w", err)
+ }
+
+ passCreds := credentials.JwtCredentials{
+ URL: cfg.Auth.OAuth2.JWTBearerFlow.URL,
+ ClientId: cfg.Auth.OAuth2.JWTBearerFlow.ClientID,
+ ClientUsername: cfg.Auth.OAuth2.JWTBearerFlow.ClientUsername,
+ ClientKey: signKey,
+ }
+
+ creds, err = credentials.NewJWTCredentials(passCreds)
+ if err != nil {
+ return nil, fmt.Errorf("error creating jwt credentials: %w", err)
+ }
+
+ case cfg.Auth.OAuth2.UserPasswordFlow != nil && cfg.Auth.OAuth2.UserPasswordFlow.isEnabled():
+ passCreds := credentials.PasswordCredentials{
+ URL: cfg.Auth.OAuth2.UserPasswordFlow.TokenURL,
+ Username: cfg.Auth.OAuth2.UserPasswordFlow.Username,
+ Password: cfg.Auth.OAuth2.UserPasswordFlow.Password,
+ ClientID: cfg.Auth.OAuth2.UserPasswordFlow.ClientID,
+ ClientSecret: cfg.Auth.OAuth2.UserPasswordFlow.ClientSecret,
+ }
+
+ creds, err = credentials.NewPasswordCredentials(passCreds)
+ if err != nil {
+ return nil, fmt.Errorf("error creating password credentials: %w", err)
+ }
+
+ }
+
+ client, err := newClient(*cfg, s.log)
+ if err != nil {
+ return nil, fmt.Errorf("problem with client: %w", err)
+ }
+
+ return &sfdc.Configuration{
+ Credentials: creds,
+ Client: client,
+ Version: cfg.Version,
+ }, nil
+}
+
+// retryLog is a shim for the retryablehttp.Client.Logger.
+type retryLog struct{ log *logp.Logger }
+
+func newRetryLog(log *logp.Logger) *retryLog {
+ return &retryLog{log: log.Named("retryablehttp").WithOptions(zap.AddCallerSkip(1))}
+}
+
+func (l *retryLog) Error(msg string, kv ...interface{}) { l.log.Errorw(msg, kv...) }
+func (l *retryLog) Info(msg string, kv ...interface{}) { l.log.Infow(msg, kv...) }
+func (l *retryLog) Debug(msg string, kv ...interface{}) { l.log.Debugw(msg, kv...) }
+func (l *retryLog) Warn(msg string, kv ...interface{}) { l.log.Warnw(msg, kv...) }
+
+// retryErrorHandler returns a retryablehttp.ErrorHandler that will log retry resignation
+// but return the last retry attempt's response and a nil error to allow the retryablehttp.Client
+// evaluate the response status itself. Any error passed to the retryablehttp.ErrorHandler
+// is returned unaltered.
+func retryErrorHandler(max int, log *logp.Logger) retryablehttp.ErrorHandler {
+ return func(resp *http.Response, err error, numTries int) (*http.Response, error) {
+ log.Warnw("giving up retries", "method", resp.Request.Method, "url", resp.Request.URL, "retries", max+1)
+ return resp, err
+ }
+}
+
+func newClient(cfg config, log *logp.Logger) (*http.Client, error) {
+ c, err := cfg.Resource.Transport.Client()
+ if err != nil {
+ return nil, err
+ }
+
+ if maxAttempts := cfg.Resource.Retry.getMaxAttempts(); maxAttempts > 1 {
+ c = (&retryablehttp.Client{
+ HTTPClient: c,
+ Logger: newRetryLog(log),
+ RetryWaitMin: cfg.Resource.Retry.getWaitMin(),
+ RetryWaitMax: cfg.Resource.Retry.getWaitMax(),
+ RetryMax: maxAttempts,
+ CheckRetry: retryablehttp.DefaultRetryPolicy,
+ Backoff: retryablehttp.DefaultBackoff,
+ ErrorHandler: retryErrorHandler(maxAttempts, log),
+ }).StandardClient()
+
+ // BUG: retryablehttp ignores the timeout previously set. So, setting it
+ // again.
+ c.Timeout = cfg.Resource.Transport.Timeout
+ }
+
+ return c, nil
+}
+
+// publishEvent publishes an event using the configured publisher pub.
+func publishEvent(pub inputcursor.Publisher, cursor *state, jsonStrEvent []byte, dataCollectionMethod string) error {
+ event := beat.Event{
+ Timestamp: timeNow(),
+ Fields: mapstr.M{
+ "message": string(jsonStrEvent),
+ "event": mapstr.M{
+ "provider": dataCollectionMethod,
+ },
+ },
+ }
+
+ return pub.Publish(event, cursor)
+}
+
+type textContextError struct {
+ error
+ body []byte
+}
+
+// decodeAsCSV decodes p as a headed CSV document into dst.
+func decodeAsCSV(p []byte) ([]map[string]string, error) {
+ r := csv.NewReader(bytes.NewReader(p))
+
+ // To share the backing array for performance.
+ r.ReuseRecord = true
+
+ // Header row is always expected, otherwise we can't map values to keys in
+ // the event.
+ header, err := r.Read()
+ if err != nil {
+ if err == io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
+ return nil, nil
+ }
+ return nil, err
+ }
+
+ // As buffer reuse is enabled, copying header is important.
+ header = slices.Clone(header)
+
+ var results []map[string]string //nolint:prealloc // not sure about the size to prealloc with
+
+ // NOTE:
+ //
+ // Read sets `r.FieldsPerRecord` to the number of fields in the first record,
+ // so that future records must have the same field count.
+ // So, if len(header) != len(event), the Read will return an error and hence
+ // we need not put an explicit check.
+ event, err := r.Read()
+ for ; err == nil; event, err = r.Read() {
+ if err != nil {
+ continue
+ }
+ o := make(map[string]string, len(header))
+ for i, h := range header {
+ o[h] = event[i]
+ }
+ results = append(results, o)
+ }
+
+ if err != nil {
+ if err != io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF.
+ return nil, textContextError{error: err, body: p}
+ }
+ }
+
+ return results, nil
+}
diff --git a/x-pack/filebeat/input/salesforce/input_manager.go b/x-pack/filebeat/input/salesforce/input_manager.go
new file mode 100644
index 00000000000..1fb0ae42e91
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/input_manager.go
@@ -0,0 +1,89 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/elastic/go-concert/unison"
+
+ v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ conf "github.com/elastic/elastic-agent-libs/config"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/transport/httpcommon"
+)
+
+// compile-time check if querier implements InputManager
+var _ v2.InputManager = InputManager{}
+
+// InputManager wraps one stateless input manager
+// and one cursor input manager. It will create one or the other
+// based on the config that is passed.
+type InputManager struct {
+ cursor *inputcursor.InputManager
+}
+
+// NewInputManager creates a new input manager.
+func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager {
+ return InputManager{
+ cursor: &inputcursor.InputManager{
+ Logger: log,
+ StateStore: store,
+ Type: inputName,
+ Configure: cursorConfigure,
+ },
+ }
+}
+
+func defaultConfig() config {
+ apiVersion := 58
+ maxAttempts := 5
+ waitMin := time.Second
+ waitMax := time.Minute
+ transport := httpcommon.DefaultHTTPTransportSettings()
+ transport.Timeout = 30 * time.Second
+
+ return config{
+ Version: apiVersion,
+ Resource: &resourceConfig{
+ Transport: transport,
+ Retry: retryConfig{
+ MaxAttempts: &maxAttempts,
+ WaitMin: &waitMin,
+ WaitMax: &waitMax,
+ },
+ },
+ }
+}
+
+// cursorConfigure configures the cursor input manager.
+func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
+ config := defaultConfig()
+ if err := cfg.Unpack(&config); err != nil {
+ return nil, nil, fmt.Errorf("reading config: %w", err)
+ }
+ sources := []inputcursor.Source{&source{cfg: config}}
+ return sources, &salesforceInput{config: config}, nil
+}
+
+type source struct{ cfg config }
+
+func (s *source) Name() string { return s.cfg.URL }
+
+// Init initializes both wrapped input managers.
+func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
+ return m.cursor.Init(grp, mode)
+}
+
+// Create creates a cursor input manager.
+func (m InputManager) Create(cfg *conf.C) (v2.Input, error) {
+ config := defaultConfig()
+ if err := cfg.Unpack(&config); err != nil {
+ return nil, err
+ }
+ return m.cursor.Create(cfg)
+}
diff --git a/x-pack/filebeat/input/salesforce/input_manager_test.go b/x-pack/filebeat/input/salesforce/input_manager_test.go
new file mode 100644
index 00000000000..90647e9d302
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/input_manager_test.go
@@ -0,0 +1,82 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+ cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/statestore"
+ "github.com/elastic/beats/v7/libbeat/statestore/storetest"
+ conf "github.com/elastic/elastic-agent-libs/config"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/go-concert/unison"
+)
+
+func makeTestStore(data map[string]interface{}) *statestore.Store {
+ memstore := &storetest.MapStore{Table: data}
+ reg := statestore.NewRegistry(&storetest.MemoryStore{
+ Stores: map[string]*storetest.MapStore{
+ "test": memstore,
+ },
+ })
+ store, err := reg.Get("test")
+ if err != nil {
+ panic("failed to create test store")
+ }
+ return store
+}
+
+type stateStore struct{}
+
+func (stateStore) Access() (*statestore.Store, error) {
+ return makeTestStore(map[string]interface{}{"hello": "world"}), nil
+}
+func (stateStore) CleanupInterval() time.Duration { return time.Duration(0) }
+
+// compile-time check if stateStore implements cursor.StateStore
+var _ cursor.StateStore = stateStore{}
+
+func TestInputManager(t *testing.T) {
+ inputManager := NewInputManager(logp.NewLogger("salesforce_test"), stateStore{})
+
+ var inputTaskGroup unison.TaskGroup
+ defer inputTaskGroup.Stop() //nolint:errcheck // ignore error in test
+
+ err := inputManager.Init(&inputTaskGroup, v2.ModeRun)
+ assert.NoError(t, err)
+
+ config, err := conf.NewConfigFrom(map[string]interface{}{
+ "url": "https://salesforce.com",
+ "version": 46,
+ "auth": &authConfig{
+ OAuth2: &OAuth2{JWTBearerFlow: &JWTBearerFlow{
+ Enabled: pointer(true),
+ URL: "https://salesforce.com",
+ ClientID: "xyz",
+ ClientUsername: "xyz",
+ ClientKeyPath: "xyz",
+ }},
+ },
+ "event_monitoring_method": &eventMonitoringMethod{
+ Object: EventMonitoringConfig{Enabled: pointer(true), Interval: 4},
+ },
+ })
+ assert.NoError(t, err)
+
+ _, err = inputManager.Create(config)
+ assert.NoError(t, err)
+}
+
+func TestSource(t *testing.T) {
+ want := "https://salesforce.com"
+ src := source{cfg: config{URL: want}}
+ got := src.Name()
+ assert.Equal(t, want, got)
+}
diff --git a/x-pack/filebeat/input/salesforce/input_test.go b/x-pack/filebeat/input/salesforce/input_test.go
new file mode 100644
index 00000000000..a579036e4d8
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/input_test.go
@@ -0,0 +1,803 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/g8rswimmer/go-sfdc"
+ "github.com/g8rswimmer/go-sfdc/soql"
+ "github.com/google/go-cmp/cmp"
+ "github.com/stretchr/testify/assert"
+
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
+ conf "github.com/elastic/elastic-agent-libs/config"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+ "github.com/elastic/elastic-agent-libs/transport/httpcommon"
+)
+
+const (
+ PaginationFlow = "PaginationFlow"
+ NoPaginationFlow = "NoPaginationFlow"
+ IntervalFlow = "IntervalFlow"
+ BadReponseFlow = "BadReponseFlow"
+
+ defaultLoginObjectQuery = "SELECT FIELDS(STANDARD) FROM LoginEvent"
+ valueLoginObjectQuery = "SELECT FIELDS(STANDARD) FROM LoginEvent WHERE EventDate > [[ .cursor.object.first_event_time ]]"
+ defaultLoginObjectQueryWithCursor = "SELECT FIELDS(STANDARD) FROM LoginEvent WHERE EventDate > 2023-12-06T05:44:24.973+0000"
+
+ defaultLoginEventLogFileQuery = "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' ORDER BY CreatedDate ASC NULLS FIRST"
+ valueLoginEventLogFileQuery = "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.event_log_file.last_event_time ]] ORDER BY CreatedDate ASC NULLS FIRST"
+
+ invalidDefaultLoginEventObjectQuery = "SELECT FIELDS(STANDARD) FROM LoginEvnt"
+ invalidDefaultLoginEventLogFileQuery = "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' ORDER BY ASC NULLS FIRST"
+
+ invalidValueLoginObjectQuery = "SELECT FIELDS(STANDARD) FROM LoginEvent WHERE EventDate > [[ .cursor.object.first_event ]]"
+ invalidValueLoginEventLogFileQuery = "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.event_log_file.last_event ]] ORDER BY CreatedDate ASC NULLS FIRST"
+
+ oneEventLogfileFirstResponseJSON = `{ "totalSize": 1, "done": true, "records": [ { "attributes": { "type": "EventLogFile", "url": "/services/data/v58.0/sobjects/EventLogFile/0AT5j00002LqQTxGAN" }, "Id": "0AT5j00002LqQTxGAN", "CreatedDate": "2023-12-19T21:04:35.000+0000", "LogDate": "2023-12-18T00:00:00.000+0000", "LogFile": "/services/data/v58.0/sobjects/EventLogFile/0AT5j00002LqQTxGAN/LogFile" } ] }`
+ oneEventLogfileSecondResponseCSV = `"EVENT_TYPE","TIMESTAMP","REQUEST_ID","ORGANIZATION_ID","USER_ID","RUN_TIME","CPU_TIME","URI","SESSION_KEY","LOGIN_KEY","USER_TYPE","REQUEST_STATUS","DB_TOTAL_TIME","LOGIN_TYPE","BROWSER_TYPE","API_TYPE","API_VERSION","USER_NAME","TLS_PROTOCOL","CIPHER_SUITE","AUTHENTICATION_METHOD_REFERENCE","LOGIN_SUB_TYPE","TIMESTAMP_DERIVED","USER_ID_DERIVED","CLIENT_IP","URI_ID_DERIVED","LOGIN_STATUS","SOURCE_IP"
+"Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58"
+`
+
+ expectedELFEvent = `{"API_TYPE":"","API_VERSION":"9998.0","AUTHENTICATION_METHOD_REFERENCE":"","BROWSER_TYPE":"Go-http-client/1.1","CIPHER_SUITE":"ECDHE-RSA-AES256-GCM-SHA384","CLIENT_IP":"Salesforce.com IP","CPU_TIME":"127","DB_TOTAL_TIME":"1051271151","EVENT_TYPE":"Login","LOGIN_KEY":"bY5Wfv8t/Ith7WVE","LOGIN_STATUS":"LOGIN_NO_ERROR","LOGIN_SUB_TYPE":"","LOGIN_TYPE":"i","ORGANIZATION_ID":"00D5j00000DgAYG","REQUEST_ID":"4u6LyuMrDvb_G-l1cJIQk-","REQUEST_STATUS":"","RUN_TIME":"1219","SESSION_KEY":"","SOURCE_IP":"103.108.207.58","TIMESTAMP":"20231218054831.655","TIMESTAMP_DERIVED":"2023-12-18T05:48:31.655Z","TLS_PROTOCOL":"TLSv1.2","URI":"/services/oauth2/token","URI_ID_DERIVED":"","USER_ID":"0055j00000AT6I1","USER_ID_DERIVED":"0055j00000AT6I1AAL","USER_NAME":"salesforceinstance@devtest.in","USER_TYPE":"Standard"}`
+
+ oneObjectEvents = `{ "totalSize": 1, "done": true, "records": [ { "attributes": { "type": "LoginEvent", "url": "/services/data/v58.0/sobjects/LoginEvent/000000000000000AAA" }, "AdditionalInfo": "{}", "ApiType": "N/A", "ApiVersion": "N/A", "Application": "salesforce_test", "Browser": "Unknown", "CipherSuite": "ECDHE-RSA-AES256-GCM-SHA384", "City": "Mumbai", "ClientVersion": "N/A", "Country": "India", "CountryIso": "IN", "CreatedDate": "2023-12-06T05:44:34.942+0000", "EvaluationTime": 0, "EventDate": "2023-12-06T05:44:24.973+0000", "EventIdentifier": "00044326-ed4a-421a-a0a8-e62ea626f3af", "HttpMethod": "POST", "Id": "000000000000000AAA", "LoginGeoId": "04F5j00003NvV1cEAF", "LoginHistoryId": "0Ya5j00003k2scQCAQ", "LoginKey": "pgOVoLbV96U9o08W", "LoginLatitude": 19.0748, "LoginLongitude": 72.8856, "LoginType": "Remote Access 2.0", "LoginUrl": "login.salesforce.com", "Platform": "Unknown", "PostalCode": "400070", "SessionLevel": "STANDARD", "SourceIp": "134.238.252.19", "Status": "Success", "Subdivision": "Maharashtra", "TlsProtocol": "TLS 1.2", "UserId": "0055j00000AT6I1AAL", "UserType": "Standard", "Username": "salesforceinstance@devtest.in" } ] }`
+ oneObjectEventsPageOne = `{ "totalSize": 1, "done": true, "nextRecordsUrl": "/nextRecords/LoginEvents/ABCABCDABCDE", "records": [ { "attributes": { "type": "LoginEvent", "url": "/services/data/v58.0/sobjects/LoginEvent/000000000000000AAA" }, "AdditionalInfo": "{}", "ApiType": "N/A", "ApiVersion": "N/A", "Application": "salesforce_test", "Browser": "Unknown", "CipherSuite": "ECDHE-RSA-AES256-GCM-SHA384", "City": "Mumbai", "ClientVersion": "N/A", "Country": "India", "CountryIso": "IN", "CreatedDate": "2023-12-06T05:44:34.942+0000", "EvaluationTime": 0, "EventDate": "2023-12-06T05:44:24.973+0000", "EventIdentifier": "00044326-ed4a-421a-a0a8-e62ea626f3af", "HttpMethod": "POST", "Id": "000000000000000AAA", "LoginGeoId": "04F5j00003NvV1cEAF", "LoginHistoryId": "0Ya5j00003k2scQCAQ", "LoginKey": "pgOVoLbV96U9o08W", "LoginLatitude": 19.0748, "LoginLongitude": 72.8856, "LoginType": "Remote Access 2.0", "LoginUrl": "login.salesforce.com", "Platform": "Unknown", "PostalCode": "400070", "SessionLevel": "STANDARD", "SourceIp": "134.238.252.19", "Status": "Success", "Subdivision": "Maharashtra", "TlsProtocol": "TLS 1.2", "UserId": "0055j00000AT6I1AAL", "UserType": "Standard", "Username": "salesforceinstance@devtest.in" } ] }`
+ oneObjectEventsPageTwo = `{ "totalSize": 1, "done": true, "records": [ { "attributes": { "type": "LoginEvent", "url": "/services/data/v58.0/sobjects/LoginEvent/000000000000000AAA" }, "AdditionalInfo": "{}", "ApiType": "N/A", "ApiVersion": "N/A", "Application": "salesforce_test", "Browser": "Unknown", "CipherSuite": "ECDHE-RSA-AES256-GCM-SHA384", "City": "Mumbai", "ClientVersion": "N/A", "Country": "India", "CountryIso": "IN", "CreatedDate": "2023-12-06T05:44:34.942+0000", "EvaluationTime": 0, "EventDate": "2023-12-06T05:44:24.973+0000", "EventIdentifier": "00044326-ed4a-421a-a0a8-e62ea626f3af", "HttpMethod": "POST", "Id": "000000000000000AAA", "LoginGeoId": "04F5j00003NvV1cEAF", "LoginHistoryId": "0Ya5j00003k2scQCAQ", "LoginKey": "pgOVoLbV96U9o08W", "LoginLatitude": 19.0748, "LoginLongitude": 72.8856, "LoginType": "Remote Access 2.0", "LoginUrl": "login.salesforce.com", "Platform": "Unknown", "PostalCode": "400070", "SessionLevel": "STANDARD", "SourceIp": "134.238.252.19", "Status": "Success", "Subdivision": "Maharashtra", "TlsProtocol": "TLS 1.2", "UserId": "0055j00000AT6I1AAL", "UserType": "Standard", "Username": "salesforceinstance@devtest.in" } ] }`
+
+ expectedObjectEvent = `{"AdditionalInfo":"{}","ApiType":"N/A","ApiVersion":"N/A","Application":"salesforce_test","Browser":"Unknown","CipherSuite":"ECDHE-RSA-AES256-GCM-SHA384","City":"Mumbai","ClientVersion":"N/A","Country":"India","CountryIso":"IN","CreatedDate":"2023-12-06T05:44:34.942+0000","EvaluationTime":0,"EventDate":"2023-12-06T05:44:24.973+0000","EventIdentifier":"00044326-ed4a-421a-a0a8-e62ea626f3af","HttpMethod":"POST","Id":"000000000000000AAA","LoginGeoId":"04F5j00003NvV1cEAF","LoginHistoryId":"0Ya5j00003k2scQCAQ","LoginKey":"pgOVoLbV96U9o08W","LoginLatitude":19.0748,"LoginLongitude":72.8856,"LoginType":"Remote Access 2.0","LoginUrl":"login.salesforce.com","Platform":"Unknown","PostalCode":"400070","SessionLevel":"STANDARD","SourceIp":"134.238.252.19","Status":"Success","Subdivision":"Maharashtra","TlsProtocol":"TLS 1.2","UserId":"0055j00000AT6I1AAL","UserType":"Standard","Username":"salesforceinstance@devtest.in"}`
+)
+
+func TestFormQueryWithCursor(t *testing.T) {
+ logp.TestingSetup()
+
+ mockTimeNow(time.Date(2023, time.May, 18, 12, 0, 0, 0, time.UTC))
+ t.Cleanup(resetTimeNow)
+
+ tests := map[string]struct {
+ wantErr error
+ cursor mapstr.M
+ defaultSOQLTemplate string
+ valueSOQLTemplate string
+ wantQuery string
+ initialInterval time.Duration
+ }{
+ "valid soql templates with nil cursor": { // expect default query with LogDate > initialInterval
+ initialInterval: 60 * 24 * time.Hour, // 60 * 24h = 1440h = 60 days = 2 months
+ defaultSOQLTemplate: `SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND Logdate > [[ (formatTime (now.Add (parseDuration "-1440h")) "RFC3339") ]] ORDER BY CreatedDate ASC NULLS FIRST`,
+ valueSOQLTemplate: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.logdate ]] ORDER BY CreatedDate ASC NULLS FIRST",
+ wantQuery: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND Logdate > 2023-03-19T12:00:00Z ORDER BY CreatedDate ASC NULLS FIRST",
+ cursor: nil,
+ },
+ "valid soql templates with non-empty .cursor.object.logdate": { // expect value SOQL query with .cursor.object.logdate set
+ initialInterval: 60 * 24 * time.Hour, // 60 * 24h = 1440h = 60 days = 2 months
+ defaultSOQLTemplate: `SELECT Id,CreatedDate,LogDate,LogFile FROM LoginEvent WHERE EventDate > [[ (formatTime (now.Add (parseDuration "-1440h")) "RFC3339") ]]`,
+ valueSOQLTemplate: "SELECT Id,CreatedDate,LogDate,LogFile FROM LoginEvent WHERE CreatedDate > [[ .cursor.object.logdate ]]",
+ wantQuery: "SELECT Id,CreatedDate,LogDate,LogFile FROM LoginEvent WHERE CreatedDate > 2023-05-18T12:00:00Z",
+ cursor: mapstr.M{"object": mapstr.M{"logdate": timeNow().Format(formatRFC3339Like)}},
+ },
+ "valid soql templates with non-empty .cursor.event_log_file.logdate": { // expect value SOQL query with .cursor.event_log_file.logdate set
+ initialInterval: 60 * 24 * time.Hour, // 60 * 24h = 1440h = 60 days = 2 months
+ defaultSOQLTemplate: `SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND Logdate > [[ (formatTime (now.Add (parseDuration "-1440h")) "RFC3339") ]] ORDER BY CreatedDate ASC NULLS FIRST`,
+ valueSOQLTemplate: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.event_log_file.logdate ]] ORDER BY CreatedDate ASC NULLS FIRST",
+ wantQuery: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > 2023-05-18T12:00:00Z ORDER BY CreatedDate ASC NULLS FIRST",
+ cursor: mapstr.M{"event_log_file": mapstr.M{"logdate": timeNow().Format(formatRFC3339Like)}},
+ },
+ "invalid soql templates wrong cursor name .cursor.event_log_file.logdate1": { // expect value SOQL query with .cursor.event_log_file.logdate set
+ initialInterval: 60 * 24 * time.Hour, // 60 * 24h = 1440h = 60 days = 2 months
+ defaultSOQLTemplate: `SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND Logdate > [[ (formatTime (now.Add (parseDuration "-1440h")) "RFC3339") ]] ORDER BY CreatedDate ASC NULLS FIRST`,
+ valueSOQLTemplate: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > [[ .cursor.event_log_file.logdate1 ]] ORDER BY CreatedDate ASC NULLS FIRST",
+ wantQuery: "SELECT Id,CreatedDate,LogDate,LogFile FROM EventLogFile WHERE EventType = 'Login' AND CreatedDate > 2023-05-18T12:00:00Z ORDER BY CreatedDate ASC NULLS FIRST",
+ cursor: mapstr.M{"event_log_file": mapstr.M{"logdate": timeNow().Format(formatRFC3339Like)}},
+ wantErr: errors.New(`template: :1:110: executing "" at <.cursor.event_log_file.logdate1>: map has no entry for key "logdate1"`),
+ },
+ }
+
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ v1, v2 := &valueTpl{}, &valueTpl{}
+
+ err := v1.Unpack(tc.defaultSOQLTemplate)
+ assert.NoError(t, err)
+
+ err = v2.Unpack(tc.valueSOQLTemplate)
+ assert.NoError(t, err)
+
+ queryConfig := &QueryConfig{
+ Default: v1,
+ Value: v2,
+ }
+
+ sfInput := &salesforceInput{
+ config: config{},
+ log: logp.NewLogger("salesforce_test"),
+ }
+
+ querier, err := sfInput.FormQueryWithCursor(queryConfig, tc.cursor)
+ if fmt.Sprint(tc.wantErr) != fmt.Sprint(err) {
+ t.Errorf("got error %v, want error %v", err, tc.wantErr)
+ }
+ if tc.wantErr != nil {
+ return
+ }
+
+ assert.EqualValues(t, tc.wantQuery, querier.Query)
+ })
+ }
+}
+
+var (
+ defaultUserPasswordFlowMap = map[string]interface{}{
+ "user_password_flow": map[string]interface{}{
+ "enabled": true,
+ "client.id": "clientid",
+ "client.secret": "clientsecret",
+ "token_url": "https://instance_id.develop.my.salesforce.com/services/oauth2/token",
+ "username": "username",
+ "password": "password",
+ },
+ }
+ wrongUserPasswordFlowMap = map[string]interface{}{
+ "user_password_flow": map[string]interface{}{
+ "enabled": true,
+ "client.id": "clientid-wrong",
+ "client.secret": "clientsecret-wrong",
+ "token_url": "https://instance_id.develop.my.salesforce.com/services/oauth2/token",
+ "username": "username-wrong",
+ "password": "password-wrong",
+ },
+ }
+
+ defaultObjectMonitoringMethodConfigMap = map[string]interface{}{
+ "interval": "5s",
+ "enabled": true,
+ "query": map[string]interface{}{
+ "default": defaultLoginObjectQuery,
+ "value": valueLoginObjectQuery,
+ },
+ "cursor": map[string]interface{}{
+ "field": "EventDate",
+ },
+ }
+ defaultEventLogFileMonitoringMethodMap = map[string]interface{}{
+ "interval": "5s",
+ "enabled": true,
+ "query": map[string]interface{}{
+ "default": defaultLoginEventLogFileQuery,
+ "value": valueLoginEventLogFileQuery,
+ },
+ "cursor": map[string]interface{}{
+ "field": "CreatedDate",
+ },
+ }
+
+ invalidObjectMonitoringMethodMap = map[string]interface{}{
+ "interval": "5m",
+ "enabled": true,
+ "query": map[string]interface{}{
+ "default": invalidDefaultLoginEventObjectQuery,
+ "value": valueLoginEventLogFileQuery,
+ },
+ "cursor": map[string]interface{}{
+ "field": "CreatedDate",
+ },
+ }
+ invalidEventLogFileMonitoringMethodMap = map[string]interface{}{
+ "interval": "5m",
+ "enabled": true,
+ "query": map[string]interface{}{
+ "default": invalidDefaultLoginEventLogFileQuery,
+ "value": invalidValueLoginEventLogFileQuery,
+ },
+ "cursor": map[string]interface{}{
+ "field": "CreatedDate",
+ },
+ }
+)
+
+func TestInput(t *testing.T) {
+ logp.TestingSetup()
+
+ tests := []struct {
+ setupServer func(testing.TB, http.HandlerFunc, map[string]interface{})
+ baseConfig map[string]interface{}
+ handler http.HandlerFunc
+ persistentCursor *state
+ name string
+ expected []string
+ timeout time.Duration
+ wantErr bool
+ AuthFail bool
+ }{
+ // Object
+ {
+ name: "Positive/event_monitoring_method_object_with_default_query_only",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "object": defaultObjectMonitoringMethodConfigMap,
+ },
+ },
+ handler: defaultHandler(NoPaginationFlow, false, "", oneObjectEvents),
+ expected: []string{expectedObjectEvent},
+ },
+ {
+ name: "Negative/event_monitoring_method_object_with_error_in_data_collection",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "object": invalidObjectMonitoringMethodMap,
+ },
+ },
+ handler: defaultHandler(NoPaginationFlow, false, "", `{"error": "invalid_query"}`),
+ wantErr: true,
+ },
+ {
+ name: "Positive/event_monitoring_method_object_with_interval_5s",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "object": defaultObjectMonitoringMethodConfigMap,
+ },
+ },
+ handler: defaultHandler(IntervalFlow, false, "", oneObjectEventsPageTwo),
+ expected: []string{expectedObjectEvent, expectedObjectEvent},
+ timeout: 20 * time.Second,
+ },
+ {
+ name: "Positive/event_monitoring_method_object_with_Pagination",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "object": defaultObjectMonitoringMethodConfigMap,
+ },
+ },
+ handler: defaultHandler(PaginationFlow, false, oneObjectEventsPageOne, oneObjectEventsPageTwo),
+ expected: []string{expectedObjectEvent, expectedObjectEvent},
+ },
+
+ // EventLogFile
+ {
+ name: "Positive/event_monitoring_method_elf_with_default_query_only",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "event_log_file": defaultEventLogFileMonitoringMethodMap,
+ },
+ },
+ handler: defaultHandler(NoPaginationFlow, false, oneEventLogfileFirstResponseJSON, oneEventLogfileSecondResponseCSV),
+ expected: []string{expectedELFEvent},
+ },
+ {
+ name: "Negative/event_monitoring_method_elf_with_error_in_auth",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": wrongUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "event_log_file": defaultEventLogFileMonitoringMethodMap,
+ },
+ },
+ handler: defaultHandler(NoPaginationFlow, false, "", `{"error": "invalid_client_id"}`),
+ wantErr: true,
+ AuthFail: true,
+ },
+ {
+ name: "Negative/event_monitoring_method_elf_with_error_in_data_collection",
+ setupServer: newTestServer(httptest.NewServer),
+ baseConfig: map[string]interface{}{
+ "version": 56,
+ "auth.oauth2": defaultUserPasswordFlowMap,
+ "event_monitoring_method": map[string]interface{}{
+ "event_log_file": invalidEventLogFileMonitoringMethodMap,
+ },
+ },
+ handler: defaultHandler(NoPaginationFlow, false, "", `{"error": "invalid_query"}`),
+ wantErr: true,
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ tc.setupServer(t, tc.handler, tc.baseConfig)
+
+ cfg := defaultConfig()
+ err := conf.MustNewConfigFrom(tc.baseConfig).Unpack(&cfg)
+ assert.NoError(t, err)
+ timeout := 5 * time.Second
+ if tc.timeout != 0 {
+ timeout = tc.timeout
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ var client publisher
+ client.done = func() {
+ if len(client.published) >= len(tc.expected) {
+ cancel()
+ }
+ }
+
+ salesforceInput := salesforceInput{config: cfg}
+ assert.Equal(t, "salesforce", salesforceInput.Name())
+
+ ctx, cancelClause := context.WithCancelCause(ctx)
+
+ salesforceInput.cursor = &state{}
+ if tc.persistentCursor != nil {
+ salesforceInput.cursor = tc.persistentCursor
+ }
+ salesforceInput.ctx = ctx
+ salesforceInput.cancel = cancelClause
+ salesforceInput.srcConfig = &cfg
+ salesforceInput.publisher = &client
+ salesforceInput.log = logp.L().With("input_url", "salesforce")
+
+ salesforceInput.sfdcConfig, err = salesforceInput.getSFDCConfig(&cfg)
+ assert.NoError(t, err)
+
+ salesforceInput.soqlr, err = salesforceInput.SetupSFClientConnection()
+ if err != nil && !tc.wantErr {
+ t.Errorf("unexpected error from running input: %v", err)
+ }
+ if tc.wantErr && tc.AuthFail {
+ return
+ }
+
+ err = salesforceInput.run()
+ if err != nil && !tc.wantErr {
+ t.Errorf("unexpected error from running input: %v", err)
+ }
+ if tc.wantErr {
+ return
+ }
+
+ if len(client.published) < len(tc.expected) {
+ t.Errorf("unexpected number of published events: got:%d want at least:%d", len(client.published), len(tc.expected))
+ tc.expected = tc.expected[:len(client.published)]
+ }
+
+ client.published = client.published[:len(tc.expected)]
+ for i, got := range client.published {
+ if !reflect.DeepEqual(got.Fields["message"], tc.expected[i]) {
+ t.Errorf("unexpected result for event %d: got:- want:+\n%s", i, cmp.Diff(got.Fields, tc.expected[i]))
+ }
+ }
+ })
+ }
+}
+
+func defaultHandler(flow string, withoutQuery bool, msg1, msg2 string) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("content-type", "application/json")
+ switch {
+ case flow == PaginationFlow && r.FormValue("q") == defaultLoginObjectQuery:
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(msg1))
+ case r.RequestURI == "/nextRecords/LoginEvents/ABCABCDABCDE":
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(msg2))
+ case r.RequestURI == "/services/oauth2/token" && r.Method == http.MethodPost && r.FormValue("client_id") == "clientid":
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(`{"access_token":"abcd","instance_url":"http://` + r.Host + `","token_type":"Bearer","id_token":"abcd","refresh_token":"abcd"}`))
+ case r.FormValue("client_id") == "clientid-wrong":
+ w.WriteHeader(http.StatusBadRequest)
+ _, _ = w.Write([]byte(msg2))
+ case r.FormValue("q") == defaultLoginEventLogFileQuery:
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(msg1))
+ case r.FormValue("q") == defaultLoginObjectQuery, r.FormValue("q") == defaultLoginObjectQueryWithCursor, r.RequestURI == "/services/data/v58.0/sobjects/EventLogFile/0AT5j00002LqQTxGAN/LogFile":
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(msg2))
+ case r.FormValue("q") == invalidDefaultLoginEventLogFileQuery, r.FormValue("q") == invalidDefaultLoginEventObjectQuery:
+ w.WriteHeader(http.StatusBadRequest)
+ _, _ = w.Write([]byte(msg2))
+ case flow == BadReponseFlow && (withoutQuery && r.FormValue("q") == ""):
+ w.WriteHeader(http.StatusBadRequest)
+ _, _ = w.Write([]byte(`{"error":"internal server error"}`))
+ }
+ }
+}
+
+func newTestServer(newServer func(http.Handler) *httptest.Server) func(testing.TB, http.HandlerFunc, map[string]interface{}) {
+ return func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
+ server := newServer(h)
+ config["url"] = server.URL
+ config["auth.oauth2"].(map[string]interface{})["user_password_flow"].(map[string]interface{})["token_url"] = server.URL
+ t.Cleanup(server.Close)
+ }
+}
+
+var _ inputcursor.Publisher = (*publisher)(nil)
+
+type publisher struct {
+ done func()
+ published []beat.Event
+ cursors []map[string]interface{}
+ mu sync.Mutex
+}
+
+func (p *publisher) Publish(e beat.Event, cursor interface{}) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.published = append(p.published, e)
+ if cursor != nil {
+ var cv map[string]interface{}
+ err := typeconv.Convert(&cv, cursor)
+ if err != nil {
+ return err
+ }
+
+ p.cursors = append(p.cursors, cv)
+ }
+ p.done()
+
+ return nil
+}
+
+func TestDecodeAsCSV(t *testing.T) {
+ sampleELF := `"EVENT_TYPE","TIMESTAMP","REQUEST_ID","ORGANIZATION_ID","USER_ID","RUN_TIME","CPU_TIME","URI","SESSION_KEY","LOGIN_KEY","USER_TYPE","REQUEST_STATUS","DB_TOTAL_TIME","LOGIN_TYPE","BROWSER_TYPE","API_TYPE","API_VERSION","USER_NAME","TLS_PROTOCOL","CIPHER_SUITE","AUTHENTICATION_METHOD_REFERENCE","LOGIN_SUB_TYPE","TIMESTAMP_DERIVED","USER_ID_DERIVED","CLIENT_IP","URI_ID_DERIVED","LOGIN_STATUS","SOURCE_IP"
+"Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58"
+"Login","20231218054832.003","4u6LyuHSDv8LLVl1cJOqGV","00D5j00000DgAYG","0055j00000AT6I1","1277","104","/services/oauth2/token","","u60el7VqW8CSSKcW","Standard","","674857427","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:32.003Z","0055j00000AT6I1AAL","103.108.207.58","","LOGIN_NO_ERROR","103.108.207.58"`
+
+ mp, err := decodeAsCSV([]byte(sampleELF))
+ assert.NoError(t, err)
+
+ wantNumOfEvents := 2
+ gotNumOfEvents := len(mp)
+ assert.Equal(t, wantNumOfEvents, gotNumOfEvents)
+
+ wantEventFields := map[string]string{
+ "LOGIN_TYPE": "i",
+ "API_VERSION": "9998.0",
+ "TIMESTAMP_DERIVED": "2023-12-18T05:48:31.655Z",
+ "TIMESTAMP": "20231218054831.655",
+ "USER_NAME": "salesforceinstance@devtest.in",
+ "SOURCE_IP": "103.108.207.58",
+ "CPU_TIME": "127",
+ "REQUEST_STATUS": "",
+ "DB_TOTAL_TIME": "1051271151",
+ "TLS_PROTOCOL": "TLSv1.2",
+ "AUTHENTICATION_METHOD_REFERENCE": "",
+ "REQUEST_ID": "4u6LyuMrDvb_G-l1cJIQk-",
+ "USER_ID": "0055j00000AT6I1",
+ "RUN_TIME": "1219",
+ "CIPHER_SUITE": "ECDHE-RSA-AES256-GCM-SHA384",
+ "CLIENT_IP": "Salesforce.com IP",
+ "EVENT_TYPE": "Login",
+ "LOGIN_SUB_TYPE": "",
+ "USER_ID_DERIVED": "0055j00000AT6I1AAL",
+ "URI_ID_DERIVED": "",
+ "ORGANIZATION_ID": "00D5j00000DgAYG",
+ "URI": "/services/oauth2/token",
+ "LOGIN_KEY": "bY5Wfv8t/Ith7WVE",
+ "USER_TYPE": "Standard",
+ "API_TYPE": "",
+ "SESSION_KEY": "",
+ "BROWSER_TYPE": "Go-http-client/1.1",
+ "LOGIN_STATUS": "LOGIN_NO_ERROR",
+ }
+
+ assert.Equal(t, wantEventFields, mp[0])
+}
+
+func TestSalesforceInputRunWithMethod(t *testing.T) {
+ var (
+ defaultUserPassAuthConfig = authConfig{
+ OAuth2: &OAuth2{
+ UserPasswordFlow: &UserPasswordFlow{
+ Enabled: pointer(true),
+ TokenURL: "https://instance_id.develop.my.salesforce.com/services/oauth2/token",
+ ClientID: "clientid",
+ ClientSecret: "clientsecret",
+ Username: "username",
+ Password: "password",
+ },
+ },
+ }
+ objectEventMonitotingConfig = eventMonitoringMethod{
+ Object: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Second * 5,
+ Query: &QueryConfig{
+ Default: getValueTpl(defaultLoginObjectQuery),
+ Value: getValueTpl(valueLoginObjectQuery),
+ },
+ Cursor: &cursorConfig{Field: "EventDate"},
+ },
+ }
+ objectEventMonitoringWithWrongQuery = eventMonitoringMethod{
+ Object: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Second * 5,
+ Query: &QueryConfig{
+ Default: getValueTpl(invalidDefaultLoginEventObjectQuery),
+ Value: getValueTpl(invalidValueLoginObjectQuery),
+ },
+ Cursor: &cursorConfig{Field: "EventDate"},
+ },
+ }
+
+ elfEventMonitotingConfig = eventMonitoringMethod{
+ EventLogFile: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Second * 5,
+ Query: &QueryConfig{
+ Default: getValueTpl(defaultLoginEventLogFileQuery),
+ Value: getValueTpl(valueLoginEventLogFileQuery),
+ },
+ Cursor: &cursorConfig{Field: "EventDate"},
+ },
+ }
+ elfEventMonitotingWithWrongQuery = eventMonitoringMethod{
+ EventLogFile: EventMonitoringConfig{
+ Enabled: pointer(true),
+ Interval: time.Second * 5,
+ Query: &QueryConfig{
+ Default: getValueTpl(invalidDefaultLoginEventLogFileQuery),
+ Value: getValueTpl(invalidValueLoginEventLogFileQuery),
+ },
+ Cursor: &cursorConfig{Field: "EventDate"},
+ },
+ }
+ )
+
+ type fields struct {
+ ctx context.Context
+ publisher inputcursor.Publisher
+ cancel context.CancelCauseFunc
+ cursor *state
+ srcConfig *config
+ sfdcConfig *sfdc.Configuration
+ soqlr *soql.Resource
+ config config
+ }
+
+ defaultResource := resourceConfig{
+ Retry: retryConfig{
+ MaxAttempts: pointer(5),
+ WaitMin: pointer(time.Minute),
+ WaitMax: pointer(time.Minute),
+ },
+ Transport: httpcommon.DefaultHTTPTransportSettings(),
+ }
+
+ tests := []struct {
+ fields fields
+ setupServer func(testing.TB, http.HandlerFunc, *config)
+ handler http.HandlerFunc
+ method string
+ name string
+ expected []string
+ wantErr bool
+ AuthFail bool
+ ClientConnectionFail bool
+ }{
+ // Object
+ {
+ name: "Positive/object_get_one_event",
+ method: "Object",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, "", oneObjectEvents),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &objectEventMonitotingConfig,
+ Resource: &defaultResource,
+ },
+ cursor: &state{},
+ },
+ expected: []string{expectedObjectEvent},
+ },
+ {
+ name: "Negative/object_error_from_wrong_default_query",
+ method: "Object",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, "", oneObjectEvents),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &objectEventMonitoringWithWrongQuery,
+ Resource: &defaultResource,
+ },
+ cursor: &state{},
+ },
+ wantErr: true,
+ },
+ {
+ name: "Negative/object_error_from_wrong_value_query",
+ method: "Object",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, "", oneObjectEvents),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &objectEventMonitoringWithWrongQuery,
+ Resource: &defaultResource,
+ },
+ cursor: &state{
+ Object: dateTimeCursor{
+ FirstEventTime: "2020-01-01T00:00:00Z",
+ LastEventTime: "2020-01-01T00:00:00Z",
+ },
+ },
+ },
+ wantErr: true,
+ },
+
+ // EventLogFile
+ {
+ name: "Positive/elf_get_one_event",
+ method: "ELF",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, oneEventLogfileFirstResponseJSON, oneEventLogfileSecondResponseCSV),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &elfEventMonitotingConfig,
+ Resource: &defaultResource,
+ },
+ cursor: &state{},
+ },
+ expected: []string{expectedELFEvent},
+ },
+ {
+ name: "Negative/elf_error_from_wrong_default_query",
+ method: "ELF",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, oneEventLogfileFirstResponseJSON, oneEventLogfileSecondResponseCSV),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &elfEventMonitotingWithWrongQuery,
+ Resource: &defaultResource,
+ },
+ cursor: &state{},
+ },
+ wantErr: true,
+ },
+ {
+ name: "Negative/elf_error_from_wrong_value_query",
+ method: "ELF",
+ setupServer: newTestServerBasedOnConfig(httptest.NewServer),
+ handler: defaultHandler(NoPaginationFlow, false, oneEventLogfileFirstResponseJSON, oneEventLogfileSecondResponseCSV),
+ fields: fields{
+ config: config{
+ Version: 56,
+ Auth: &defaultUserPassAuthConfig,
+ EventMonitoringMethod: &elfEventMonitotingWithWrongQuery,
+ Resource: &defaultResource,
+ },
+ cursor: &state{
+ EventLogFile: dateTimeCursor{
+ FirstEventTime: "2020-01-01T00:00:00Z",
+ LastEventTime: "2020-01-01T00:00:00Z",
+ },
+ },
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ config := tt.fields.config
+
+ t.Run(tt.name, func(t *testing.T) {
+ tt.setupServer(t, tt.handler, &config)
+
+ s := &salesforceInput{
+ config: config,
+ ctx: tt.fields.ctx,
+ cancel: tt.fields.cancel,
+ publisher: tt.fields.publisher,
+ cursor: tt.fields.cursor,
+ srcConfig: tt.fields.srcConfig,
+ sfdcConfig: tt.fields.sfdcConfig,
+ log: logp.NewLogger("salesforceInput"),
+ soqlr: tt.fields.soqlr,
+ }
+
+ ctx, cancel := context.WithCancelCause(context.Background())
+ s.ctx = ctx
+ s.cancel = cancel
+
+ var client publisher
+ client.done = func() {
+ if len(client.published) >= len(tt.expected) {
+ cancel(nil)
+ }
+ }
+ s.publisher = &client
+ s.srcConfig = &s.config
+
+ var err error
+ s.sfdcConfig, err = s.getSFDCConfig(&s.config)
+ if err != nil && !tt.wantErr {
+ t.Errorf("unexpected error from running input: %v", err)
+ }
+ if tt.wantErr && tt.AuthFail {
+ return
+ }
+
+ s.soqlr, err = s.SetupSFClientConnection()
+ if err != nil && !tt.wantErr {
+ t.Errorf("unexpected error from running input: %v", err)
+ }
+ if tt.wantErr && tt.ClientConnectionFail {
+ return
+ }
+
+ if tt.method == "Object" {
+ if err := s.RunObject(); (err != nil) != tt.wantErr {
+ t.Errorf("salesforceInput.RunObject() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ } else {
+ if err := s.RunEventLogFile(); (err != nil) != tt.wantErr {
+ t.Errorf("salesforceInput.RunEventLogFile() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ }
+
+ if len(client.published) < len(tt.expected) {
+ t.Errorf("unexpected number of published events: got:%d want at least:%d", len(client.published), len(tt.expected))
+ tt.expected = tt.expected[:len(client.published)]
+ }
+
+ client.published = client.published[:len(tt.expected)]
+ for i, got := range client.published {
+ if !reflect.DeepEqual(got.Fields["message"], tt.expected[i]) {
+ t.Errorf("unexpected result for event %d: got:- want:+\n%s", i, cmp.Diff(got.Fields, tt.expected[i]))
+ }
+ }
+ })
+ }
+}
+
+func getValueTpl(in string) *valueTpl {
+ vp := &valueTpl{}
+ vp.Unpack(in) //nolint:errcheck // ignore error in test
+
+ return vp
+}
+
+func newTestServerBasedOnConfig(newServer func(http.Handler) *httptest.Server) func(testing.TB, http.HandlerFunc, *config) {
+ return func(t testing.TB, h http.HandlerFunc, config *config) {
+ server := newServer(h)
+ config.URL = server.URL
+ config.Auth.OAuth2.UserPasswordFlow.TokenURL = server.URL
+ t.Cleanup(server.Close)
+ }
+}
+
+func TestPlugin(t *testing.T) {
+ _ = Plugin(logp.NewLogger("salesforce_test"), stateStore{})
+}
diff --git a/x-pack/filebeat/input/salesforce/soql.go b/x-pack/filebeat/input/salesforce/soql.go
new file mode 100644
index 00000000000..44987644bdc
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/soql.go
@@ -0,0 +1,27 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "strings"
+
+ "github.com/g8rswimmer/go-sfdc/soql"
+)
+
+// compile-time check if querier implements soql.QueryFormatter
+var _ soql.QueryFormatter = (*querier)(nil)
+
+type querier struct {
+ Query string
+}
+
+// Format returns the query string.
+func (q querier) Format() (string, error) {
+ if strings.TrimSpace(q.Query) == "" {
+ return "", errors.New("query is empty")
+ }
+ return q.Query, nil
+}
diff --git a/x-pack/filebeat/input/salesforce/soql_test.go b/x-pack/filebeat/input/salesforce/soql_test.go
new file mode 100644
index 00000000000..137a59a91bd
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/soql_test.go
@@ -0,0 +1,39 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestFormat(t *testing.T) {
+ tests := map[string]struct {
+ wantErr error
+ input string
+ wantStr string
+ }{
+ "empty query": {input: "", wantStr: "", wantErr: errors.New("query is empty")},
+ "valid query": {input: "SELECT FIELDS(STANDARD) FROM LoginEvent", wantStr: "SELECT FIELDS(STANDARD) FROM LoginEvent", wantErr: nil},
+ "invalid query": {input: "SELECT ", wantStr: "SELECT ", wantErr: nil},
+ }
+
+ var q querier
+
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ q.Query = tc.input
+ got, gotErr := q.Format()
+ if !assert.Equal(t, tc.wantErr, gotErr) {
+ t.FailNow()
+ }
+ if !assert.EqualValues(t, tc.wantStr, got) {
+ t.FailNow()
+ }
+ })
+ }
+}
diff --git a/x-pack/filebeat/input/salesforce/state.go b/x-pack/filebeat/input/salesforce/state.go
new file mode 100644
index 00000000000..2d8a96b68bd
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/state.go
@@ -0,0 +1,49 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+// state is the state of the salesforce module. It is used to watermark the state
+// to avoid pulling duplicate data from Salesforce. The state is persisted separately
+// for EventLogFile and Object.
+type state struct {
+ Object dateTimeCursor `json:"object,omitempty"`
+ EventLogFile dateTimeCursor `json:"event_log_file,omitempty"`
+}
+
+// dateTimeCursor maintains two distinct states for the event collection iteration.
+// The initial state represents the time of the first event, while the subsequent state denotes the time of the last event.
+// In certain SOQL queries for specific objects, sorting by all fields may not be feasible, and there may be no specific order.
+// This design allows users to exert maximum control over the iteration process.
+// For instance, the LoginEvent object only supports sorting based on EventIdentifier and EventDate.
+// Furthermore, if we desire to sort based on EventDate, it only supports descending order sorting.
+// In this case by using first_event_time we can get latest event EventDate to query next set of events.
+// Reference to LoginEvent: https://developer.salesforce.com/docs/atlas.en-us.platform_events.meta/platform_events/sforce_api_objects_loginevent.htm
+type dateTimeCursor struct {
+ FirstEventTime string `struct:"first_event_time,omitempty"`
+ LastEventTime string `struct:"last_event_time,omitempty"`
+}
+
+// parseCursor parses the cursor from the configuration and executes the
+// template. If cursor is nil, the default templated query is used else
+// the value templated query is used. See QueryConfig struct for more.
+func parseCursor(cfg *QueryConfig, cursor mapstr.M, log *logp.Logger) (string, error) {
+ ctxTmpl := mapstr.M{"cursor": nil}
+
+ if cursor != nil {
+ ctxTmpl["cursor"] = cursor
+ qr, err := cfg.Value.Execute(ctxTmpl, nil, log)
+ if err != nil {
+ return "", err
+ }
+ return qr, nil
+ }
+
+ return cfg.Default.Execute(ctxTmpl, nil, log)
+}
diff --git a/x-pack/filebeat/input/salesforce/value_tpl.go b/x-pack/filebeat/input/salesforce/value_tpl.go
new file mode 100644
index 00000000000..8a05ecc0686
--- /dev/null
+++ b/x-pack/filebeat/input/salesforce/value_tpl.go
@@ -0,0 +1,132 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package salesforce
+
+import (
+ "errors"
+ "strings"
+ "text/template"
+ "time"
+
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+type valueTpl struct {
+ *template.Template
+}
+
+var (
+ errEmptyTemplateResult = errors.New("template result is empty")
+ errExecuteTemplate = errors.New("template execution failed")
+)
+
+// Execute executes the template with the given data. If the template execution
+// fails, then the defaultVal is used if it is not nil. Execute will return
+// variable substituted query with nil error.
+func (t *valueTpl) Execute(data any, defaultVal *valueTpl, log *logp.Logger) (val string, err error) {
+ fallback := func(err error) (string, error) {
+ if defaultVal != nil {
+ log.Debugf("template execution error: %s", err)
+ log.Info("fallback to default template")
+ return defaultVal.Execute(mapstr.M{}, nil, log)
+ }
+ return "", err
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ val, err = fallback(errExecuteTemplate)
+ }
+ if err != nil {
+ log.Debugf("template execution failed %s", err)
+ }
+ }()
+
+ buf := new(strings.Builder)
+
+ err = t.Template.Execute(buf, data)
+ if err != nil {
+ return fallback(err)
+ }
+
+ val = buf.String()
+ if val == "" {
+ return fallback(errEmptyTemplateResult)
+ }
+
+ return val, nil
+}
+
+// Unpack parses the given string as a template.
+func (t *valueTpl) Unpack(in string) error {
+ // Custom delimiters to prevent issues when using template values as part of
+ // other Go templates.
+ const (
+ leftDelim = "[["
+ rightDelim = "]]"
+ )
+
+ tpl, err := template.New("").
+ Option("missingkey=error").
+ Funcs(template.FuncMap{
+ "now": timeNow,
+ "parseDuration": parseDuration,
+ "parseTime": parseTime,
+ "formatTime": formatTime,
+ }).
+ Delims(leftDelim, rightDelim).
+ Parse(in)
+ if err != nil {
+ return err
+ }
+
+ *t = valueTpl{Template: tpl}
+
+ return nil
+}
+
+// parseDuration parses a duration string and returns the time.Duration value.
+func parseDuration(s string) time.Duration {
+ d, _ := time.ParseDuration(s)
+ return d
+}
+
+// predefinedLayouts contains some predefined layouts that are commonly used.
+var predefinedLayouts = map[string]string{
+ "ANSIC": time.ANSIC,
+ "UnixDate": time.UnixDate,
+ "RubyDate": time.RubyDate,
+ "RFC822": time.RFC822,
+ "RFC822Z": time.RFC822Z,
+ "RFC850": time.RFC850,
+ "RFC1123": time.RFC1123,
+ "RFC1123Z": time.RFC1123Z,
+ "RFC3339": time.RFC3339, // 2006-01-02T15:04:05Z07:00
+ "CustomRFC3339Like": formatRFC3339Like, // 2006-01-02T15:04:05.999Z
+ "RFC3339Nano": time.RFC3339Nano,
+ "Kitchen": time.Kitchen,
+}
+
+// parseTime parses a time string using the given layout. There are also some
+// predefined layouts that can be used; see predefinedLayouts for more.
+func parseTime(ts, layout string) time.Time {
+ if found := predefinedLayouts[layout]; found != "" {
+ layout = found
+ }
+
+ t, _ := time.Parse(layout, ts)
+ return t
+}
+
+// formatTime formats a time using the given layout. There are also some
+// predefined layouts that can be used; see predefinedLayouts for more.
+func formatTime(t time.Time, layout string) string {
+ if found := predefinedLayouts[layout]; found != "" {
+ layout = found
+ }
+
+ return t.Format(layout)
+}