From 7fc264c872900013b2c1ecdd3205c455024afb59 Mon Sep 17 00:00:00 2001 From: Young Xu Date: Fri, 26 May 2023 14:38:43 +0800 Subject: [PATCH] add getStats api (#17) --- padmin/non_persistent_topics_impl.go | 2 +- padmin/persistent_topics_impl.go | 27 +++++++++--- padmin/persistent_topics_impl_test.go | 62 +++++++++++++++++++++++++++ padmin/stats_interface.go | 40 ++++++++++++++++- 4 files changed, 124 insertions(+), 7 deletions(-) diff --git a/padmin/non_persistent_topics_impl.go b/padmin/non_persistent_topics_impl.go index a0105b2..b8c0bae 100644 --- a/padmin/non_persistent_topics_impl.go +++ b/padmin/non_persistent_topics_impl.go @@ -111,7 +111,7 @@ func (n *NonPersistentTopicsImpl) GetStats(tenant string, namespace string, topi panic("implement me") } -func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) ([]*TopicStatistics, error) { +func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) (*TopicStatistics, error) { //TODO implement me panic("implement me") } diff --git a/padmin/persistent_topics_impl.go b/padmin/persistent_topics_impl.go index 361e994..bc556e5 100644 --- a/padmin/persistent_topics_impl.go +++ b/padmin/persistent_topics_impl.go @@ -52,13 +52,30 @@ type PersistentTopicsImpl struct { } func (p *PersistentTopicsImpl) GetStats(tenant, namespace, topic string) (*TopicStatistics, error) { - //TODO implement me - panic("implement me") + resp, err := p.cli.Get(fmt.Sprintf(UrlPersistentGetStatsForTopicFormat, tenant, namespace, topic)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + var body = new(TopicStatistics) + if err := EasyReader(resp, body); err != nil { + return nil, err + } + return body, nil } -func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) ([]*TopicStatistics, error) { - //TODO implement me - panic("implement me") +func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) (*TopicStatistics, error) { + resp, err := p.cli.Get(fmt.Sprintf(UrlPersistentGetStatsForPartitionedTopicFormat, tenant, namespace, topic)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + var body = new(TopicStatistics) + if err := EasyReader(resp, body); err != nil { + return nil, err + } + + return body, nil } func (p *PersistentTopicsImpl) GetStatsInternal(tenant, namespace, topic string) (*TopicInternalStats, error) { diff --git a/padmin/persistent_topics_impl_test.go b/padmin/persistent_topics_impl_test.go index 0a19bab..5db7ee3 100644 --- a/padmin/persistent_topics_impl_test.go +++ b/padmin/persistent_topics_impl_test.go @@ -371,3 +371,65 @@ func TestPersistentTopicsImpl_GetStatsInternalForTopic(t *testing.T) { require.Nil(t, err) t.Logf("get stats: %+v", stats) } + +func TestPersistentTopicsImpl_GetStats(t *testing.T) { + broker := startTestBroker(t) + defer broker.Close() + admin := NewTestPulsarAdmin(t, broker.webPort) + testTenant := RandStr(8) + err := admin.Tenants.Create(testTenant, TenantInfo{ + AllowedClusters: []string{"standalone"}, + }) + require.Nil(t, err) + testNs := RandStr(8) + err = admin.Namespaces.Create(testTenant, testNs) + require.Nil(t, err) + namespaces, err := admin.Namespaces.List(testTenant) + require.Nil(t, err) + assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs)) + testTopic := RandStr(8) + err = admin.PersistentTopics.CreateNonPartitioned(testTenant, testNs, testTopic) + require.Nil(t, err) + topicList, err := admin.PersistentTopics.ListNonPartitioned(testTenant, testNs) + require.Nil(t, err) + if len(topicList) != 1 { + t.Fatal("topic list should have one topic") + } + if topicList[0] != fmt.Sprintf("persistent://%s/%s/%s", testTenant, testNs, testTopic) { + t.Fatal("topic name should be equal") + } + stats, err := admin.PersistentTopics.GetStats(testTenant, testNs, testTopic) + require.Nil(t, err) + t.Logf("get stats: %+v", stats) +} + +func TestPersistentTopicsImpl_GetPartitionedStats(t *testing.T) { + broker := startTestBroker(t) + defer broker.Close() + admin := NewTestPulsarAdmin(t, broker.webPort) + testTenant := RandStr(8) + err := admin.Tenants.Create(testTenant, TenantInfo{ + AllowedClusters: []string{"standalone"}, + }) + require.Nil(t, err) + testNs := RandStr(8) + err = admin.Namespaces.Create(testTenant, testNs) + require.Nil(t, err) + namespaces, err := admin.Namespaces.List(testTenant) + require.Nil(t, err) + assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs)) + testTopic := RandStr(8) + err = admin.PersistentTopics.CreatePartitioned(testTenant, testNs, testTopic, 2) + require.Nil(t, err) + topicList, err := admin.PersistentTopics.ListPartitioned(testTenant, testNs) + require.Nil(t, err) + if len(topicList) != 1 { + t.Fatal("topic list should have one topic") + } + if topicList[0] != fmt.Sprintf("persistent://%s/%s/%s", testTenant, testNs, testTopic) { + t.Fatal("topic name should be equal") + } + statsList, err := admin.PersistentTopics.GetPartitionedStats(testTenant, testNs, testTopic) + require.Nil(t, err) + t.Logf("get partitioned stats:%+v", statsList) +} diff --git a/padmin/stats_interface.go b/padmin/stats_interface.go index 0b5c8ec..c3a2fdf 100644 --- a/padmin/stats_interface.go +++ b/padmin/stats_interface.go @@ -19,7 +19,7 @@ package padmin type TopicStats interface { GetStats(string, string, string) (*TopicStatistics, error) - GetPartitionedStats(string, string, string) ([]*TopicStatistics, error) + GetPartitionedStats(string, string, string) (*TopicStatistics, error) GetStatsInternal(string, string, string) (*TopicInternalStats, error) GetPartitionedStatsInternal(string, string, string) (*PartitionedTopicInternalStats, error) } @@ -62,6 +62,37 @@ type PartitionedTopicInternalStats struct { } type TopicStatistics struct { + MsgRateIn float64 `json:"msgRateIn,omitempty"` + MsgThroughputIn float64 `json:"msgThroughputIn,omitempty"` + MsgRateOut float64 `json:"msgRateOut,omitempty"` + MsgThroughputOut float64 `json:"msgThroughputOut,omitempty"` + BytesInCounter uint64 `json:"bytesInCounter,omitempty"` + MsgInCounter uint64 `json:"msgInCounter,omitempty"` + BytesOutCounter uint64 `json:"bytesOutCounter,omitempty"` + MsgOutCounter uint64 `json:"msgOutCounter,omitempty"` + AverageMsgSize float64 `json:"averageMsgSize,omitempty"` + MsgChunkPublished bool `json:"msgChunkPublished,omitempty"` + StorageSize uint64 `json:"storageSize,omitempty"` + BacklogSize uint64 `json:"backlogSize,omitempty"` + PublishRateLimitedTimes uint64 `json:"publishRateLimitedTimes,omitempty"` + EarliestMsgPublishTimeInBacklogs uint64 `json:"earliestMsgPublishTimeInBacklogs,omitempty"` + OffloadedStorageSize uint64 `json:"offloadedStorageSize,omitempty"` + LastOffloadLedgerId uint64 `json:"lastOffloadLedgerId,omitempty"` + LastOffloadSuccessTimeStamp uint64 `json:"lastOffloadSuccessTimeStamp,omitempty"` + LastOffloadFailureTimeStamp uint64 `json:"lastOffloadFailureTimeStamp,omitempty"` + OngoingTxnCount uint64 `json:"ongoingTxnCount,omitempty"` + AbortedTxnCount uint64 `json:"abortedTxnCount,omitempty"` + CommittedTxnCount uint64 `json:"committedTxnCount,omitempty"` + Publishers []string `json:"publishers,omitempty"` + WaitingPublishers uint64 `json:"waitingPublishers,omitempty"` + Subscriptions map[string]string `json:"subscriptions,omitempty"` + Replication map[string]string `json:"replication,omitempty"` + DeduplicationStatus string `json:"deduplicationStatus,omitempty"` + NonContiguousDeletedMessagesRanges uint64 `json:"nonContiguousDeletedMessagesRanges,omitempty"` + NonContiguousDeletedMessagesRangesSerializedSize uint64 `json:"nonContiguousDeletedMessagesRangesSerializedSize,omitempty"` + DelayedMessageIndexSizeInBytes uint64 `json:"delayedMessageIndexSizeInBytes,omitempty"` + Compaction Compaction `json:"compaction"` + OwnerBroker string `json:"ownerBroker,omitempty"` } type CursorStats struct { @@ -82,3 +113,10 @@ type CursorStats struct { SubscriptionHavePendingReplayRead bool `json:"subscriptionHavePendingReplayRead"` Properties map[string]int64 `json:"properties"` } + +type Compaction struct { + LastCompactionRemovedEventCount uint64 `json:"lastCompactionRemovedEventCount,omitempty"` + LastCompactionSucceedTimestamp uint64 `json:"lastCompactionSucceedTimestamp,omitempty"` + LastCompactionFailedTimestamp uint64 `json:"lastCompactionFailedTimestamp,omitempty"` + LastCompactionDurationTimeInMills uint64 `json:"lastCompactionDurationTimeInMills,omitempty"` +}