From 88559cc42aa40e4ef4213ca9dd3486ffdfc329d1 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis <vytenis@uber.com>
Date: Thu, 4 Mar 2021 12:40:25 -0500
Subject: [PATCH 1/3] Optimize StagedMetadatas conversion

---
 src/metrics/aggregation/id.go                 |  19 ++-
 src/metrics/aggregation/id_test.go            |   8 +-
 .../protobuf/unaggregated_encoder_test.go     |  30 ++---
 .../generated/proto/metricpb/composite.pb.go  |  20 +--
 src/metrics/metadata/metadata.go              |  94 +++++++++++---
 .../metadata/metadata_benchmark_test.go       |  83 +++++++++++++
 src/metrics/metadata/metadata_test.go         |  71 ++++++++++-
 src/metrics/metric/aggregated/types_test.go   |  18 ++-
 src/metrics/pipeline/applied/type.go          | 115 ++++++++++++------
 src/metrics/pipeline/applied/type_test.go     |  37 +++---
 src/metrics/policy/storage_policy.go          |  24 +++-
 src/metrics/transformation/type.go            |  44 +++----
 src/x/time/unit.go                            |  25 +++-
 13 files changed, 440 insertions(+), 148 deletions(-)

diff --git a/src/metrics/aggregation/id.go b/src/metrics/aggregation/id.go
index ceef5d6deb..f6e28d5e8e 100644
--- a/src/metrics/aggregation/id.go
+++ b/src/metrics/aggregation/id.go
@@ -141,21 +141,13 @@ func (id *ID) UnmarshalYAML(unmarshal func(interface{}) error) error {
 }
 
 // ToProto converts the aggregation id to a protobuf message in place.
-func (id ID) ToProto(pb *aggregationpb.AggregationID) error {
-	if IDLen != 1 {
-		return fmt.Errorf("id length %d cannot be represented by a single integer", IDLen)
-	}
+func (id ID) ToProto(pb *aggregationpb.AggregationID) {
 	pb.Id = id[0]
-	return nil
 }
 
 // FromProto converts the protobuf message to an aggregation id in place.
-func (id *ID) FromProto(pb aggregationpb.AggregationID) error {
-	if IDLen != 1 {
-		return fmt.Errorf("id length %d cannot be represented by a single integer", IDLen)
-	}
+func (id *ID) FromProto(pb aggregationpb.AggregationID) {
 	(*id)[0] = pb.Id
-	return nil
 }
 
 // CompressTypes compresses a list of aggregation types to an ID.
@@ -172,3 +164,10 @@ func MustCompressTypes(aggTypes ...Type) ID {
 	}
 	return res
 }
+
+func init() {
+	if IDLen != 1 {
+		// changing this const requires extensive surgery
+		panic(fmt.Sprintf("id length %d cannot be represented by a single integer", IDLen))
+	}
+}
diff --git a/src/metrics/aggregation/id_test.go b/src/metrics/aggregation/id_test.go
index 20eef73b46..9e2615c3f9 100644
--- a/src/metrics/aggregation/id_test.go
+++ b/src/metrics/aggregation/id_test.go
@@ -39,13 +39,13 @@ var (
 
 func TestIDToProto(t *testing.T) {
 	var pb aggregationpb.AggregationID
-	require.NoError(t, testID.ToProto(&pb))
+	testID.ToProto(&pb)
 	require.Equal(t, testIDProto, pb)
 }
 
 func TestIDFromProto(t *testing.T) {
 	var res ID
-	require.NoError(t, res.FromProto(testIDProto))
+	res.FromProto(testIDProto)
 	require.Equal(t, testID, res)
 }
 
@@ -54,8 +54,8 @@ func TestIDRoundTrip(t *testing.T) {
 		pb  aggregationpb.AggregationID
 		res ID
 	)
-	require.NoError(t, testID.ToProto(&pb))
-	require.NoError(t, res.FromProto(pb))
+	testID.ToProto(&pb)
+	res.FromProto(pb)
 	require.Equal(t, testID, res)
 }
 
diff --git a/src/metrics/encoding/protobuf/unaggregated_encoder_test.go b/src/metrics/encoding/protobuf/unaggregated_encoder_test.go
index 7c3ad0026d..2b9063d31b 100644
--- a/src/metrics/encoding/protobuf/unaggregated_encoder_test.go
+++ b/src/metrics/encoding/protobuf/unaggregated_encoder_test.go
@@ -253,7 +253,7 @@ var (
 	}
 	testPassthroughMetadata1 = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
 	testPassthroughMetadata2 = policy.NewStoragePolicy(10*time.Second, xtime.Second, 6*time.Hour)
-	testCounter1Proto = metricpb.Counter{
+	testCounter1Proto        = metricpb.Counter{
 		Id:    []byte("testCounter1"),
 		Value: 123,
 	}
@@ -673,11 +673,11 @@ func TestUnaggregatedEncoderEncodeBatchTimerWithMetadatas(t *testing.T) {
 	enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil }
 	for i, input := range inputs {
 		require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
-			Type: encoding.BatchTimerWithMetadatasType,
+			Type:                    encoding.BatchTimerWithMetadatasType,
 			BatchTimerWithMetadatas: input,
 		}))
 		expectedProto := metricpb.MetricWithMetadatas{
-			Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
+			Type:                    metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
 			BatchTimerWithMetadatas: &expected[i],
 		}
 		expectedMsgSize := expectedProto.Size()
@@ -793,11 +793,11 @@ func TestUnaggregatedEncoderEncodeForwardedMetricWithMetadata(t *testing.T) {
 	enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil }
 	for i, input := range inputs {
 		require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
-			Type: encoding.ForwardedMetricWithMetadataType,
+			Type:                        encoding.ForwardedMetricWithMetadataType,
 			ForwardedMetricWithMetadata: input,
 		}))
 		expectedProto := metricpb.MetricWithMetadatas{
-			Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
+			Type:                        metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
 			ForwardedMetricWithMetadata: &expected[i],
 		}
 		expectedMsgSize := expectedProto.Size()
@@ -853,11 +853,11 @@ func TestUnaggregatedEncoderEncodeTimedMetricWithMetadata(t *testing.T) {
 	enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil }
 	for i, input := range inputs {
 		require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
-			Type: encoding.TimedMetricWithMetadataType,
+			Type:                    encoding.TimedMetricWithMetadataType,
 			TimedMetricWithMetadata: input,
 		}))
 		expectedProto := metricpb.MetricWithMetadatas{
-			Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
+			Type:                    metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
 			TimedMetricWithMetadata: &expected[i],
 		}
 		expectedMsgSize := expectedProto.Size()
