Skip to content

Commit

Permalink
refactor: fix suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Jan 21, 2025
1 parent 3c7c47e commit e7e55cd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 59 deletions.
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,13 +751,13 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
slotMetricGauges.IntervalSinceLastNormalizeGauge = a.OtelManager.Metrics.IntervalSinceLastNormalizeGauge

maintenanceEnabled, err := peerdbenv.PeerDBMaintenanceModeEnabled(ctx, nil)
instanceStatus := otel_metrics.InstancestatusReady
instanceStatus := otel_metrics.InstanceStatusReady
if err != nil {
logger.Error("Failed to get maintenance mode status", slog.Any("error", err))
instanceStatus = otel_metrics.InstanceStatusUknown
instanceStatus = otel_metrics.InstanceStatusUnknown
}
if maintenanceEnabled {
instanceStatus = otel_metrics.InstancestatusMaintenance
instanceStatus = otel_metrics.InstanceStatusMaintenance
}

a.OtelManager.Metrics.InstanceStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
Expand Down
1 change: 0 additions & 1 deletion flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
var tags []string
if errors.Is(err, context.Canceled) {
tags = append(tags, string(shared.ErrTypeCanceled))
// TODO this is only set for context.Canceled, other types need to be added too
errorClassString = "context.Canceled"
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
Expand Down
26 changes: 13 additions & 13 deletions flow/otel_metrics/attributes.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package otel_metrics

const (
PeerNameKey string = "peerName"
SlotNameKey string = "slotName"
FlowNameKey string = "flowName"
DeploymentUidKey string = "deploymentUID"
ErrorClassKey string = "errorClass"
InstanceStatusKey string = "instanceStatus"
WorkflowTypeKey string = "workflowType"
BatchIdKey string = "batchId"
SourcePeerType string = "sourcePeerType"
DestinationPeerType string = "destinationPeerType"
PeerNameKey = "peerName"
SlotNameKey = "slotName"
FlowNameKey = "flowName"
DeploymentUidKey = "deploymentUID"
ErrorClassKey = "errorClass"
InstanceStatusKey = "instanceStatus"
WorkflowTypeKey = "workflowType"
BatchIdKey = "batchId"
SourcePeerType = "sourcePeerType"
DestinationPeerType = "destinationPeerType"
)

const (
InstancestatusReady string = "ready"
InstancestatusMaintenance string = "maintenance"
InstanceStatusUknown string = "unknown"
InstanceStatusMaintenance = "maintenance"
InstanceStatusUnknown = "unknown"
InstanceStatusReady = "ready"
)
70 changes: 28 additions & 42 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
RecordsSyncedGaugeName = "records_synced"
SyncedTablesGaugeName = "synced_tables"
InstanceStatusGaugeName = "instance_status"
MaintenanceStatus = "maintenance_status"
MaintenanceStatusGaugeName = "maintenance_status"
)

