diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index dad73445fcae..ea6d40f8368d 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -87,6 +87,47 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 *Filebeat*
 
 - `container` and `docker` inputs now support reading of labels and env vars written by docker JSON file logging driver. {issue}8358[8358]
+- Add specific date processor to convert timezones so same pipeline can be used when convert_timezone is enabled or disabled. {pull}12253[12253]
+- Add MSSQL module {pull}12079[12079]
+- Add ISO8601 date parsing support for system module. {pull}12568[12568] {pull}12578[12579]
+- Update Kubernetes deployment manifest to use `container` input. {pull}12632[12632]
+- Use correct OS path separator in `add_kubernetes_metadata` to support Windows nodes. {pull}9205[9205]
+- Add support for virtual host in Apache access logs {pull}12778[12778]
+- Add support for client addresses with port in Apache error logs {pull}12695[12695]
+- Add `google-pubsub` input type for consuming messages from a Google Cloud Pub/Sub topic subscription. {pull}12746[12746]
+- Add module for ingesting Cisco IOS logs over syslog. {pull}12748[12748]
+- Add module for ingesting Google Cloud VPC flow logs. {pull}12747[12747]
+- Report host metadata for Filebeat logs in Kubernetes. {pull}12790[12790]
+- Add netflow dashboards based on Logstash netflow. {pull}12857[12857]
+- Parse more fields from Elasticsearch slowlogs. {pull}11939[11939]
+- Update module pipelines to enrich events with autonomous system fields. {pull}13036[13036]
+- Add module for ingesting IBM MQ logs. {pull}8782[8782]
+- Add S3 input to retrieve logs from AWS S3 buckets. {pull}12640[12640] {issue}12582[12582]
+- Add aws module s3access metricset. {pull}13170[13170] {issue}12880[12880]
+- Update Suricata module to populate ECS DNS fields and handle EVE DNS version 2. {issue}13320[13320] {pull}13329[13329]
+- Update PAN-OS fileset to use the ECS NAT fields. {issue}13320[13320] {pull}13330[13330]
+- Add fields to the Zeek DNS fileset for ECS DNS. {issue}13320[13320] {pull}13324[13324]
+- Add container image in Kubernetes metadata {pull}13356[13356] {issue}12688[12688]
+- Add timezone information to apache error fileset. {issue}12772[12772] {pull}13304[13304]
+- Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286]
+- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505]
+- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701]
+- Add filebeat azure module with activitylogs, auditlogs, signinlogs filesets. {pull}13776[13776] {pull}14033[14033]
+- Add support to set the document id in the json reader. {pull}5844[5844]
+- Add input httpjson. {issue}13545[13545] {pull}13546[13546]
+- Filebeat Netflow input: Remove beta label. {pull}13858[13858]
+- Remove `event.timezone` from events that don't need it in some modules that support log formats with and without timezones. {pull}13918[13918]
+- Add ExpandEventListFromField config option in the kafka input. {pull}13965[13965]
+- Add ELB fileset to AWS module. {pull}14020[14020]
+- Add module for MISP (Malware Information Sharing Platform). {pull}13805[13805]
+- Add `source.bytes` and `source.packets` for uni-directional netflow events. {pull}14111[14111]
+- Add support for gzipped files in S3 input. {pull}13980[13980]
+- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077]
+- Add Kibana Dashboard for MISP module. {pull}14147[14147]
+- Add JSON options to autodiscover hints {pull}14208[14208]
+- Add more filesets to Zeek module. {pull}14150[14150]
+- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010]
+- Remove beta flag for some filebeat modules. {pull}14374[14374]
 
 *Heartbeat*
 
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index c30203dea16b..76e7e8b12f80 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -326,7 +326,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 
 	outDone := make(chan struct{}) // outDone closes down all active pipeline connections
 	crawler, err := crawler.New(
-		channel.NewOutletFactory(outDone, wgEvents).Create,
+		channel.NewOutletFactory(outDone, wgEvents, b.Info).Create,
 		config.Inputs,
 		b.Info.Version,
 		fb.done,
diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go
index 3d584df38f5b..ebd5983a3f7b 100644
--- a/filebeat/channel/connector.go
+++ b/filebeat/channel/connector.go
@@ -18,8 +18,11 @@
 package channel
 
 import (
+	"fmt"
+
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/common/fmtstr"
 	"github.com/elastic/beats/libbeat/processors"
 )
 
@@ -31,6 +34,14 @@ type pipelineConnector struct {
 	pipeline beat.Pipeline
 }
 
+// addFormattedIndex is a Processor to set an event's "raw_index" metadata field
+// with a given TimestampFormatString. The elasticsearch output interprets
+// that field as specifying the (raw string) index the event should be sent to;
+// in other outputs it is just included in the metadata.
+type addFormattedIndex struct {
+	formatString *fmtstr.TimestampFormatString
+}
+
 // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
 func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
 	return fn(cfg, beat.ClientConfig{})
@@ -51,24 +62,11 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
 		return nil, err
 	}
 