@@ -1121,12 +1121,12 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
 				}
 			case unaggregated.BatchTimerWithMetadatas:
 				msg = encoding.UnaggregatedMessageUnion{
-					Type: encoding.BatchTimerWithMetadatasType,
+					Type:                    encoding.BatchTimerWithMetadatasType,
 					BatchTimerWithMetadatas: input,
 				}
 				res := expected[i].(metricpb.BatchTimerWithMetadatas)
 				expectedProto = metricpb.MetricWithMetadatas{
-					Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
+					Type:                    metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
 					BatchTimerWithMetadatas: &res,
 				}
 			case unaggregated.GaugeWithMetadatas:
@@ -1141,32 +1141,32 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
 				}
 			case aggregated.ForwardedMetricWithMetadata:
 				msg = encoding.UnaggregatedMessageUnion{
-					Type: encoding.ForwardedMetricWithMetadataType,
+					Type:                        encoding.ForwardedMetricWithMetadataType,
 					ForwardedMetricWithMetadata: input,
 				}
 				res := expected[i].(metricpb.ForwardedMetricWithMetadata)
 				expectedProto = metricpb.MetricWithMetadatas{
-					Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
+					Type:                        metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
 					ForwardedMetricWithMetadata: &res,
 				}
 			case aggregated.TimedMetricWithMetadata:
 				msg = encoding.UnaggregatedMessageUnion{
-					Type: encoding.TimedMetricWithMetadataType,
+					Type:                    encoding.TimedMetricWithMetadataType,
 					TimedMetricWithMetadata: input,
 				}
 				res := expected[i].(metricpb.TimedMetricWithMetadata)
 				expectedProto = metricpb.MetricWithMetadatas{
-					Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
+					Type:                    metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
 					TimedMetricWithMetadata: &res,
 				}
 			case aggregated.PassthroughMetricWithMetadata:
 				msg = encoding.UnaggregatedMessageUnion{
-					Type: encoding.PassthroughMetricWithMetadataType,
+					Type:                          encoding.PassthroughMetricWithMetadataType,
 					PassthroughMetricWithMetadata: input,
 				}
 				res := expected[i].(metricpb.TimedMetricWithStoragePolicy)
 				expectedProto = metricpb.MetricWithMetadatas{
-					Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY,
+					Type:                         metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY,
 					TimedMetricWithStoragePolicy: &res,
 				}
 			default:
diff --git a/src/metrics/generated/proto/metricpb/composite.pb.go b/src/metrics/generated/proto/metricpb/composite.pb.go
index e364a562f3..20af477ce2 100644
--- a/src/metrics/generated/proto/metricpb/composite.pb.go
+++ b/src/metrics/generated/proto/metricpb/composite.pb.go
@@ -143,10 +143,12 @@ type BatchTimerWithMetadatas struct {
 	Metadatas  StagedMetadatas `protobuf:"bytes,2,opt,name=metadatas" json:"metadatas"`
 }
 