type Metrics struct {
Expand Down Expand Up @@ -85,8 +85,7 @@ func NewOtelManager() (*OtelManager, error) {
Int64GaugesCache: make(map[string]metric.Int64Gauge),
Int64CountersCache: make(map[string]metric.Int64Counter),
}
err = otelManager.setupMetrics()
if err != nil {
if err = otelManager.setupMetrics(); err != nil {

Check failure on line 88 in flow/otel_metrics/otel_manager.go

View workflow job for this annotation

GitHub Actions / lint

sloppyReassign: re-assignment to `err` can be replaced with `err := otelManager.setupMetrics()` (gocritic)
return nil, err
}
return &otelManager, nil
Expand Down Expand Up @@ -131,100 +130,87 @@ func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64Co
}

func (om *OtelManager) setupMetrics() error {
slog.Info("Setting up all metrics")
slog.Debug("Setting up all metrics")
var err error
om.Metrics.SlotLagGauge, err = om.GetOrInitFloat64Gauge(BuildMetricName(SlotLagGaugeName),
if om.Metrics.SlotLagGauge, err = om.GetOrInitFloat64Gauge(BuildMetricName(SlotLagGaugeName),
metric.WithUnit("MiBy"),
metric.WithDescription("Postgres replication slot lag in MB"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.CurrentBatchIdGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(CurrentBatchIdGaugeName))
if err != nil {
if om.Metrics.CurrentBatchIdGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(CurrentBatchIdGaugeName)); err != nil {
return err
}

om.Metrics.LastNormalizedBatchIdGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(LastNormalizedBatchIdGaugeName))
if err != nil {
if om.Metrics.LastNormalizedBatchIdGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(LastNormalizedBatchIdGaugeName)); err != nil {
return err
}

om.Metrics.OpenConnectionsGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(OpenConnectionsGaugeName),
if om.Metrics.OpenConnectionsGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(OpenConnectionsGaugeName),
metric.WithDescription("Current open connections for PeerDB user"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.OpenReplicationConnectionsGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(OpenReplicationConnectionsGaugeName),
if om.Metrics.OpenReplicationConnectionsGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(OpenReplicationConnectionsGaugeName),
metric.WithDescription("Current open replication connections for PeerDB user"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.IntervalSinceLastNormalizeGauge, err = om.GetOrInitFloat64Gauge(BuildMetricName(IntervalSinceLastNormalizeGaugeName),
if om.Metrics.IntervalSinceLastNormalizeGauge, err = om.GetOrInitFloat64Gauge(BuildMetricName(IntervalSinceLastNormalizeGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Interval since last normalize"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.FetchedBytesCounter, err = om.GetOrInitInt64Counter(BuildMetricName(FetchedBytesCounterName),
if om.Metrics.FetchedBytesCounter, err = om.GetOrInitInt64Counter(BuildMetricName(FetchedBytesCounterName),
metric.WithUnit("By"),
metric.WithDescription("Bytes received of CopyData over replication slot"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.ErrorEmittedGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(ErrorEmittedGaugeName),
if om.Metrics.ErrorEmittedGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(ErrorEmittedGaugeName),
// This mostly tells whether an error is emitted or not, used for hooking up event based alerting
metric.WithDescription("Whether an error was emitted, 1 if emitted, 0 otherwise"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.ErrorsEmittedCounter, err = om.GetOrInitInt64Counter(BuildMetricName(ErrorsEmittedCounterName),
if om.Metrics.ErrorsEmittedCounter, err = om.GetOrInitInt64Counter(BuildMetricName(ErrorsEmittedCounterName),
// This the actual counter for errors emitted, used for alerting based on error rate/more detailed error analysis
metric.WithDescription("Counter of errors emitted"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.RecordsSyncedGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(RecordsSyncedGaugeName),
if om.Metrics.RecordsSyncedGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(RecordsSyncedGaugeName),
metric.WithDescription("Number of records synced for every Sync batch"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.SyncedTablesGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(SyncedTablesGaugeName),
if om.Metrics.SyncedTablesGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(SyncedTablesGaugeName),
metric.WithDescription("Number of tables synced"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.InstanceStatusGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(InstanceStatusGaugeName),
if om.Metrics.InstanceStatusGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(InstanceStatusGaugeName),
metric.WithDescription("Status of the instance, always emits a 1 metric with different attributes for different statuses"),
)
if err != nil {
); err != nil {
return err
}

om.Metrics.MaintenanceStatusGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(MaintenanceStatus),
if om.Metrics.MaintenanceStatusGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(MaintenanceStatusGaugeName),
metric.WithDescription("Whether maintenance is running, 1 if running with different attributes for start/end"),
)
if err != nil {
); err != nil {
return err
}
slog.Info("Finished setting up all metrics")
slog.Debug("Finished setting up all metrics")
return nil
}

Expand Down

0 comments on commit e7e55cd

Please sign in to comment.