From fb753129b632739fc022b7a5c8148fa437710006 Mon Sep 17 00:00:00 2001
From: Andrew Kroh <andrew.kroh@elastic.co>
Date: Fri, 19 Aug 2022 18:02:16 -0400
Subject: [PATCH] [Filebeat] Add lumberjack input (#32175)

Add an input for receiving data over the Lumberjack protocol as
defined in https://github.com/elastic/go-lumber.

The raw data is written into the lumberjack field which is
mapped as flattened. The client's remote address is written to
`source.address` And if mTLS is used the client cert's CN is
written to `tls.client.subject`.

Metrics from the input are published under the `dataset` namespace.
They are also available under the http endpoint at `/dataset`.

The logging selectors for the input are `input.lumberjack` and
`input.lumberjack.go-lumber`.
---
 NOTICE.txt                                    | 217 ++++++++++++++--
 filebeat/docs/fields.asciidoc                 |  18 ++
 go.mod                                        |   4 +-
 go.sum                                        |   8 +-
 x-pack/filebeat/include/list.go               |   1 +
 .../input/default-inputs/inputs_aix.go        |   2 +
 .../input/default-inputs/inputs_other.go      |   2 +
 .../input/lumberjack/_meta/fields.yml         |   9 +
 x-pack/filebeat/input/lumberjack/ack.go       |  78 ++++++
 x-pack/filebeat/input/lumberjack/ack_test.go  |  46 ++++
 x-pack/filebeat/input/lumberjack/config.go    |  39 +++
 .../filebeat/input/lumberjack/config_test.go  |  74 ++++++
 x-pack/filebeat/input/lumberjack/fields.go    |  23 ++
 .../input/lumberjack/generate_certs_test.go   | 153 +++++++++++
 x-pack/filebeat/input/lumberjack/input.go     |  94 +++++++
 x-pack/filebeat/input/lumberjack/logger.go    |  40 +++
 x-pack/filebeat/input/lumberjack/metrics.go   |  47 ++++
 x-pack/filebeat/input/lumberjack/server.go    | 182 ++++++++++++++
 .../filebeat/input/lumberjack/server_test.go  | 238 ++++++++++++++++++
 19 files changed, 1255 insertions(+), 20 deletions(-)
 create mode 100644 x-pack/filebeat/input/lumberjack/_meta/fields.yml
 create mode 100644 x-pack/filebeat/input/lumberjack/ack.go
 create mode 100644 x-pack/filebeat/input/lumberjack/ack_test.go
 create mode 100644 x-pack/filebeat/input/lumberjack/config.go
 create mode 100644 x-pack/filebeat/input/lumberjack/config_test.go
 create mode 100644 x-pack/filebeat/input/lumberjack/fields.go
 create mode 100644 x-pack/filebeat/input/lumberjack/generate_certs_test.go
 create mode 100644 x-pack/filebeat/input/lumberjack/input.go
 create mode 100644 x-pack/filebeat/input/lumberjack/logger.go
 create mode 100644 x-pack/filebeat/input/lumberjack/metrics.go
 create mode 100644 x-pack/filebeat/input/lumberjack/server.go
 create mode 100644 x-pack/filebeat/input/lumberjack/server_test.go

diff --git a/NOTICE.txt b/NOTICE.txt
index 5ccfea34fde9..e23ebb81db43 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -11440,25 +11440,214 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lookslike@v0
 
 --------------------------------------------------------------------------------
 Dependency : github.com/elastic/go-lumber
-Version: v0.1.0
+Version: v0.1.2-0.20220819171948-335fde24ea0f
 Licence type (autodetected): Apache-2.0
 --------------------------------------------------------------------------------
 
-Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.0/LICENSE:
+Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.2-0.20220819171948-335fde24ea0f/LICENSE:
 
-Copyright (c) 2012–2016 Elasticsearch <http://www.elastic.co>
 
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
 
-    http://www.apache.org/licenses/LICENSE-2.0
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
 
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
 
 
 --------------------------------------------------------------------------------
@@ -39265,11 +39454,11 @@ OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 --------------------------------------------------------------------------------
 Dependency : github.com/klauspost/compress
-Version: v1.13.6
+Version: v1.15.9
 Licence type (autodetected): Apache-2.0
 --------------------------------------------------------------------------------
 
-Contents of probable licence file $GOMODCACHE/github.com/klauspost/compress@v1.13.6/LICENSE:
+Contents of probable licence file $GOMODCACHE/github.com/klauspost/compress@v1.15.9/LICENSE:
 
 Copyright (c) 2012 The Go Authors. All rights reserved.
 Copyright (c) 2019 Klaus Post. All rights reserved.
diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc
index 66af34c3469e..51a6eb4ee115 100644
--- a/filebeat/docs/fields.asciidoc
+++ b/filebeat/docs/fields.asciidoc
@@ -56,6 +56,7 @@ grouped in the following categories:
 * <<exported-fields-kubernetes-processor>>
 * <<exported-fields-log>>
 * <<exported-fields-logstash>>
+* <<exported-fields-lumberjack>>
 * <<exported-fields-microsoft>>
 * <<exported-fields-misp>>
 * <<exported-fields-mongodb>>
@@ -87575,6 +87576,23 @@ alias to: event.duration
 
 --
 
+[[exported-fields-lumberjack]]
+== Lumberjack fields
+
+Fields from Lumberjack input.
+
+
+
+*`lumberjack`*::
++
+--
+Structured data received in an event sent over the Lumberjack protocol.
+
+
+type: flattened
+
+--
+
 [[exported-fields-microsoft]]
 == Microsoft fields
 
diff --git a/go.mod b/go.mod
index 363674896590..97a4c2a4a840 100644
--- a/go.mod
+++ b/go.mod
@@ -78,7 +78,7 @@ require (
 	github.com/elastic/go-libaudit/v2 v2.3.2-0.20220729123722-f8f7d5c19e6b
 	github.com/elastic/go-licenser v0.4.0
 	github.com/elastic/go-lookslike v0.3.0
-	github.com/elastic/go-lumber v0.1.0
+	github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f
 	github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595
 	github.com/elastic/go-seccomp-bpf v1.2.0
 	github.com/elastic/go-structform v0.0.10
@@ -294,7 +294,7 @@ require (
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/karrick/godirwalk v1.15.8 // indirect
 	github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
-	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/klauspost/compress v1.15.9 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
 	github.com/markbates/pkger v0.17.0 // indirect
 	github.com/mattn/go-isatty v0.0.14 // indirect
diff --git a/go.sum b/go.sum
index 1f616d559c68..7d4451b58186 100644
--- a/go.sum
+++ b/go.sum
@@ -622,8 +622,8 @@ github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRs
 github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
 github.com/elastic/go-lookslike v0.3.0 h1:HDI/DQ65V85ZqM7D/sbxcK2wFFnh3+7iFvBk2v2FTHs=
 github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA=
-github.com/elastic/go-lumber v0.1.0 h1:HUjpyg36v2HoKtXlEC53EJ3zDFiDRn65d7B8dBHNius=
-github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ=
+github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f h1:TsPpU5EAwlt7YZoupKlxZ093qTZYdGou3EhfTF1U0B4=
+github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f/go.mod h1:HHaWnZamYKWsR9/eZNHqRHob8iQDKnchHmmskT/SKko=
 github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595 h1:q8n4QjcLa4q39Q3fqHRknTBXBtegjriHFrB42YKgXGI=
 github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us=
 github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f h1:FvsqAVIFZtJtK+koSvFU+/KoNQo1m14kgV5qJ8ImN+U=
@@ -1223,8 +1223,8 @@ github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
 github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
 github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
 github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
 github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
 github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go
index 91bf5fee773f..c1382ae36275 100644
--- a/x-pack/filebeat/include/list.go
+++ b/x-pack/filebeat/include/list.go
@@ -13,6 +13,7 @@ import (
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/cometd"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub"
+	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq"
 	_ "github.com/elastic/beats/v7/x-pack/filebeat/module/aws"
diff --git a/x-pack/filebeat/input/default-inputs/inputs_aix.go b/x-pack/filebeat/input/default-inputs/inputs_aix.go
index a1cdf5da43ef..f46d8ed1f254 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_aix.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_aix.go
@@ -11,6 +11,7 @@ import (
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/awss3"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
 	"github.com/elastic/elastic-agent-libs/logp"
 )
@@ -21,5 +22,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
 		httpjson.Plugin(log, store),
 		o365audit.Plugin(log, store),
 		awss3.Plugin(store),
+		lumberjack.Plugin(),
 	}
 }
diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go
index b100612d58cd..b87faaed46ac 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_other.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -16,6 +16,7 @@ import (
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
 	"github.com/elastic/elastic-agent-libs/logp"
 )
@@ -28,5 +29,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
 		o365audit.Plugin(log, store),
 		awss3.Plugin(store),
 		awscloudwatch.Plugin(),
+		lumberjack.Plugin(),
 	}
 }