-func (m *BatchTimerWithMetadatas) Reset()                    { *m = BatchTimerWithMetadatas{} }
-func (m *BatchTimerWithMetadatas) String() string            { return proto.CompactTextString(m) }
-func (*BatchTimerWithMetadatas) ProtoMessage()               {}
-func (*BatchTimerWithMetadatas) Descriptor() ([]byte, []int) { return fileDescriptorComposite, []int{1} }
+func (m *BatchTimerWithMetadatas) Reset()         { *m = BatchTimerWithMetadatas{} }
+func (m *BatchTimerWithMetadatas) String() string { return proto.CompactTextString(m) }
+func (*BatchTimerWithMetadatas) ProtoMessage()    {}
+func (*BatchTimerWithMetadatas) Descriptor() ([]byte, []int) {
+	return fileDescriptorComposite, []int{1}
+}
 
 func (m *BatchTimerWithMetadatas) GetBatchTimer() BatchTimer {
 	if m != nil {
@@ -217,10 +219,12 @@ type TimedMetricWithMetadata struct {
 	Metadata TimedMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
 }
 
-func (m *TimedMetricWithMetadata) Reset()                    { *m = TimedMetricWithMetadata{} }
-func (m *TimedMetricWithMetadata) String() string            { return proto.CompactTextString(m) }
-func (*TimedMetricWithMetadata) ProtoMessage()               {}
-func (*TimedMetricWithMetadata) Descriptor() ([]byte, []int) { return fileDescriptorComposite, []int{4} }
+func (m *TimedMetricWithMetadata) Reset()         { *m = TimedMetricWithMetadata{} }
+func (m *TimedMetricWithMetadata) String() string { return proto.CompactTextString(m) }
+func (*TimedMetricWithMetadata) ProtoMessage()    {}
+func (*TimedMetricWithMetadata) Descriptor() ([]byte, []int) {
+	return fileDescriptorComposite, []int{4}
+}
 
 func (m *TimedMetricWithMetadata) GetMetric() TimedMetric {
 	if m != nil {
diff --git a/src/metrics/metadata/metadata.go b/src/metrics/metadata/metadata.go
index b69da3daa4..8ddfa997bc 100644
--- a/src/metrics/metadata/metadata.go
+++ b/src/metrics/metadata/metadata.go
@@ -135,9 +135,7 @@ func (m PipelineMetadata) Clone() PipelineMetadata {
 
 // ToProto converts the pipeline metadata to a protobuf message in place.
 func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error {
-	if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.ToProto(&pb.AggregationId)
 	if err := m.Pipeline.ToProto(&pb.Pipeline); err != nil {
 		return err
 	}
@@ -158,9 +156,7 @@ func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error {
 
 // FromProto converts the protobuf message to a pipeline metadata in place.
 func (m *PipelineMetadata) FromProto(pb metricpb.PipelineMetadata) error {
-	if err := m.AggregationID.FromProto(pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.FromProto(pb.AggregationId)
 	if err := m.Pipeline.FromProto(pb.Pipeline); err != nil {
 		return err
 	}
@@ -358,9 +354,7 @@ type ForwardMetadata struct {
 
 // ToProto converts the forward metadata to a protobuf message in place.
 func (m ForwardMetadata) ToProto(pb *metricpb.ForwardMetadata) error {
-	if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.ToProto(&pb.AggregationId)
 	if err := m.StoragePolicy.ToProto(&pb.StoragePolicy); err != nil {
 		return err
 	}
@@ -374,9 +368,7 @@ func (m ForwardMetadata) ToProto(pb *metricpb.ForwardMetadata) error {
 
 // FromProto converts the protobuf message to a forward metadata in place.
 func (m *ForwardMetadata) FromProto(pb metricpb.ForwardMetadata) error {
-	if err := m.AggregationID.FromProto(pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.FromProto(pb.AggregationId)
 	if err := m.StoragePolicy.FromProto(pb.StoragePolicy); err != nil {
 		return err
 	}
@@ -487,6 +479,7 @@ func (sms StagedMetadatas) ToProto(pb *metricpb.StagedMetadatas) error {
 }
 
 // FromProto converts the protobuf message to a staged metadatas in place.
+// This is an optimized method that merges some nested steps.
 func (sms *StagedMetadatas) FromProto(pb metricpb.StagedMetadatas) error {
 	numMetadatas := len(pb.Metadatas)
 	if cap(*sms) >= numMetadatas {
@@ -494,6 +487,75 @@ func (sms *StagedMetadatas) FromProto(pb metricpb.StagedMetadatas) error {
 	} else {
 		*sms = make([]StagedMetadata, numMetadatas)
 	}
+
+	for i := 0; i < numMetadatas; i++ {
+		metadata := &(*sms)[i]
+		metadataPb := &pb.Metadatas[i]
+		numPipelines := len(metadataPb.Metadata.Pipelines)
+
+		metadata.CutoverNanos = metadataPb.CutoverNanos
+		metadata.Tombstoned = metadataPb.Tombstoned
+
+		if cap(metadata.Pipelines) >= numPipelines {
+			metadata.Pipelines = metadata.Pipelines[:numPipelines]
+		} else {
+			metadata.Pipelines = make(PipelineMetadatas, numPipelines)
+		}
+
+		for j := 0; j < numPipelines; j++ {
+			var (
+				pipelinePb         = &metadataPb.Metadata.Pipelines[j]
+				pipeline           = &metadata.Pipelines[j]
+				numStoragePolicies = len(pipelinePb.StoragePolicies)
+				numOps             = len(pipelinePb.Pipeline.Ops)
+				err                error
+			)
+
+			pipeline.AggregationID[0] = pipelinePb.AggregationId.Id
+			pipeline.DropPolicy = policy.DropPolicy(pipelinePb.DropPolicy)
+
+			if len(pipeline.Tags) > 0 {
+				pipeline.Tags = pipeline.Tags[:0]
+			}
+			if len(pipeline.GraphitePrefix) > 0 {
+				pipeline.GraphitePrefix = pipeline.GraphitePrefix[:0]
+			}
+
+			if cap(pipeline.StoragePolicies) >= numStoragePolicies {
+				pipeline.StoragePolicies = pipeline.StoragePolicies[:numStoragePolicies]
+			} else {
+				pipeline.StoragePolicies = make([]policy.StoragePolicy, numStoragePolicies)
+			}
+
+			if cap(pipeline.Pipeline.Operations) >= numOps {
+				pipeline.Pipeline.Operations = pipeline.Pipeline.Operations[:numOps]
+			} else {
+				pipeline.Pipeline.Operations = make([]applied.OpUnion, numOps)
+			}
+
+			err = applied.OperationsFromProto(pipelinePb.Pipeline.Ops, pipeline.Pipeline.Operations)
+			if err != nil {
+				return err
+			}
+
+			err = policy.StoragePoliciesFromProto(pipelinePb.StoragePolicies, pipeline.StoragePolicies)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// fromProto is a non-optimized in place protobuf conversion method, used as a reference for tests.
+func (sms *StagedMetadatas) fromProto(pb metricpb.StagedMetadatas) error {
+	numMetadatas := len(pb.Metadatas)
+	if cap(*sms) >= numMetadatas {
+		*sms = (*sms)[:numMetadatas]
+	} else {
+		*sms = make([]StagedMetadata, numMetadatas)
+	}
 	for i := 0; i < numMetadatas; i++ {
 		if err := (*sms)[i].FromProto(pb.Metadatas[i]); err != nil {
 			return err
@@ -519,9 +581,7 @@ type TimedMetadata struct {
 
 // ToProto converts the timed metadata to a protobuf message in place.
 func (m TimedMetadata) ToProto(pb *metricpb.TimedMetadata) error {
-	if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.ToProto(&pb.AggregationId)
 	if err := m.StoragePolicy.ToProto(&pb.StoragePolicy); err != nil {
 		return err
 	}
@@ -530,9 +590,7 @@ func (m TimedMetadata) ToProto(pb *metricpb.TimedMetadata) error {
 
 // FromProto converts the protobuf message to a timed metadata in place.
 func (m *TimedMetadata) FromProto(pb metricpb.TimedMetadata) error {
-	if err := m.AggregationID.FromProto(pb.AggregationId); err != nil {
-		return err
-	}
+	m.AggregationID.FromProto(pb.AggregationId)
 	if err := m.StoragePolicy.FromProto(pb.StoragePolicy); err != nil {
 		return err
 	}
diff --git a/src/metrics/metadata/metadata_benchmark_test.go b/src/metrics/metadata/metadata_benchmark_test.go
index ff505e28ae..49099e55b7 100644
--- a/src/metrics/metadata/metadata_benchmark_test.go
+++ b/src/metrics/metadata/metadata_benchmark_test.go
@@ -25,6 +25,7 @@ import (
 	"testing"
 
 	"github.com/m3db/m3/src/metrics/aggregation"
+	"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
 	"github.com/m3db/m3/src/metrics/policy"
 )
 
@@ -54,3 +55,85 @@ func BenchmarkMetadata_IsDefault(b *testing.B) {
 	}
 	runtime.KeepAlive(m)
 }
+
+func BenchmarkMetadata_FromProto(b *testing.B) {
+	var (
+		testAllPayload metricpb.StagedMetadatas
+		m              StagedMetadatas
+	)
+
+	testAllPayload.Metadatas = append(testAllPayload.Metadatas,
+		testSmallStagedMetadatasProto.Metadatas...)
+	testAllPayload.Metadatas = append(testAllPayload.Metadatas,
+		testLargeStagedMetadatasProto.Metadatas...)
+	testAllPayload.Metadatas = append(testAllPayload.Metadatas,
+		testSmallStagedMetadatasWithLargeStoragePoliciesProto.Metadatas...)
+
+	b.Run("large metadatas", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.FromProto(testLargeStagedMetadatasProto); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("small metadatas", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.FromProto(testSmallStagedMetadatasProto); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("storage policies", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.FromProto(
+				testSmallStagedMetadatasWithLargeStoragePoliciesProto,
+			); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("all", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.FromProto(testAllPayload); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("reference, large metadatas", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.fromProto(testLargeStagedMetadatasProto); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("reference, small metadatas", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.fromProto(testSmallStagedMetadatasProto); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("reference, storage policies", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.fromProto(testSmallStagedMetadatasWithLargeStoragePoliciesProto); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	b.Run("reference, all", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			if err := m.fromProto(testAllPayload); err != nil {
+				b.Fail()
+			}
+		}
+	})
+
+	runtime.KeepAlive(m)
+}
diff --git a/src/metrics/metadata/metadata_test.go b/src/metrics/metadata/metadata_test.go
index 7376747a07..475c952f07 100644
--- a/src/metrics/metadata/metadata_test.go
+++ b/src/metrics/metadata/metadata_test.go
@@ -413,6 +413,68 @@ var (
 			},
 		},
 	}
+	testSmallStagedMetadatasWithLargeStoragePoliciesProto = metricpb.StagedMetadatas{
+		Metadatas: []metricpb.StagedMetadata{
+			{
+				CutoverNanos: 4567,
+				Tombstoned:   true,
+				Metadata: metricpb.Metadata{
+					Pipelines: []metricpb.PipelineMetadata{
+						{
+							AggregationId: aggregationpb.AggregationID{Id: aggregation.MustCompressTypes(aggregation.Sum)[0]},
+							StoragePolicies: []policypb.StoragePolicy{
+								{
+									Resolution: policypb.Resolution{
+										WindowSize: time.Second.Nanoseconds(),
+										Precision:  time.Second.Nanoseconds(),
+									},
+									Retention: policypb.Retention{
+										Period: 10 * time.Second.Nanoseconds(),
+									},
+								},
+								{
+									Resolution: policypb.Resolution{
+										WindowSize: 10 * time.Second.Nanoseconds(),
+										Precision:  time.Second.Nanoseconds(),
+									},
+									Retention: policypb.Retention{
+										Period: time.Hour.Nanoseconds(),
+									},
+								},
+								{
+									Resolution: policypb.Resolution{
+										WindowSize: 10 * time.Minute.Nanoseconds(),
+										Precision:  time.Second.Nanoseconds(),
+									},
+									Retention: policypb.Retention{
+										Period: time.Minute.Nanoseconds(),
+									},
+								},
+								{
+									Resolution: policypb.Resolution{
+										WindowSize: 10 * time.Minute.Nanoseconds(),
+										Precision:  time.Second.Nanoseconds(),
+									},
+									Retention: policypb.Retention{
+										Period: time.Second.Nanoseconds(),
+									},
+								},
+								{
+									Resolution: policypb.Resolution{
+										WindowSize: 10 * time.Hour.Nanoseconds(),
+										Precision:  time.Second.Nanoseconds(),
+									},
+									Retention: policypb.Retention{
+										Period: time.Second.Nanoseconds(),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
 	testLargeStagedMetadatasProto = metricpb.StagedMetadatas{
 		Metadatas: []metricpb.StagedMetadata{
 			{
@@ -1013,10 +1075,13 @@ func TestStagedMetadatasFromProto(t *testing.T) {
 	}
 
 	for _, input := range inputs {
-		var res StagedMetadatas
+		var resOpt, resReference StagedMetadatas
 		for i, pb := range input.sequence {
-			require.NoError(t, res.FromProto(pb))
-			require.Equal(t, input.expected[i], res)
+			require.NoError(t, resReference.fromProto(pb))
+			require.NoError(t, resOpt.FromProto(pb))
+			require.Equal(t, input.expected[i], resOpt)
+			require.Equal(t, input.expected[i], resReference)
+			require.Equal(t, resOpt, resReference)
 		}
 	}
 }
diff --git a/src/metrics/metric/aggregated/types_test.go b/src/metrics/metric/aggregated/types_test.go
index 9d27d8de02..a0760ce752 100644
--- a/src/metrics/metric/aggregated/types_test.go
+++ b/src/metrics/metric/aggregated/types_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/m3db/m3/src/metrics/transformation"
 	xtime "github.com/m3db/m3/src/x/time"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/require"
 )
 
@@ -365,13 +366,18 @@ func TestForwardedMetricWithMetadataFromProto(t *testing.T) {
 	}
 
 	var res ForwardedMetricWithMetadata
+	comparer := cmp.Comparer(spComparer)
 	for _, input := range inputs {
 		require.NoError(t, res.FromProto(&input.data))
 		expected := ForwardedMetricWithMetadata{
 			ForwardedMetric: input.expectedMetric,
 			ForwardMetadata: input.expectedMetadata,
 		}
-		require.Equal(t, expected, res)
+
+		if !cmp.Equal(expected, res, comparer) {
+			t.Log(cmp.Diff(expected, res, comparer))
+			t.Fail()
+		}
 	}
 }
 
@@ -416,6 +422,7 @@ func TestForwardedMetricWithMetadataRoundtrip(t *testing.T) {
 		res ForwardedMetricWithMetadata
 		pb  metricpb.ForwardedMetricWithMetadata
 	)
+	comparer := cmp.Comparer(spComparer)
 	for _, input := range inputs {
 		data := ForwardedMetricWithMetadata{
 			ForwardedMetric: input.metric,
@@ -423,6 +430,13 @@ func TestForwardedMetricWithMetadataRoundtrip(t *testing.T) {
 		}
 		require.NoError(t, data.ToProto(&pb))
 		require.NoError(t, res.FromProto(&pb))
-		require.Equal(t, data, res)
+		if !cmp.Equal(data, res, comparer) {
+			t.Log(cmp.Diff(data, res, comparer))
+			t.Fail()
+		}
 	}
 }
+
+func spComparer(x, y policy.StoragePolicy) bool {
+	return x.Equivalent(y)
+}
diff --git a/src/metrics/pipeline/applied/type.go b/src/metrics/pipeline/applied/type.go
index fd320f4d82..fd5ae22f65 100644
--- a/src/metrics/pipeline/applied/type.go
+++ b/src/metrics/pipeline/applied/type.go
@@ -28,13 +28,17 @@ import (
 	"github.com/m3db/m3/src/metrics/aggregation"
 	"github.com/m3db/m3/src/metrics/generated/proto/pipelinepb"
 	"github.com/m3db/m3/src/metrics/pipeline"
+	"github.com/m3db/m3/src/metrics/transformation"
 )
 
 var (
 	// DefaultPipeline is a default pipeline.
 	DefaultPipeline Pipeline
 
-	errNilAppliedRollupOpProto = errors.New("nil applied rollup op proto message")
+	errNilAppliedRollupOpProto  = errors.New("nil applied rollup op proto message")
+	errUnknownOpType            = errors.New("unknown op type")
+	errOperationsLengthMismatch = errors.New("operations list length does not match proto")
+	errNilTransformationOpProto = errors.New("nil transformation op proto message")
 )
 
 // RollupOp captures the rollup metadata after the operation is applied against a metric ID.
@@ -45,7 +49,7 @@ type RollupOp struct {
 	AggregationID aggregation.ID
 }
 
-// Equal determines whether two rollup operations are equal.
+// Equal determines whether two rollup Operations are equal.
 func (op RollupOp) Equal(other RollupOp) bool {
 	return op.AggregationID == other.AggregationID && bytes.Equal(op.ID, other.ID)
 }
@@ -63,9 +67,7 @@ func (op RollupOp) String() string {
 
 // ToProto converts the applied rollup op to a protobuf message in place.
 func (op RollupOp) ToProto(pb *pipelinepb.AppliedRollupOp) error {
-	if err := op.AggregationID.ToProto(&pb.AggregationId); err != nil {
-		return err
-	}
+	op.AggregationID.ToProto(&pb.AggregationId)
 	pb.Id = op.ID
 	return nil
 }
@@ -75,9 +77,7 @@ func (op *RollupOp) FromProto(pb *pipelinepb.AppliedRollupOp) error {
 	if pb == nil {
 		return errNilAppliedRollupOpProto
 	}
-	if err := op.AggregationID.FromProto(pb.AggregationId); err != nil {
-		return err
-	}
+	op.AggregationID.FromProto(pb.AggregationId)
 	op.ID = pb.Id
 	return nil
 }
@@ -143,7 +143,7 @@ func (u OpUnion) ToProto(pb *pipelinepb.AppliedPipelineOp) error {
 		pb.Rollup = &pipelinepb.AppliedRollupOp{}
 		return u.Rollup.ToProto(pb.Rollup)
 	default:
-		return fmt.Errorf("unknown op type: %v", u.Type)
+		return errUnknownOpType
 	}
 }
 
@@ -152,58 +152,62 @@ func (u *OpUnion) Reset() { *u = OpUnion{} }
 
 // FromProto converts the protobuf message to an applied pipeline op in place.
 func (u *OpUnion) FromProto(pb pipelinepb.AppliedPipelineOp) error {
-	u.Reset()
 	switch pb.Type {
 	case pipelinepb.AppliedPipelineOp_TRANSFORMATION:
 		u.Type = pipeline.TransformationOpType
+		if u.Rollup.ID != nil {
+			u.Rollup.ID = u.Rollup.ID[:0]
+		}
+		u.Rollup.AggregationID[0] = aggregation.DefaultID[0]
 		return u.Transformation.FromProto(pb.Transformation)
 	case pipelinepb.AppliedPipelineOp_ROLLUP:
 		u.Type = pipeline.RollupOpType
+		u.Transformation.Type = transformation.UnknownType
 		return u.Rollup.FromProto(pb.Rollup)
 	default:
-		return fmt.Errorf("unknown op type in proto: %v", pb.Type)
+		return errUnknownOpType
 	}
 }
 
 // Pipeline is a pipeline of operations.
 type Pipeline struct {
-	// a list of pipeline operations.
-	operations []OpUnion
+	// a list of pipeline Operations.
+	Operations []OpUnion
 }
 
 // NewPipeline creates a new pipeline.
 func NewPipeline(ops []OpUnion) Pipeline {
-	return Pipeline{operations: ops}
+	return Pipeline{Operations: ops}
 }
 
 // Len returns the number of steps in a pipeline.
-func (p Pipeline) Len() int { return len(p.operations) }
+func (p Pipeline) Len() int { return len(p.Operations) }
 
 // IsEmpty determines whether a pipeline is empty.
-func (p Pipeline) IsEmpty() bool { return len(p.operations) == 0 }
+func (p Pipeline) IsEmpty() bool { return len(p.Operations) == 0 }
 
 // At returns the operation at a given step.
-func (p Pipeline) At(i int) OpUnion { return p.operations[i] }
+func (p Pipeline) At(i int) OpUnion { return p.Operations[i] }
 
 // Equal determines whether two pipelines are equal.
 func (p Pipeline) Equal(other Pipeline) bool {
 	// keep in sync with OpUnion.Equal as go is terrible at inlining anything with a loop
-	if len(p.operations) != len(other.operations) {
+	if len(p.Operations) != len(other.Operations) {
 		return false
 	}
 
-	for i := 0; i < len(p.operations); i++ {
-		if p.operations[i].Type != other.operations[i].Type {
+	for i := 0; i < len(p.Operations); i++ {
+		if p.Operations[i].Type != other.Operations[i].Type {
 			return false
 		}
 		//nolint:exhaustive
-		switch p.operations[i].Type {
+		switch p.Operations[i].Type {
 		case pipeline.RollupOpType:
-			if !p.operations[i].Rollup.Equal(other.operations[i].Rollup) {
+			if !p.Operations[i].Rollup.Equal(other.Operations[i].Rollup) {
 				return false
 			}
 		case pipeline.TransformationOpType:
-			if p.operations[i].Transformation.Type != other.operations[i].Transformation.Type {
+			if p.Operations[i].Transformation.Type != other.Operations[i].Transformation.Type {
 				return false
 			}
 		}
@@ -214,25 +218,25 @@ func (p Pipeline) Equal(other Pipeline) bool {
 
 // Clone clones the pipeline.
 func (p Pipeline) Clone() Pipeline {
-	clone := make([]OpUnion, len(p.operations))
-	for i := range p.operations {
-		clone[i] = p.operations[i].Clone()
+	clone := make([]OpUnion, len(p.Operations))
+	for i := range p.Operations {
+		clone[i] = p.Operations[i].Clone()
 	}
-	return Pipeline{operations: clone}
+	return Pipeline{Operations: clone}
 }
 
-// SubPipeline returns a sub-pipeline containing operations between step `startInclusive`
+// SubPipeline returns a sub-pipeline containing Operations between step `startInclusive`
 // and step `endExclusive` of the current pipeline.
 func (p Pipeline) SubPipeline(startInclusive int, endExclusive int) Pipeline {
-	return Pipeline{operations: p.operations[startInclusive:endExclusive]}
+	return Pipeline{Operations: p.Operations[startInclusive:endExclusive]}
 }
 
 func (p Pipeline) String() string {
 	var b bytes.Buffer
 	b.WriteString("{operations: [")
-	for i, op := range p.operations {
+	for i, op := range p.Operations {
 		b.WriteString(op.String())
-		if i < len(p.operations)-1 {
+		if i < len(p.Operations)-1 {
 			b.WriteString(", ")
 		}
 	}
@@ -242,14 +246,14 @@ func (p Pipeline) String() string {
 
 // ToProto converts the applied pipeline to a protobuf message in place.
 func (p Pipeline) ToProto(pb *pipelinepb.AppliedPipeline) error {
-	numOps := len(p.operations)
+	numOps := len(p.Operations)
 	if cap(pb.Ops) >= numOps {
 		pb.Ops = pb.Ops[:numOps]
 	} else {
 		pb.Ops = make([]pipelinepb.AppliedPipelineOp, numOps)
 	}
 	for i := 0; i < numOps; i++ {
-		if err := p.operations[i].ToProto(&pb.Ops[i]); err != nil {
+		if err := p.Operations[i].ToProto(&pb.Ops[i]); err != nil {
 			return err
 		}
 	}
@@ -259,13 +263,13 @@ func (p Pipeline) ToProto(pb *pipelinepb.AppliedPipeline) error {
 // FromProto converts the protobuf message to an applied pipeline in place.
 func (p *Pipeline) FromProto(pb pipelinepb.AppliedPipeline) error {
 	numOps := len(pb.Ops)
-	if cap(p.operations) >= numOps {
-		p.operations = p.operations[:numOps]
+	if cap(p.Operations) >= numOps {
+		p.Operations = p.Operations[:numOps]
 	} else {
-		p.operations = make([]OpUnion, numOps)
+		p.Operations = make([]OpUnion, numOps)
 	}
 	for i := 0; i < numOps; i++ {
-		if err := p.operations[i].FromProto(pb.Ops[i]); err != nil {
+		if err := p.Operations[i].FromProto(pb.Ops[i]); err != nil {
 			return err
 		}
 	}
@@ -275,10 +279,45 @@ func (p *Pipeline) FromProto(pb pipelinepb.AppliedPipeline) error {
 // IsMappingRule returns whether this is a mapping rule, determined by
 // if any rollup pipelines are included.
 func (p Pipeline) IsMappingRule() bool {
-	for _, op := range p.operations {
+	for _, op := range p.Operations {
 		if op.Rollup.ID != nil {
 			return false
 		}
 	}
 	return true
 }
+
+// OperationsFromProto converts a list of protobuf AppliedPipelineOps, used in optimized staged metadata methods.
+func OperationsFromProto(pb []pipelinepb.AppliedPipelineOp, ops []OpUnion) error {
+	numOps := len(pb)
+	if numOps != len(ops) {
+		return errOperationsLengthMismatch
+	}
+	for i := 0; i < numOps; i++ {
+		u := &ops[i]
+		u.Type = pipeline.OpType(pb[i].Type + 1)
+		switch u.Type {
+		case pipeline.TransformationOpType:
+			if u.Rollup.ID != nil {
+				u.Rollup.ID = u.Rollup.ID[:0]
+			}
+			u.Rollup.AggregationID[0] = aggregation.DefaultID[0]
+			if pb[i].Transformation == nil {
+				return errNilTransformationOpProto
+			}
+			if err := u.Transformation.Type.FromProto(pb[i].Transformation.Type); err != nil {
+				return err
+			}
+		case pipeline.RollupOpType:
+			u.Transformation.Type = transformation.UnknownType
+			if pb == nil {
+				return errNilAppliedRollupOpProto
+			}
+			u.Rollup.AggregationID[0] = pb[i].Rollup.AggregationId.Id
+			u.Rollup.ID = pb[i].Rollup.Id
+		default:
+			return errUnknownOpType
+		}
+	}
+	return nil
+}
diff --git a/src/metrics/pipeline/applied/type_test.go b/src/metrics/pipeline/applied/type_test.go
index edd8e26b39..75366fef16 100644
--- a/src/metrics/pipeline/applied/type_test.go
+++ b/src/metrics/pipeline/applied/type_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/m3db/m3/src/metrics/pipeline"
 	"github.com/m3db/m3/src/metrics/transformation"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/require"
 )
 
@@ -367,18 +368,18 @@ func TestPipelineEqual(t *testing.T) {
 			require.Equal(t, input.expected, input.p2.Equal(input.p1))
 			// assert implementation is equal to OpUnion
 			if input.expected {
-				for i, op := range input.p1.operations {
-					require.True(t, op.Equal(input.p2.operations[i]))
+				for i, op := range input.p1.Operations {
+					require.True(t, op.Equal(input.p2.Operations[i]))
 				}
-				for i, op := range input.p2.operations {
-					require.True(t, op.Equal(input.p1.operations[i]))
+				for i, op := range input.p2.Operations {
+					require.True(t, op.Equal(input.p1.Operations[i]))
 				}
-			} else if len(input.p1.operations) == len(input.p2.operations) {
-				for i, op := range input.p1.operations {
-					require.False(t, op.Equal(input.p2.operations[i]))
+			} else if len(input.p1.Operations) == len(input.p2.Operations) {
+				for i, op := range input.p1.Operations {
+					require.False(t, op.Equal(input.p2.Operations[i]))
 				}
-				for i, op := range input.p2.operations {
-					require.False(t, op.Equal(input.p1.operations[i]))
+				for i, op := range input.p2.Operations {
+					require.False(t, op.Equal(input.p1.Operations[i]))
 				}
 			}
 		})
@@ -392,7 +393,7 @@ func TestPipelineCloneEmptyPipeline(t *testing.T) {
 	p2 := p1.Clone()
 	require.True(t, p1.Equal(p2))
 
-	p2.operations = append(p2.operations, OpUnion{
+	p2.Operations = append(p2.Operations, OpUnion{
 		Type: pipeline.RollupOpType,
 		Rollup: RollupOp{
 			ID:            []byte("foo"),
@@ -438,9 +439,9 @@ func TestPipelineCloneMultiLevelPipeline(t *testing.T) {
 	require.True(t, p1.Equal(p3))
 
 	// Mutate the operations of a cloned pipeline.
-	p2.operations[0].Transformation.Type = transformation.PerSecond
-	p2.operations[1].Rollup.ID[0] = 'z'
-	p2.operations[3].Rollup.AggregationID = aggregation.MustCompressTypes(aggregation.Count)
+	p2.Operations[0].Transformation.Type = transformation.PerSecond
+	p2.Operations[1].Rollup.ID[0] = 'z'
+	p2.Operations[3].Rollup.AggregationID = aggregation.MustCompressTypes(aggregation.Count)
 
 	// Verify the mutations do not affect the source pipeline or other clones.
 	require.False(t, p1.Equal(p2))
@@ -616,7 +617,10 @@ func TestPipelineFromProto(t *testing.T) {
 		var res Pipeline
 		for i, pb := range input.sequence {
 			require.NoError(t, res.FromProto(pb))
-			require.Equal(t, input.expected[i], res)
+			if !cmp.Equal(input.expected[i], res) {
+				t.Log(cmp.Diff(input.expected[i], res))
+				t.Fail()
+			}
 		}
 	}
 }
@@ -646,7 +650,10 @@ func TestPipelineRoundTrip(t *testing.T) {
 		for _, pipeline := range input {
 			require.NoError(t, pipeline.ToProto(&pb))
 			require.NoError(t, res.FromProto(pb))
-			require.Equal(t, pipeline, res)
+			if !cmp.Equal(pipeline, res) {
+				t.Log(cmp.Diff(pipeline, res))
+				t.Fail()
+			}
 		}
 	}
 }
diff --git a/src/metrics/policy/storage_policy.go b/src/metrics/policy/storage_policy.go
index 3d8e2905c0..0cdbf92390 100644
--- a/src/metrics/policy/storage_policy.go
+++ b/src/metrics/policy/storage_policy.go
@@ -39,8 +39,9 @@ var (
 	// EmptyStoragePolicy represents an empty storage policy.
 	EmptyStoragePolicy StoragePolicy
 
-	errNilStoragePolicyProto      = errors.New("nil storage policy proto")
-	errInvalidStoragePolicyString = errors.New("invalid storage policy string")
+	errNilStoragePolicyProto       = errors.New("nil storage policy proto")
+	errInvalidStoragePolicyString  = errors.New("invalid storage policy string")
+	errStoragePolicyLengthMismatch = errors.New("storage policy list length does not match proto")
 )
 
 // StoragePolicy represents the resolution and retention period metric datapoints
@@ -265,3 +266,22 @@ func (sp ByRetentionAscResolutionAsc) Less(i, j int) bool {
 	}
 	return sp[i].Resolution().Precision < sp[j].Resolution().Precision
 }
+
+// StoragePoliciesFromProto converts a list of protobuf storage policies to a storage policy in place.
+func StoragePoliciesFromProto(src []policypb.StoragePolicy, dst []StoragePolicy) error {
+	if len(src) != len(dst) {
+		return errStoragePolicyLengthMismatch
+	}
+	for i := 0; i < len(src); i++ {
+		d := &dst[i]
+		if err := d.resolution.FromProto(src[i].Resolution); err != nil {
+			return err
+		}
+
+		if err := d.retention.FromProto(src[i].Retention); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/src/metrics/transformation/type.go b/src/metrics/transformation/type.go
index e66ebbe4c3..46e428682e 100644
--- a/src/metrics/transformation/type.go
+++ b/src/metrics/transformation/type.go
@@ -22,13 +22,16 @@
 package transformation
 
 import (
+	"errors"
 	"fmt"
 
 	"github.com/m3db/m3/src/metrics/generated/proto/transformationpb"
 )
 
 // Type defines a transformation function.
-type Type int
+type Type int32
+
+var errUnknownTransformationType = errors.New("unknown transformation type")
 
 // Supported transformation types.
 const (
@@ -40,6 +43,11 @@ const (
 	Reset
 )
 
+const (
+	_minValidTransformationType = Absolute
+	_maxValidTransformationType = Reset
+)
+
 // IsValid checks if the transformation type is valid.
 func (t Type) IsValid() bool {
 	return t.IsUnaryTransform() || t.IsBinaryTransform() || t.IsUnaryMultiOutputTransform()
@@ -79,7 +87,7 @@ func (t Type) NewOp() (Op, error) {
 	case t.IsUnaryMultiOutputTransform():
 		unaryMulti, err = t.UnaryMultiOutputTransform()
 	default:
-		err = fmt.Errorf("unknown transformation type: %v", t)
+		err = errUnknownTransformationType
 	}
 	if err != nil {
 		return Op{}, err
@@ -154,38 +162,18 @@ func (t Type) MustUnaryMultiOutputTransform() UnaryMultiOutputTransform {
 
 // ToProto converts the transformation type to a protobuf message in place.
 func (t Type) ToProto(pb *transformationpb.TransformationType) error {
-	switch t {
-	case Absolute:
-		*pb = transformationpb.TransformationType_ABSOLUTE
-	case PerSecond:
-		*pb = transformationpb.TransformationType_PERSECOND
-	case Increase:
-		*pb = transformationpb.TransformationType_INCREASE
-	case Add:
-		*pb = transformationpb.TransformationType_ADD
-	case Reset:
-		*pb = transformationpb.TransformationType_RESET
-	default:
-		return fmt.Errorf("unknown transformation type: %v", t)
+	if t < _minValidTransformationType || t > _maxValidTransformationType {
+		return errUnknownTransformationType
 	}
+	*pb = transformationpb.TransformationType(t)
 	return nil
 }
 
 // FromProto converts the protobuf message to a transformation type in place.
 func (t *Type) FromProto(pb transformationpb.TransformationType) error {
-	switch pb {
-	case transformationpb.TransformationType_ABSOLUTE:
-		*t = Absolute
-	case transformationpb.TransformationType_PERSECOND:
-		*t = PerSecond
-	case transformationpb.TransformationType_INCREASE:
-		*t = Increase
-	case transformationpb.TransformationType_ADD:
-		*t = Add
-	case transformationpb.TransformationType_RESET:
-		*t = Reset
-	default:
-		return fmt.Errorf("unknown transformation type in proto: %v", pb)
+	*t = Type(pb)
+	if *t < _minValidTransformationType || *t > _maxValidTransformationType {
+		return errUnknownTransformationType
 	}
 	return nil
 }
diff --git a/src/x/time/unit.go b/src/x/time/unit.go
index f120def9d3..8d423e7f3c 100644
--- a/src/x/time/unit.go
+++ b/src/x/time/unit.go
@@ -108,11 +108,18 @@ func (tu Unit) String() string {
 
 // UnitFromDuration creates a time unit from a time duration.
 func UnitFromDuration(d time.Duration) (Unit, error) {
-	if unit, found := durationsToUnit[d]; found {
-		return unit, nil
+	i := 0
+	// TODO: remove this once we're on go 1.16+, as for loops prevent inlining with older compilers
+For:
+	if i >= len(unitLookupArray) {
+		return None, errConvertDurationToUnit
 	}
 
-	return None, errConvertDurationToUnit
+	if unitLookupArray[i].duration == d {
+		return unitLookupArray[i].unit, nil
+	}
+	i++
+	goto For
 }
 
 // DurationFromUnit creates a time duration from a time unit.
@@ -184,13 +191,19 @@ var (
 		Day:         time.Hour * 24,
 		Year:        time.Hour * 24 * 365,
 	}
-	durationsToUnit = make(map[time.Duration]Unit)
 
 	unitCount = len(unitsToDuration)
 
 	unitsByDurationDesc []Unit
 )
 
+type unitLookupEntry struct {
+	duration time.Duration
+	unit     Unit
+}
+
+var unitLookupArray []unitLookupEntry
+
 // byDurationDesc sorts time units by their durations in descending order.
 // The order is undefined if the units are invalid.
 type byDurationDesc []Unit
@@ -206,13 +219,15 @@ func (b byDurationDesc) Less(i, j int) bool {
 
 func init() {
 	unitsByDurationDesc = make([]Unit, 0, unitCount)
+	unitLookupArray = make([]unitLookupEntry, 0, unitCount)
+
 	for u, d := range unitsToDuration {
 		unit := Unit(u)
 		if unit == None {
 			continue
 		}
 
-		durationsToUnit[d] = unit
+		unitLookupArray = append(unitLookupArray, unitLookupEntry{unit: unit, duration: d})
 		unitsByDurationDesc = append(unitsByDurationDesc, unit)
 	}
 	sort.Sort(byDurationDesc(unitsByDurationDesc))

From 373be785f9b1f9b2492289c3f08eeb4753b9af63 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis <vytenis@uber.com>
Date: Fri, 5 Mar 2021 10:14:09 -0500
Subject: [PATCH 2/3] .

---
 src/metrics/metadata/metadata.go | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/metrics/metadata/metadata.go b/src/metrics/metadata/metadata.go
index 8ddfa997bc..5cf5e225eb 100644
--- a/src/metrics/metadata/metadata.go
+++ b/src/metrics/metadata/metadata.go
@@ -533,14 +533,17 @@ func (sms *StagedMetadatas) FromProto(pb metricpb.StagedMetadatas) error {
 				pipeline.Pipeline.Operations = make([]applied.OpUnion, numOps)
 			}
 
-			err = applied.OperationsFromProto(pipelinePb.Pipeline.Ops, pipeline.Pipeline.Operations)
-			if err != nil {
-				return err
+			if len(pipelinePb.Pipeline.Ops) > 0 {
+				err = applied.OperationsFromProto(pipelinePb.Pipeline.Ops, pipeline.Pipeline.Operations)
+				if err != nil {
+					return err
+				}
 			}
-
-			err = policy.StoragePoliciesFromProto(pipelinePb.StoragePolicies, pipeline.StoragePolicies)
-			if err != nil {
-				return err
+			if len(pipelinePb.StoragePolicies) > 0 {
+				err = policy.StoragePoliciesFromProto(pipelinePb.StoragePolicies, pipeline.StoragePolicies)
+				if err != nil {
+					return err
+				}
 			}
 		}
 	}

From 5203dd7a9c937d216a34cdc77a9450f2ba18e063 Mon Sep 17 00:00:00 2001
From: Vytenis Darulis <vytenis@uber.com>
Date: Fri, 5 Mar 2021 11:30:51 -0500
Subject: [PATCH 3/3] undo accidental codegen change

---
 .../generated/proto/metricpb/composite.pb.go  | 20 ++++++++-----------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/src/metrics/generated/proto/metricpb/composite.pb.go b/src/metrics/generated/proto/metricpb/composite.pb.go
index 20af477ce2..e364a562f3 100644
--- a/src/metrics/generated/proto/metricpb/composite.pb.go
+++ b/src/metrics/generated/proto/metricpb/composite.pb.go
@@ -143,12 +143,10 @@ type BatchTimerWithMetadatas struct {
 	Metadatas  StagedMetadatas `protobuf:"bytes,2,opt,name=metadatas" json:"metadatas"`
 }
 
-func (m *BatchTimerWithMetadatas) Reset()         { *m = BatchTimerWithMetadatas{} }
-func (m *BatchTimerWithMetadatas) String() string { return proto.CompactTextString(m) }
-func (*BatchTimerWithMetadatas) ProtoMessage()    {}
-func (*BatchTimerWithMetadatas) Descriptor() ([]byte, []int) {
-	return fileDescriptorComposite, []int{1}
-}
+func (m *BatchTimerWithMetadatas) Reset()                    { *m = BatchTimerWithMetadatas{} }
+func (m *BatchTimerWithMetadatas) String() string            { return proto.CompactTextString(m) }
+func (*BatchTimerWithMetadatas) ProtoMessage()               {}
+func (*BatchTimerWithMetadatas) Descriptor() ([]byte, []int) { return fileDescriptorComposite, []int{1} }
 
 func (m *BatchTimerWithMetadatas) GetBatchTimer() BatchTimer {
 	if m != nil {
@@ -219,12 +217,10 @@ type TimedMetricWithMetadata struct {
 	Metadata TimedMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
 }
 
-func (m *TimedMetricWithMetadata) Reset()         { *m = TimedMetricWithMetadata{} }
-func (m *TimedMetricWithMetadata) String() string { return proto.CompactTextString(m) }
-func (*TimedMetricWithMetadata) ProtoMessage()    {}
-func (*TimedMetricWithMetadata) Descriptor() ([]byte, []int) {
-	return fileDescriptorComposite, []int{4}
-}
+func (m *TimedMetricWithMetadata) Reset()                    { *m = TimedMetricWithMetadata{} }
+func (m *TimedMetricWithMetadata) String() string            { return proto.CompactTextString(m) }
+func (*TimedMetricWithMetadata) ProtoMessage()               {}
+func (*TimedMetricWithMetadata) Descriptor() ([]byte, []int) { return fileDescriptorComposite, []int{4} }
 
 func (m *TimedMetricWithMetadata) GetMetric() TimedMetric {
 	if m != nil {