-	var err error
-	var userProcessors beat.ProcessorList
-
-	userProcessors, err = processors.New(config.Processors)
+	procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg)
 	if err != nil {
 		return nil, err
 	}
 
-	if lst := clientCfg.Processing.Processor; lst != nil {
-		if len(userProcessors.All()) == 0 {
-			userProcessors = lst
-		} else if orig := lst.All(); len(orig) > 0 {
-			newLst := processors.NewList(nil)
-			newLst.List = append(newLst.List, lst, userProcessors)
-			userProcessors = newLst
-		}
-	}
-
 	setOptional := func(to common.MapStr, key string, value string) {
 		if value != "" {
 			to.Put(key, value)
@@ -105,7 +103,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
 	clientCfg.Processing.EventMetadata = config.EventMetadata
 	clientCfg.Processing.Meta = meta
 	clientCfg.Processing.Fields = fields
-	clientCfg.Processing.Processor = userProcessors
+	clientCfg.Processing.Processor = procs
 	clientCfg.Processing.KeepNull = config.KeepNull
 	client, err := c.pipeline.ConnectWith(clientCfg)
 	if err != nil {
@@ -118,3 +116,64 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
 	}
 	return outlet, nil
 }
+
+// processorsForConfig assembles the Processors for a pipelineConnector.
+func processorsForConfig(
+	beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig,
+) (*processors.Processors, error) {
+	procs := processors.NewList(nil)
+
+	// Processor ordering is important:
+	// 1. Index configuration
+	if !config.Index.IsEmpty() {
+		staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
+		timestampFormat, err :=
+			fmtstr.NewTimestampFormatString(&config.Index, staticFields)
+		if err != nil {
+			return nil, err
+		}
+		indexProcessor := &addFormattedIndex{timestampFormat}
+		procs.List = append(procs.List, indexProcessor)
+	}
+
+	// 2. ClientConfig processors
+	if lst := clientCfg.Processing.Processor; lst != nil {
+		procs.List = append(procs.List, lst)
+	}
+
+	// 3. User processors
+	userProcessors, err := processors.New(config.Processors)
+	if err != nil {
+		return nil, err
+	}
+	// Subtlety: it is important here that we append the individual elements of
+	// userProcessors, rather than userProcessors itself, even though
+	// userProcessors implements the processors.Processor interface. This is
+	// because the contents of what we return are later pulled out into a
+	// processing.group rather than a processors.Processors, and the two have
+	// different error semantics: processors.Processors aborts processing on
+	// any error, whereas processing.group only aborts on fatal errors. The
+	// latter is the most common behavior, and the one we are preserving here for
+	// backwards compatibility.
+	// We are unhappy about this and have plans to fix this inconsistency at a
+	// higher level, but for now we need to respect the existing semantics.
+	procs.List = append(procs.List, userProcessors.List...)
+	return procs, nil
+}
+
+func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
+	index, err := p.formatString.Run(event.Timestamp)
+	if err != nil {
+		return nil, err
+	}
+
+	if event.Meta == nil {
+		event.Meta = common.MapStr{}
+	}
+	event.Meta["raw_index"] = index
+	return event, nil
+}
+
+func (p *addFormattedIndex) String() string {
+	return fmt.Sprintf("add_index_pattern=%v", p.formatString)
+}
diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go
new file mode 100644
index 000000000000..4708a7e45a5a
--- /dev/null
+++ b/filebeat/channel/connector_test.go
@@ -0,0 +1,213 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package channel
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/processors"
+	"github.com/elastic/beats/libbeat/processors/actions"
+)
+
+func TestProcessorsForConfig(t *testing.T) {
+	testCases := map[string]struct {
+		beatInfo       beat.Info
+		configStr      string
+		clientCfg      beat.ClientConfig
+		event          beat.Event
+		expectedFields map[string]string
+	}{
+		"Simple static index": {
+			configStr: "index: 'test'",
+			expectedFields: map[string]string{
+				"@metadata.raw_index": "test",
+			},
+		},
+		"Index with agent info + timestamp": {
+			beatInfo:  beat.Info{Beat: "TestBeat", Version: "3.9.27"},
+			configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'",
+			event:     beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)},
+			expectedFields: map[string]string{
+				"@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31",
+			},
+		},
+		"Set index in ClientConfig": {
+			clientCfg: beat.ClientConfig{
+				Processing: beat.ProcessingConfig{
+					Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
+				},
+			},
+			expectedFields: map[string]string{
+				"@metadata.raw_index": "clientCfgIndex",
+			},
+		},
+		"ClientConfig processor runs after beat input Index": {
+			configStr: "index: 'test'",
+			clientCfg: beat.ClientConfig{
+				Processing: beat.ProcessingConfig{
+					Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}),
+				},
+			},
+			expectedFields: map[string]string{
+				"@metadata.raw_index": "clientCfgIndex",
+			},
+		},
+		"Set field in input config": {
+			configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
+			expectedFields: map[string]string{
+				"fields.testField": "inputConfig",
+			},
+		},
+		"Set field in ClientConfig": {
+			clientCfg: beat.ClientConfig{
+				Processing: beat.ProcessingConfig{
+					Processor: makeProcessors(actions.NewAddFields(common.MapStr{
+						"fields": common.MapStr{"testField": "clientConfig"},
+					}, false)),
+				},
+			},
+			expectedFields: map[string]string{
+				"fields.testField": "clientConfig",
+			},
+		},
+		"Input config processors run after ClientConfig": {
+			configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`,
+			clientCfg: beat.ClientConfig{
+				Processing: beat.ProcessingConfig{
+					Processor: makeProcessors(actions.NewAddFields(common.MapStr{
+						"fields": common.MapStr{"testField": "clientConfig"},
+					}, false)),
+				},
+			},
+			expectedFields: map[string]string{
+				"fields.testField": "inputConfig",
+			},
+		},
+	}
+	for description, test := range testCases {
+		if test.event.Fields == nil {
+			test.event.Fields = common.MapStr{}
+		}
+		config, err := outletConfigFromString(test.configStr)
+		if err != nil {
+			t.Errorf("[%s] %v", description, err)
+			continue
+		}
+		processors, err := processorsForConfig(test.beatInfo, config, test.clientCfg)
+		if err != nil {
+			t.Errorf("[%s] %v", description, err)
+			continue
+		}
+		processedEvent, err := processors.Run(&test.event)
+		// We don't check if err != nil, because we are testing the final outcome
+		// of running the processors, including when some of them fail.
+		if processedEvent == nil {
+			t.Errorf("[%s] Unexpected fatal error running processors: %v\n",
+				description, err)
+		}
+		for key, value := range test.expectedFields {
+			field, err := processedEvent.GetValue(key)
+			if err != nil {
+				t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err)
+				continue
+			}
+			assert.Equal(t, field, value)
+			fieldStr, ok := field.(string)
+			if !ok {
+				// Note that requiring a string here is just to simplify the test setup,
+				// not a requirement of the underlying api.
+				t.Errorf("[%s] Field [%s] should be a string", description, key)
+				continue
+			}
+			if fieldStr != value {
+				t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr)
+			}
+		}
+	}
+}
+
+func TestProcessorsForConfigIsFlat(t *testing.T) {
+	// This test is regrettable, and exists because of inconsistencies in
+	// processor handling between processors.Processors and processing.group
+	// (which implements beat.ProcessorList) -- see processorsForConfig for
+	// details. The upshot is that, for now, if the input configuration specifies
+	// processors, they must be returned as direct children of the resulting
+	// processors.Processors (rather than being collected in additional tree
+	// structure).
+	// This test should be removed once we have a more consistent mechanism for
+	// collecting and running processors.
+	configStr := `processors:
+- add_fields: {fields: {testField: value}}
+- add_fields: {fields: {testField2: stuff}}`
+	config, err := outletConfigFromString(configStr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	processors, err := processorsForConfig(
+		beat.Info{}, config, beat.ClientConfig{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, 2, len(processors.List))
+}
+
+// setRawIndex is a bare-bones processor to set the raw_index field to a
+// constant string in the event metadata. It is used to test order of operations
+// for processorsForConfig.
+type setRawIndex struct {
+	indexStr string
+}
+
+func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
+	if event.Meta == nil {
+		event.Meta = common.MapStr{}
+	}
+	event.Meta["raw_index"] = p.indexStr
+	return event, nil
+}
+
+func (p *setRawIndex) String() string {
+	return fmt.Sprintf("set_raw_index=%v", p.indexStr)
+}
+
+// Helper function to convert from YML input string to an unpacked
+// inputOutletConfig
+func outletConfigFromString(s string) (inputOutletConfig, error) {
+	config := inputOutletConfig{}
+	cfg, err := common.NewConfigFrom(s)
+	if err != nil {
+		return config, err
+	}
+	if err := cfg.Unpack(&config); err != nil {
+		return config, err
+	}
+	return config, nil
+}
+
+// makeProcessors wraps one or more bare Processor objects in Processors.
+func makeProcessors(procs ...processors.Processor) *processors.Processors {
+	procList := processors.NewList(nil)
+	procList.List = procs
+	return procList
+}
diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go
index 1fe09039bbeb..d4373beec19d 100644
--- a/filebeat/channel/factory.go
+++ b/filebeat/channel/factory.go
@@ -20,6 +20,7 @@ package channel
 import (
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/common/fmtstr"
 	"github.com/elastic/beats/libbeat/processors"
 )
 
@@ -28,6 +29,7 @@ type OutletFactory struct {
 
 	eventer  beat.ClientEventer
 	wgEvents eventCounter
+	beatInfo beat.Info
 }
 
 type eventCounter interface {
@@ -57,8 +59,8 @@ type inputOutletConfig struct {
 	Fileset string `config:"_fileset_name"` // hidden setting
 
 	// Output meta data settings
-	Pipeline string `config:"pipeline"` // ES Ingest pipeline name
-
+	Pipeline string                   `config:"pipeline"` // ES Ingest pipeline name
+	Index    fmtstr.EventFormatString `config:"index"`    // ES output index pattern
 }
 
 // NewOutletFactory creates a new outlet factory for
@@ -66,10 +68,12 @@ type inputOutletConfig struct {
 func NewOutletFactory(
 	done <-chan struct{},
 	wgEvents eventCounter,
+	beatInfo beat.Info,
 ) *OutletFactory {
 	o := &OutletFactory{
 		done:     done,
 		wgEvents: wgEvents,
+		beatInfo: beatInfo,
 	}
 
 	if wgEvents != nil {
diff --git a/filebeat/docs/inputs/input-common-options.asciidoc b/filebeat/docs/inputs/input-common-options.asciidoc
index 6d09ff493b95..8e08a8074e4f 100644
--- a/filebeat/docs/inputs/input-common-options.asciidoc
+++ b/filebeat/docs/inputs/input-common-options.asciidoc
@@ -64,7 +64,7 @@ If this option is set to true, the custom
 <<{beatname_lc}-input-{type}-fields,fields>> are stored as top-level fields in
 the output document instead of being grouped under a `fields` sub-dictionary. If
 the custom field names conflict with other field names added by {beatname_uc},
-then the custom fields overwrite the other fields. 
+then the custom fields overwrite the other fields.
 
 [float]
 ===== `processors`
@@ -89,3 +89,15 @@ input is used.
 
 If this option is set to true, fields with `null` values will be published in
 the output document. By default, `keep_null` is set to `false`.
+
+[float]
+===== `index`
+
+If present, this formatted string overrides the index for events from this input
+(for elasticsearch outputs), or sets the `raw_index` field of the event's
+metadata (for other outputs). This string can only refer to the agent name and
+version and the event timestamp; for access to dynamic fields, use
+`output.elasticsearch.index` or a processor.
+
+Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might
+expand to `"filebeat-myindex-2019.11.01"`.
diff --git a/filebeat/input/input.go b/filebeat/input/input.go
index 98eea51db8a5..e1931d4e4bfa 100644
--- a/filebeat/input/input.go
+++ b/filebeat/input/input.go
@@ -60,7 +60,7 @@ type Runner struct {
 // New instantiates a new Runner
 func New(
 	conf *common.Config,
-	outlet channel.Connector,
+	connector channel.Connector,
 	beatDone chan struct{},
 	states []file.State,
 	dynFields *common.MapStrPointer,
@@ -99,7 +99,7 @@ func New(
 		Meta:          nil,
 	}
 	var ipt Input
-	ipt, err = f(conf, outlet, context)
+	ipt, err = f(conf, connector, context)
 	if err != nil {
 		return input, err
 	}
diff --git a/filebeat/kafka.yml b/filebeat/kafka.yml
new file mode 100644
index 000000000000..81842758971d
--- /dev/null
+++ b/filebeat/kafka.yml
@@ -0,0 +1,8 @@
+version: '2.3'
+services:
+  kafka:
+    build: ${ES_BEATS}/testing/environments/docker/kafka
+    expose:
+      - 9092
+    environment:
+      - ADVERTISED_HOST=kafka
diff --git a/libbeat/common/fmtstr/formattimestamp.go b/libbeat/common/fmtstr/formattimestamp.go
new file mode 100644
index 000000000000..68c22d636496
--- /dev/null
+++ b/libbeat/common/fmtstr/formattimestamp.go
@@ -0,0 +1,80 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fmtstr
+
+import (
+	"time"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+)
+
+// TimestampFormatString is a wrapper around EventFormatString for the
+// common special case where the format expression should only have access to
+// shared static fields (typically agent / version) and the event timestamp.
+type TimestampFormatString struct {
+	eventFormatString *EventFormatString
+	fields            common.MapStr
+}
+
+// NewTimestampFormatString creates from the given event format string a
+// TimestampFormatString that includes only the given static fields and
+// a timestamp.
+func NewTimestampFormatString(
+	eventFormatString *EventFormatString, staticFields common.MapStr,
+) (*TimestampFormatString, error) {
+	return &TimestampFormatString{
+		eventFormatString: eventFormatString,
+		fields:            staticFields.Clone(),
+	}, nil
+}
+
+// FieldsForBeat returns a common.MapStr with the given beat name and
+// version assigned to their standard field names.
+func FieldsForBeat(beat string, version string) common.MapStr {
+	return common.MapStr{
+		// beat object was left in for backward compatibility reason for older configs.
+		"beat": common.MapStr{
+			"name":    beat,
+			"version": version,
+		},
+		"agent": common.MapStr{
+			"name":    beat,
+			"version": version,
+		},
+		// For the Beats that have an observer role
+		"observer": common.MapStr{
+			"name":    beat,
+			"version": version,
+		},
+	}
+}
+
+// Run executes the format string returning a new expanded string or an error
+// if execution or event field expansion fails.
+func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) {
+	placeholderEvent := &beat.Event{
+		Fields:    fs.fields,
+		Timestamp: timestamp,
+	}
+	return fs.eventFormatString.Run(placeholderEvent)
+}
+
+func (fs *TimestampFormatString) String() string {
+	return fs.eventFormatString.expression
+}
diff --git a/libbeat/common/fmtstr/formattimestamp_test.go b/libbeat/common/fmtstr/formattimestamp_test.go
new file mode 100644
index 000000000000..d194f597ad5f
--- /dev/null
+++ b/libbeat/common/fmtstr/formattimestamp_test.go
@@ -0,0 +1,108 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fmtstr
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/libbeat/common"
+)
+
+func TestTimestampFormatString(t *testing.T) {
+	tests := []struct {
+		title        string
+		format       string
+		staticFields common.MapStr
+		timestamp    time.Time
+		expected     string
+	}{
+		{
+			"empty string",
+			"",
+			nil,
+			time.Time{},
+			"",
+		},
+		{
+			"no fields configured",
+			"format string",
+			nil,
+			time.Time{},
+			"format string",
+		},
+		{
+			"expand field",
+			"%{[key]}",
+			common.MapStr{"key": "value"},
+			time.Time{},
+			"value",
+		},
+		{
+			"expand with default",
+			"%{[key]:default}",
+			nil,
+			time.Time{},
+			"default",
+		},
+		{
+			"expand nested field",
+			"%{[nested.key]}",
+			common.MapStr{"nested": common.MapStr{"key": "value"}},
+			time.Time{},
+			"value",
+		},
+		{
+			"test timestamp formatter",
+			"%{[key]}: %{+YYYY.MM.dd}",
+			common.MapStr{"key": "timestamp"},
+			time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local),
+			"timestamp: 2015.05.01",
+		},
+		{
+			"test timestamp formatter",
+			"%{[@timestamp]}: %{+YYYY.MM.dd}",
+			common.MapStr{"key": "timestamp"},
+			time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local),
+			"2015-05-01T20:12:34.000Z: 2015.05.01",
+		},
+	}
+
+	for i, test := range tests {
+		t.Logf("test(%v): %v", i, test.title)
+
+		efs, err := CompileEvent(test.format)
+		if err != nil {
+			t.Error(err)
+			continue
+		}
+
+		fs, err := NewTimestampFormatString(efs, test.staticFields)
+		if err != nil {
+			t.Error(err)
+			continue
+		}
+
+		actual, err := fs.Run(test.timestamp)
+
+		assert.NoError(t, err)
+		assert.Equal(t, test.expected, actual)
+	}
+}
diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go
index 07c924214ece..84ca92c432e1 100644
--- a/libbeat/idxmgmt/ilm/ilm.go
+++ b/libbeat/idxmgmt/ilm/ilm.go
@@ -142,23 +142,9 @@ func NoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (Support
 }
 
 func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) {
-	return fmt.Run(&beat.Event{
-		Fields: common.MapStr{
-			// beat object was left in for backward compatibility reason for older configs.
-			"beat": common.MapStr{
-				"name":    info.Beat,
-				"version": info.Version,
-			},
-			"agent": common.MapStr{
-				"name":    info.Beat,
-				"version": info.Version,
-			},
-			// For the Beats that have an observer role
-			"observer": common.MapStr{
-				"name":    info.Beat,
-				"version": info.Version,
-			},
-		},
-		Timestamp: time.Now(),
-	})
+	return fmt.Run(
+		&beat.Event{
+			Fields:    fmtstr.FieldsForBeat(info.Beat, info.Version),
+			Timestamp: time.Now(),
+		})
 }
diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go
index 13d1a201fcf4..f6951d729add 100644
--- a/libbeat/idxmgmt/std.go
+++ b/libbeat/idxmgmt/std.go
@@ -54,12 +54,16 @@ type indexManager struct {
 	assets        Asseter
 }
 
-type indexSelector outil.Selector
+type indexSelector struct {
+	sel      outil.Selector
+	beatInfo beat.Info
+}
 
 type ilmIndexSelector struct {
-	index outil.Selector
-	alias outil.Selector
-	st    *indexState
+	index    outil.Selector
+	alias    outil.Selector
+	st       *indexState
+	beatInfo beat.Info
 }
 
 type componentType uint8
@@ -201,7 +205,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
 	}
 
 	if mode != ilm.ModeAuto {
-		return indexSelector(indexSel), nil
+		return indexSelector{indexSel, s.info}, nil
 	}
 
 	selCfg.SetString("index", -1, alias)
@@ -321,7 +325,7 @@ func (m *indexManager) setupWithILM() (bool, error) {
 }
 
 func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
-	if idx := getEventCustomIndex(evt); idx != "" {
+	if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
 		return idx, nil
 	}
 
@@ -335,13 +339,13 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
 }
 
 func (s indexSelector) Select(evt *beat.Event) (string, error) {
-	if idx := getEventCustomIndex(evt); idx != "" {
+	if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
 		return idx, nil
 	}
-	return outil.Selector(s).Select(evt)
+	return s.sel.Select(evt)
 }
 
-func getEventCustomIndex(evt *beat.Event) string {
+func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
 	if len(evt.Meta) == 0 {
 		return ""
 	}
@@ -360,6 +364,16 @@ func getEventCustomIndex(evt *beat.Event) string {
 		}
 	}
 
+	// This is functionally identical to Meta["alias"], returning the overriding
+	// metadata as the index name if present. It is currently used by Filebeat
+	// to send the index for particular inputs to formatted string templates,
+	// which are then expanded by a processor to the "raw_index" field.
+	if tmp := evt.Meta["raw_index"]; tmp != nil {
+		if idx, ok := tmp.(string); ok {
+			return idx
+		}
+	}
+
 	return ""
 }