Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/metrics: Send the accounting registry to InfluxDB #18470

Merged
merged 3 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ func (r *PrefixedRegistry) UnregisterAll() {
}

var (
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
AccountingRegistry = NewRegistry() // registry used in swarm
)

// Call the given function for each registered metric.
Expand Down
35 changes: 11 additions & 24 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ var (
// All metrics are cumulative

// total amount of units credited
mBalanceCredit metrics.Counter
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", metrics.AccountingRegistry)
// total amount of units debited
mBalanceDebit metrics.Counter
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", metrics.AccountingRegistry)
// total amount of bytes credited
mBytesCredit metrics.Counter
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", metrics.AccountingRegistry)
// total amount of bytes debited
mBytesDebit metrics.Counter
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", metrics.AccountingRegistry)
// total amount of credited messages
mMsgCredit metrics.Counter
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", metrics.AccountingRegistry)
// total amount of debited messages
mMsgDebit metrics.Counter
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", metrics.AccountingRegistry)
// how many times local node had to drop remote peers
mPeerDrops metrics.Counter
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", metrics.AccountingRegistry)
// how many times local node overdrafted and dropped
mSelfDrops metrics.Counter

MetricsRegistry metrics.Registry
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", metrics.AccountingRegistry)
)

// Prices defines how prices are being passed on to the accounting instance
Expand Down Expand Up @@ -110,24 +108,13 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}

// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
// SetupAccountingMetrics uses a separate registry for p2p accounting metrics;
// this registry should be independent of any other metrics as it persists at different endpoints.
// It also instantiates the given metrics and starts the persisting go-routine which
// It also starts the persisting go-routine which
// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
// create an empty registry
MetricsRegistry = metrics.NewRegistry()
// instantiate the metrics
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
// create the DB and start persisting
return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path)
}

// Send takes a peer, a size and a msg and
Expand Down
28 changes: 17 additions & 11 deletions p2p/protocols/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,41 @@ func TestReporter(t *testing.T) {
metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
log.Debug("Done.")

//do some metrics
//change metrics
mBalanceCredit.Inc(12)
mBytesCredit.Inc(34)
mMsgDebit.Inc(9)

//store expected metrics
expectedBalanceCredit := mBalanceCredit.Count()
expectedBytesCredit := mBytesCredit.Count()
expectedMsgDebit := mMsgDebit.Count()

//give the reporter time to write the metrics to DB
time.Sleep(20 * time.Millisecond)

//set the metrics to nil - this effectively simulates the node having shut down...
mBalanceCredit = nil
mBytesCredit = nil
mMsgDebit = nil
//close the DB also, or we can't create a new one
metrics.Close()

//clear the metrics - this effectively simulates the node having shut down...
mBalanceCredit.Clear()
mBytesCredit.Clear()
mMsgDebit.Clear()

//setup the metrics again
log.Debug("Setting up metrics second time")
metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
defer metrics.Close()
log.Debug("Done.")

//now check the metrics, they should have the same value as before "shutdown"
if mBalanceCredit.Count() != 12 {
t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count())
if mBalanceCredit.Count() != expectedBalanceCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBalanceCredit, mBalanceCredit.Count())
}
if mBytesCredit.Count() != 34 {
t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count())
if mBytesCredit.Count() != expectedBytesCredit {
t.Fatalf("Expected counter to be %d, but is %d", expectedBytesCredit, mBytesCredit.Count())
}
if mMsgDebit.Count() != 9 {
t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count())
if mMsgDebit.Count() != expectedMsgDebit {
t.Fatalf("Expected counter to be %d, but is %d", expectedMsgDebit, mMsgDebit.Count())
}
}
25 changes: 19 additions & 6 deletions swarm/metrics/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ var (
Name: "metrics.influxdb.export",
Usage: "Enable metrics export/push to an external InfluxDB database",
}
MetricsEnableInfluxDBAccountingExportFlag = cli.BoolFlag{
Name: "metrics.influxdb.accounting",
Usage: "Enable accounting metrics export/push to an external InfluxDB database",
}
MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint",
Usage: "Metrics InfluxDB endpoint",
Expand Down Expand Up @@ -66,6 +70,7 @@ var (
var Flags = []cli.Flag{
utils.MetricsEnabledFlag,
MetricsEnableInfluxDBExportFlag,
MetricsEnableInfluxDBAccountingExportFlag,
MetricsInfluxDBEndpointFlag,
MetricsInfluxDBDatabaseFlag,
MetricsInfluxDBUsernameFlag,
Expand All @@ -77,12 +82,13 @@ func Setup(ctx *cli.Context) {
if gethmetrics.Enabled {
log.Info("Enabling swarm metrics collection")
var (
enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
enableAccountingExport = ctx.GlobalBool(MetricsEnableInfluxDBAccountingExportFlag.Name)
endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
)

// Start system runtime metrics collection
Expand All @@ -94,5 +100,12 @@ func Setup(ctx *cli.Context) {
"host": hosttag,
})
}

if enableAccountingExport {
log.Info("Exporting accounting metrics to InfluxDB")
go influxdb.InfluxDBWithTags(gethmetrics.AccountingRegistry, 10*time.Second, endpoint, database, username, password, "accounting.", map[string]string{
"host": hosttag,
})
}
}
}