diff --git a/x-pack/filebeat/input/lumberjack/_meta/fields.yml b/x-pack/filebeat/input/lumberjack/_meta/fields.yml
new file mode 100644
index 000000000000..ee3ef012006e
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/_meta/fields.yml
@@ -0,0 +1,9 @@
+- key: lumberjack
+  title: "Lumberjack"
+  description: >
+    Fields from Lumberjack input.
+  fields:
+    - name: lumberjack
+      type: flattened
+      description: >
+        Structured data received in an event sent over the Lumberjack protocol.
diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go
new file mode 100644
index 000000000000..ab15ad157dc4
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/ack.go
@@ -0,0 +1,78 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"sync"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/common/acker"
+)
+
+// batchACKTracker invokes batchACK when all events associated to the batch
+// have been published and acknowledged by an output.
+type batchACKTracker struct {
+	batchACK func()
+
+	mutex       sync.Mutex // mutex synchronizes access to pendingACKs.
+	pendingACKs int64      // Number of Beat events in lumberjack batch that are pending ACKs.
+}
+
+// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function
+// is invoked after the full batch has been acknowledged. Ready() must be invoked
+// after all events in the batch are published.
+func newBatchACKTracker(batchACKCallback func()) *batchACKTracker {
+	return &batchACKTracker{
+		batchACK:    batchACKCallback,
+		pendingACKs: 1, // Ready() must be called to consume this "1".
+	}
+}
+
+// Ready signals that the batch has been fully consumed. Only
+// after the batch is marked as "ready" can the lumberjack batch
+// be ACKed. This prevents the batch from being ACKed prematurely.
+func (t *batchACKTracker) Ready() {
+	t.ACK()
+}
+
+// Add increments the number of pending ACKs.
+func (t *batchACKTracker) Add() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	t.pendingACKs++
+}
+
+// ACK decrements the number of pending event ACKs. When all pending ACKs are
+// received then the lumberjack batch is ACKed.
+func (t *batchACKTracker) ACK() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	if t.pendingACKs <= 0 {
+		panic("misuse detected: negative ACK counter")
+	}
+
+	t.pendingACKs--
+	if t.pendingACKs == 0 {
+		t.batchACK()
+	}
+}
+
+// newEventACKHandler returns a beat ACKer that can receive callbacks when
+// an event has been ACKed an output. If the event contains a private metadata
+// pointing to a batchACKTracker then it will invoke the tracker's ACK() method
+// to decrement the number of pending ACKs.
+func newEventACKHandler() beat.ACKer {
+	return acker.ConnectionOnly(
+		acker.EventPrivateReporter(func(_ int, privates []interface{}) {
+			for _, private := range privates {
+				if ack, ok := private.(*batchACKTracker); ok {
+					ack.ACK()
+				}
+			}
+		}),
+	)
+}
diff --git a/x-pack/filebeat/input/lumberjack/ack_test.go b/x-pack/filebeat/input/lumberjack/ack_test.go
new file mode 100644
index 000000000000..90e03819488d
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/ack_test.go
@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/go-lumber/lj"
+)
+
+func TestBatchACKTracker(t *testing.T) {
+	t.Run("empty", func(t *testing.T) {
+		batch := lj.NewBatch(nil)
+
+		acker := newBatchACKTracker(batch.ACK)
+		require.False(t, isACKed(batch))
+
+		acker.Ready()
+		require.True(t, isACKed(batch))
+	})
+
+	t.Run("single_event", func(t *testing.T) {
+		batch := lj.NewBatch(nil)
+
+		acker := newBatchACKTracker(batch.ACK)
+		acker.Add()
+		acker.ACK()
+		require.False(t, isACKed(batch))
+
+		acker.Ready()
+		require.True(t, isACKed(batch))
+	})
+}
+
+func isACKed(batch *lj.Batch) bool {
+	select {
+	case <-batch.Await():
+		return true
+	default:
+		return false
+	}
+}
diff --git a/x-pack/filebeat/input/lumberjack/config.go b/x-pack/filebeat/input/lumberjack/config.go
new file mode 100644
index 000000000000..53ceed2f8ce9
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/config.go
@@ -0,0 +1,39 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/transport/tlscommon"
+)
+
+type config struct {
+	ListenAddress  string                  `config:"listen_address" validate:"nonzero"` // Bind address for the server (e.g. address:port). Default to localhost:5044.
+	Versions       []string                `config:"versions"`                          // List of Lumberjack version (e.g. v1, v2).
+	TLS            *tlscommon.ServerConfig `config:"ssl"`                               // TLS options.
+	Keepalive      time.Duration           `config:"keepalive"       validate:"min=0"`  // Keepalive interval for notifying clients that batches that are not yet ACKed.
+	Timeout        time.Duration           `config:"timeout"         validate:"min=0"`  // Read / write timeouts for Lumberjack server.
+	MaxConnections int                     `config:"max_connections" validate:"min=0"`  // Maximum number of concurrent connections. Default is 0 which means no limit.
+}
+
+func (c *config) InitDefaults() {
+	c.ListenAddress = "localhost:5044"
+	c.Versions = []string{"v1", "v2"}
+}
+
+func (c *config) Validate() error {
+	for _, v := range c.Versions {
+		switch strings.ToLower(v) {
+		case "v1", "v2":
+		default:
+			return fmt.Errorf("invalid lumberjack version %q: allowed values are v1 and v2", v)
+		}
+	}
+
+	return nil
+}
diff --git a/x-pack/filebeat/input/lumberjack/config_test.go b/x-pack/filebeat/input/lumberjack/config_test.go
new file mode 100644
index 000000000000..5b9e73d4d7c4
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/config_test.go
@@ -0,0 +1,74 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	conf "github.com/elastic/elastic-agent-libs/config"
+)
+
+func TestConfig(t *testing.T) {
+	testCases := []struct {
+		name        string
+		userConfig  map[string]interface{}
+		expected    *config
+		expectedErr string
+	}{
+		{
+			"defaults",
+			map[string]interface{}{},
+			&config{
+				ListenAddress: "localhost:5044",
+				Versions:      []string{"v1", "v2"},
+			},
+			"",
+		},
+		{
+			"validate version",
+			map[string]interface{}{
+				"versions": []string{"v3"},
+			},
+			nil,
+			`invalid lumberjack version "v3"`,
+		},
+		{
+			"validate keepalive",
+			map[string]interface{}{
+				"keepalive": "-1s",
+			},
+			nil,
+			`requires duration >= 0`,
+		},
+		{
+			"validate max_connections",
+			map[string]interface{}{
+				"max_connections": -1,
+			},
+			nil,
+			`requires value >= 0 accessing 'max_connections'`,
+		},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			c := conf.MustNewConfigFrom(tc.userConfig)
+
+			var ljConf config
+			err := c.Unpack(&ljConf)
+
+			if tc.expectedErr != "" {
+				require.Error(t, err, "expected error: %s", tc.expectedErr)
+				require.Contains(t, err.Error(), tc.expectedErr)
+				return
+			}
+
+			require.Equal(t, *tc.expected, ljConf)
+		})
+	}
+}
diff --git a/x-pack/filebeat/input/lumberjack/fields.go b/x-pack/filebeat/input/lumberjack/fields.go
new file mode 100644
index 000000000000..d54be5d16eb4
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/fields.go
@@ -0,0 +1,23 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT.
+
+package lumberjack
+
+import (
+	"github.com/elastic/beats/v7/libbeat/asset"
+)
+
+func init() {
+	if err := asset.SetFields("filebeat", "lumberjack", asset.ModuleFieldsPri, AssetLumberjack); err != nil {
+		panic(err)
+	}
+}
+
+// AssetLumberjack returns asset data.
+// This is the base64 encoded zlib format compressed contents of input/lumberjack.
+func AssetLumberjack() string {
+	return "eJxsjjEOwjAQBHu/YpU+eYALSio6XmDsjTBxbOtyjpTfo0QIEGKLK/ZGmu0xcbNIbb5RHs5PBtCoiRbd5V12BghcvMSqsWSLkwGAc2QKC0YpMz4wYq5NBwOMx98ebI/sZv6Y9uhWaTEmp8rM8Gr/2PZcVZrXJgwITh2EnnFlQMxwGVyZFct+ykqB3vk9rErR4ksazDMAAP//JmxQDQ=="
+}
diff --git a/x-pack/filebeat/input/lumberjack/generate_certs_test.go b/x-pack/filebeat/input/lumberjack/generate_certs_test.go
new file mode 100644
index 000000000000..e66eb1b5b8b2
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/generate_certs_test.go
@@ -0,0 +1,153 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"bytes"
+	"crypto/rand"
+	"crypto/rsa"
+	"crypto/tls"
+	"crypto/x509"
+	"crypto/x509/pkix"
+	"encoding/pem"
+	"math/big"
+	"net"
+	"testing"
+	"time"
+)
+
+type Cert struct {
+	signedCertDER []byte          // DER encoded certificate from x509.CreateCertificate.
+	key           *rsa.PrivateKey // RSA public / private key pair.
+}
+
+// CertPEM returns the cert encoded as PEM.
+func (c Cert) CertPEM(t testing.TB) []byte { return pemEncode(t, c.signedCertDER, "CERTIFICATE") }
+
+// KeyPEM returns the private key encoded as PEM.
+func (c Cert) KeyPEM(t testing.TB) []byte {
+	return pemEncode(t, x509.MarshalPKCS1PrivateKey(c.key), "RSA PRIVATE KEY")
+}
+
+func (c Cert) TLSCertificate(t testing.TB) tls.Certificate {
+	pair, err := tls.X509KeyPair(c.CertPEM(t), c.KeyPEM(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return pair
+}
+
+// generateCertData creates a root CA, server, and client cert suitable for
+// testing mTLS.
+func generateCertData(t testing.TB) (rootCA, client, server Cert) {
+	t.Helper()
+
+	// CA cert
+	ca := &x509.Certificate{
+		SerialNumber: big.NewInt(1),
+		Subject: pkix.Name{
+			Organization:  []string{"Elastic"},
+			Country:       []string{"US"},
+			Locality:      []string{"San Francisco"},
+			StreetAddress: []string{"West El Camino Real"},
+			PostalCode:    []string{"94040"},
+		},
+		NotBefore:             time.Now(),
+		NotAfter:              time.Now().AddDate(0, 0, 1),
+		IsCA:                  true,
+		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
+		KeyUsage:              x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
+		BasicConstraintsValid: true,
+	}
+
+	var err error
+	rootCA.key, err = rsa.GenerateKey(rand.Reader, 4096)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	rootCA.signedCertDER, err = x509.CreateCertificate(rand.Reader, ca, ca, &rootCA.key.PublicKey, rootCA.key)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Server cert
+	{
+		// set up our server certificate
+		serverCert := &x509.Certificate{
+			SerialNumber: big.NewInt(2),
+			Subject: pkix.Name{
+				Organization:  []string{"Elastic"},
+				Country:       []string{"US"},
+				Locality:      []string{"San Francisco"},
+				StreetAddress: []string{"West El Camino Real"},
+				PostalCode:    []string{"94040"},
+				CommonName:    "server",
+			},
+			IPAddresses:  []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback},
+			DNSNames:     []string{"localhost"},
+			NotBefore:    time.Now(),
+			NotAfter:     time.Now().AddDate(0, 0, 1),
+			SubjectKeyId: []byte{1, 2, 3, 4, 5},
+			ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
+			KeyUsage:     x509.KeyUsageDigitalSignature,
+		}
+
+		server.key, err = rsa.GenerateKey(rand.Reader, 4096)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		server.signedCertDER, err = x509.CreateCertificate(rand.Reader, serverCert, ca, &server.key.PublicKey, rootCA.key)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Client cert.
+	{
+		clientCert := &x509.Certificate{
+			SerialNumber: big.NewInt(3),
+			Subject: pkix.Name{
+				Organization:  []string{"Elastic"},
+				Country:       []string{"US"},
+				Locality:      []string{"San Francisco"},
+				StreetAddress: []string{"West El Camino Real"},
+				PostalCode:    []string{"94040"},
+				CommonName:    "client",
+			},
+			NotBefore:      time.Now(),
+			NotAfter:       time.Now().AddDate(0, 0, 1),
+			SubjectKeyId:   []byte{1, 2, 3, 4, 5},
+			ExtKeyUsage:    []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
+			KeyUsage:       x509.KeyUsageDigitalSignature,
+			EmailAddresses: []string{"client@example.com"},
+		}
+
+		client.key, err = rsa.GenerateKey(rand.Reader, 4096)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		client.signedCertDER, err = x509.CreateCertificate(rand.Reader, clientCert, ca, &client.key.PublicKey, rootCA.key)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	return rootCA, client, server
+}
+
+func pemEncode(t testing.TB, certBytes []byte, certType string) []byte {
+	t.Helper()
+
+	pemData := new(bytes.Buffer)
+	if err := pem.Encode(pemData, &pem.Block{Type: certType, Bytes: certBytes}); err != nil {
+		t.Fatal(err)
+	}
+
+	return pemData.Bytes()
+}
diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go
new file mode 100644
index 000000000000..9471bb35e92b
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/input.go
@@ -0,0 +1,94 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"fmt"
+
+	inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/feature"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/monitoring"
+)
+
+const (
+	inputName = "lumberjack"
+)
+
+func Plugin() inputv2.Plugin {
+	return inputv2.Plugin{
+		Name:      inputName,
+		Stability: feature.Beta,
+		Info:      "Receives data streamed via the Lumberjack protocol.",
+		Manager:   inputv2.ConfigureWith(configure),
+	}
+}
+
+func configure(cfg *conf.C) (inputv2.Input, error) {
+	var lumberjackConfig config
+	if err := cfg.Unpack(&lumberjackConfig); err != nil {
+		return nil, err
+	}
+
+	return newLumberjackInput(lumberjackConfig)
+}
+
+// lumberjackInput implements the Filebeat input V2 interface. The input is stateless.
+type lumberjackInput struct {
+	config config
+}
+
+var _ inputv2.Input = (*lumberjackInput)(nil)
+
+func newLumberjackInput(lumberjackConfig config) (*lumberjackInput, error) {
+	return &lumberjackInput{config: lumberjackConfig}, nil
+}
+
+func (i *lumberjackInput) Name() string { return inputName }
+
+func (i *lumberjackInput) Test(inputCtx inputv2.TestContext) error {
+	s, err := newServer(i.config, inputCtx.Logger, nil, nil)
+	if err != nil {
+		return err
+	}
+	return s.Close()
+}
+
+func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) error {
+	inputCtx.Logger.Info("Starting " + inputName + " input")
+	defer inputCtx.Logger.Info(inputName + " input stopped")
+
+	// Create client for publishing events and receive notification of their ACKs.
+	client, err := pipeline.ConnectWith(beat.ClientConfig{
+		CloseRef:   inputCtx.Cancelation,
+		ACKHandler: newEventACKHandler(),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create pipeline client: %w", err)
+	}
+	defer client.Close()
+
+	setGoLumberLogger(inputCtx.Logger.Named("go-lumber"))
+
+	metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
+	metrics := newInputMetrics(metricRegistry, inputCtx.ID)
+	defer metrics.Close()
+
+	s, err := newServer(i.config, inputCtx.Logger, client.Publish, metrics)
+	if err != nil {
+		return err
+	}
+	defer s.Close()
+
+	// Shutdown the server when cancellation is signaled.
+	go func() {
+		<-inputCtx.Cancelation.Done()
+		s.Close()
+	}()
+
+	// Run server until the cancellation signal.
+	return s.Run()
+}
diff --git a/x-pack/filebeat/input/lumberjack/logger.go b/x-pack/filebeat/input/lumberjack/logger.go
new file mode 100644
index 000000000000..0f15b2b0d114
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/logger.go
@@ -0,0 +1,40 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	lumberlog "github.com/elastic/go-lumber/log"
+)
+
+var setGoLumberLoggerOnce sync.Once
+
+func setGoLumberLogger(parent *logp.Logger) {
+	setGoLumberLoggerOnce.Do(func() {
+		lumberlog.Logger = &goLumberLogger{parent: parent.WithOptions(zap.AddCallerSkip(2))}
+	})
+}
+
+// goLumberLogger implements the go-lumber/log.Logging interface to route
+// log message from go-lumber to Beats logp.
+type goLumberLogger struct {
+	parent *logp.Logger
+}
+
+func (l *goLumberLogger) Printf(s string, i ...interface{}) {
+	l.parent.Debugf(s, i...)
+}
+
+func (l *goLumberLogger) Println(i ...interface{}) {
+	l.parent.Debug(i...)
+}
+
+func (l *goLumberLogger) Print(i ...interface{}) {
+	l.parent.Debug(i...)
+}
diff --git a/x-pack/filebeat/input/lumberjack/metrics.go b/x-pack/filebeat/input/lumberjack/metrics.go
new file mode 100644
index 000000000000..ebceeb397b71
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/metrics.go
@@ -0,0 +1,47 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"github.com/rcrowley/go-metrics"
+
+	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/elastic/elastic-agent-libs/monitoring/adapter"
+)
+
+type inputMetrics struct {
+	id          string               // Input ID.
+	parent      *monitoring.Registry // Parent registry holding this input's ID as a key.
+	bindAddress *monitoring.String   // Bind address of input.
+
+	batchesReceivedTotal  *monitoring.Uint // Number of Lumberjack batches received (not necessarily processed fully).
+	batchesACKedTotal     *monitoring.Uint // Number of Lumberjack batches ACKed.
+	messagesReceivedTotal *monitoring.Uint // Number of Lumberjack messages received (not necessarily processed fully).
+	batchProcessingTime   metrics.Sample   // Histogram of the elapsed batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
+}
+
+// Close removes the metrics from the registry.
+func (m *inputMetrics) Close() {
+	m.parent.Remove(m.id)
+}
+
+func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics {
+	reg := parent.NewRegistry(id)
+	monitoring.NewString(reg, "input").Set(inputName)
+	monitoring.NewString(reg, "id").Set(id)
+	out := &inputMetrics{
+		id:                    id,
+		parent:                reg,
+		bindAddress:           monitoring.NewString(reg, "bind_address"),
+		batchesReceivedTotal:  monitoring.NewUint(reg, "batches_received_total"),
+		batchesACKedTotal:     monitoring.NewUint(reg, "batches_acked_total"),
+		messagesReceivedTotal: monitoring.NewUint(reg, "messages_received_total"),
+		batchProcessingTime:   metrics.NewUniformSample(1024),
+	}
+	adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept).
+		Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
+
+	return out
+}
diff --git a/x-pack/filebeat/input/lumberjack/server.go b/x-pack/filebeat/input/lumberjack/server.go
new file mode 100644
index 000000000000..96d0366e2b5f
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/server.go
@@ -0,0 +1,182 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"crypto/tls"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"golang.org/x/net/netutil"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/elastic/elastic-agent-libs/transport/tlscommon"
+	lumber "github.com/elastic/go-lumber/server"
+)
+
+type server struct {
+	config         config
+	log            *logp.Logger
+	publish        func(beat.Event)
+	metrics        *inputMetrics
+	ljSvr          lumber.Server
+	ljSvrCloseOnce sync.Once
+	bindAddress    string
+}
+
+func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputMetrics) (*server, error) {
+	ljSvr, bindAddress, err := newLumberjack(c)
+	if err != nil {
+		return nil, err
+	}
+
+	if metrics == nil {
+		metrics = newInputMetrics(monitoring.NewRegistry(), "")
+	}
+
+	bindURI := "tcp://" + bindAddress
+	if c.TLS.IsEnabled() {
+		bindURI = "tls://" + bindAddress
+	}
+	log.Infof(inputName+" is listening at %v.", bindURI)
+	metrics.bindAddress.Set(bindURI)
+
+	return &server{
+		config:      c,
+		log:         log,
+		publish:     pub,
+		metrics:     metrics,
+		ljSvr:       ljSvr,
+		bindAddress: bindAddress,
+	}, nil
+}
+
+func (s *server) Close() error {
+	var err error
+	s.ljSvrCloseOnce.Do(func() {
+		err = s.ljSvr.Close()
+	})
+	return err
+}
+
+func (s *server) Run() error {
+	// Process batches until the input is stopped.
+	for batch := range s.ljSvr.ReceiveChan() {
+		s.metrics.batchesReceivedTotal.Inc()
+
+		if len(batch.Events) == 0 {
+			batch.ACK()
+			s.metrics.batchesACKedTotal.Inc()
+			continue
+		}
+		s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events)))
+
+		// Track all the Beat events associated to the Lumberjack batch so that
+		// the batch can be ACKed after the Beat events are delivered successfully.
+		start := time.Now()
+		acker := newBatchACKTracker(func() {
+			batch.ACK()
+			s.metrics.batchesACKedTotal.Inc()
+			s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
+		})
+
+		for _, ljEvent := range batch.Events {
+			acker.Add()
+			s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker))
+		}
+
+		// Mark the batch as "ready" after Beat events are generated for each
+		// Lumberjack event.
+		acker.Ready()
+	}
+
+	return nil
+}
+
+func makeEvent(remoteAddr string, tlsState *tls.ConnectionState, lumberjackEvent interface{}, acker *batchACKTracker) beat.Event {
+	event := beat.Event{
+		Timestamp: time.Now().UTC(),
+		Fields: map[string]interface{}{
+			"source": map[string]interface{}{
+				"address": remoteAddr,
+			},
+			"lumberjack": lumberjackEvent,
+		},
+		Private: acker,
+	}
+
+	if tlsState != nil && len(tlsState.PeerCertificates) > 0 {
+		event.Fields["tls"] = map[string]interface{}{
+			"client": map[string]interface{}{
+				"subject": tlsState.PeerCertificates[0].Subject.CommonName,
+			},
+		}
+	}
+
+	return event
+}
+
+func newLumberjack(c config) (lj lumber.Server, bindAddress string, err error) {
+	// Setup optional TLS.
+	var tlsConfig *tls.Config
+	if c.TLS.IsEnabled() {
+		elasticTLSConfig, err := tlscommon.LoadTLSServerConfig(c.TLS)
+		if err != nil {
+			return nil, "", err
+		}
+
+		// NOTE: Passing an empty string disables checking the client certificate for a
+		// specific hostname.
+		tlsConfig = elasticTLSConfig.BuildServerConfig("")
+	}
+
+	// Start listener.
+	l, err := net.Listen("tcp", c.ListenAddress)
+	if err != nil {
+		return nil, "", err
+	}
+	if tlsConfig != nil {
+		l = tls.NewListener(l, tlsConfig)
+	}
+	if c.MaxConnections > 0 {
+		l = netutil.LimitListener(l, c.MaxConnections)
+	}
+
+	// Start lumberjack server.
+	s, err := lumber.NewWithListener(l, makeLumberjackOptions(c)...)
+	if err != nil {
+		return nil, "", err
+	}
+
+	return s, l.Addr().String(), nil
+}
+
+func makeLumberjackOptions(c config) []lumber.Option {
+	var opts []lumber.Option
+
+	// Versions
+	for _, p := range c.Versions {
+		switch strings.ToLower(p) {
+		case "v1":
+			opts = append(opts, lumber.V1(true))
+		case "v2":
+			opts = append(opts, lumber.V2(true))
+		}
+	}
+
+	if c.Keepalive > 0 {
+		opts = append(opts, lumber.Keepalive(c.Keepalive))
+	}
+
+	if c.Timeout > 0 {
+		opts = append(opts, lumber.Timeout(c.Keepalive))
+	}
+
+	return opts
+}
diff --git a/x-pack/filebeat/input/lumberjack/server_test.go b/x-pack/filebeat/input/lumberjack/server_test.go
new file mode 100644
index 000000000000..971a37f7255f
--- /dev/null
+++ b/x-pack/filebeat/input/lumberjack/server_test.go
@@ -0,0 +1,238 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package lumberjack
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"fmt"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/transport/tlscommon"
+	client "github.com/elastic/go-lumber/client/v2"
+)
+
+const testTimeout = 10 * time.Second
+
+func TestServer(t *testing.T) {
+	makeTestConfig := func() config {
+		var c config
+		c.InitDefaults()
+		c.ListenAddress = "localhost:0"
+		c.MaxConnections = 1
+		c.Keepalive = time.Second
+		c.Timeout = time.Second
+		return c
+	}
+
+	t.Run("empty_batch", func(t *testing.T) {
+		testSendReceive(t, makeTestConfig(), 0, nil)
+	})
+
+	t.Run("no tls", func(t *testing.T) {
+		testSendReceive(t, makeTestConfig(), 10, nil)
+	})
+
+	t.Run("tls", func(t *testing.T) {
+		clientConf, serverConf := tlsSetup(t)
+		clientConf.Certificates = nil
+
+		c := makeTestConfig()
+		c.TLS = serverConf
+		// Disable mTLS requirements in the server.
+		c.TLS.ClientAuth = 0 // tls.NoClientCert
+		c.TLS.VerificationMode = tlscommon.VerifyNone
+
+		testSendReceive(t, c, 10, clientConf)
+	})
+
+	t.Run("mutual tls", func(t *testing.T) {
+		clientConf, serverConf := tlsSetup(t)
+
+		c := makeTestConfig()
+		c.TLS = serverConf
+
+		testSendReceive(t, c, 10, clientConf)
+	})
+}
+
+func testSendReceive(t testing.TB, c config, numberOfEvents int, clientTLSConfig *tls.Config) {
+	require.NoError(t, logp.TestingSetup())
+	log := logp.NewLogger(inputName).With("test_name", t.Name())
+
+	ctx, shutdown := context.WithTimeout(context.Background(), testTimeout)
+	t.Cleanup(shutdown)
+	collect := newEventCollector(ctx, numberOfEvents)
+
+	// Start server.
+	s, err := newServer(c, log, collect.Publish, nil)
+	require.NoError(t, err)
+	go func() {
+		<-ctx.Done()
+		s.Close()
+	}()
+
+	// Asynchronously send and receive events.
+	var wg errgroup.Group
+	wg.Go(s.Run)
+	wg.Go(func() error {
+		// The client returns on error or after an E2E ACK is received.
+		// In both cases the test should shutdown.
+		defer shutdown()
+
+		return sendData(ctx, t, s.bindAddress, numberOfEvents, clientTLSConfig)
+	})
+
+	// Wait for the expected number of events.
+	collect.Await(t)
+
+	// Check for errors from client and server.
+	require.NoError(t, wg.Wait())
+}
+
+func sendData(ctx context.Context, t testing.TB, bindAddress string, numberOfEvents int, clientTLSConfig *tls.Config) error {
+	_, port, err := net.SplitHostPort(bindAddress)
+	if err != nil {
+		return err
+	}
+
+	dialFunc := net.Dial
+	if clientTLSConfig != nil {
+		dialer := &tls.Dialer{
+			Config: clientTLSConfig,
+		}
+		dialFunc = dialer.Dial
+	}
+
+	c, err := client.SyncDialWith(dialFunc, net.JoinHostPort("localhost", port))
+	if err != nil {
+		return fmt.Errorf("client dial error: %w", err)
+	}
+	defer c.Close()
+	go func() {
+		<-ctx.Done()
+		c.Close()
+	}()
+	t.Log("Lumberjack client connected.")
+
+	var events []interface{}
+	for i := 0; i < numberOfEvents; i++ {
+		events = append(events, map[string]interface{}{
+			"message": "hello world!",
+			"index":   i,
+		})
+	}
+
+	if _, err = c.Send(events); err != nil {
+		return fmt.Errorf("failed sending lumberjack events: %w", err)
+	}
+	t.Log("Lumberjack client sent", len(events), "events.")
+
+	return nil
+}
+
+type eventCollector struct {
+	sync.Mutex
+	events       []beat.Event
+	awaitCtx     context.Context // awaitCtx is cancelled when events length is expectedSize.
+	awaitCancel  context.CancelFunc
+	expectedSize int
+}
+
+func newEventCollector(ctx context.Context, expectedSize int) *eventCollector {
+	ctx, cancel := context.WithCancel(ctx)
+	if expectedSize == 0 {
+		cancel()
+	}
+
+	return &eventCollector{
+		awaitCtx:     ctx,
+		awaitCancel:  cancel,
+		expectedSize: expectedSize,
+	}
+}
+
+func (c *eventCollector) Publish(evt beat.Event) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.events = append(c.events, evt)
+	evt.Private.(*batchACKTracker).ACK()
+
+	if len(c.events) == c.expectedSize {
+		c.awaitCancel()
+	}
+}
+
+func (c *eventCollector) Await(t testing.TB) []beat.Event {
+	t.Helper()
+
+	<-c.awaitCtx.Done()
+	if errors.Is(c.awaitCtx.Err(), context.DeadlineExceeded) {
+		t.Fatal(c.awaitCtx.Err())
+	}
+
+	c.Lock()
+	defer c.Unlock()
+
+	if len(c.events) > c.expectedSize {
+		t.Fatalf("more events received than expected, got %d, want %d", len(c.events), c.expectedSize)
+	}
+
+	events := make([]beat.Event, len(c.events))
+	copy(events, c.events)
+	return events
+}
+
+var (
+	certDataOnce sync.Once
+	certData     = struct {
+		ca, client, server Cert
+	}{}
+)
+
+// tlsSetup return client and server configurations ready to test mutual TLS.
+func tlsSetup(t *testing.T) (clientConfig *tls.Config, serverConfig *tlscommon.ServerConfig) {
+	t.Helper()
+
+	certDataOnce.Do(func() {
+		certData.ca, certData.client, certData.server = generateCertData(t)
+	})
+
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(certData.ca.CertPEM(t))
+
+	clientConfig = &tls.Config{
+		RootCAs:      certPool,
+		Certificates: []tls.Certificate{certData.client.TLSCertificate(t)},
+		MinVersion:   tls.VersionTLS12,
+	}
+
+	serverConfig = &tlscommon.ServerConfig{
+		// NOTE: VerifyCertificate is ineffective unless ClientAuth is set to RequireAndVerifyClientCert.
+		VerificationMode: tlscommon.VerifyCertificate,
+		// Unfortunately ServerConfig uses an unexported type in an exported field.
+		ClientAuth: 4, // tls.RequireAndVerifyClientCert
+		CAs: []string{
+			string(certData.ca.CertPEM(t)),
+		},
+		Certificate: tlscommon.CertificateConfig{
+			Certificate: string(certData.server.CertPEM(t)),
+			Key:         string(certData.server.KeyPEM(t)),
+		},
+	}
+
+	return clientConfig, serverConfig
+}