Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: message_ttl and compaction api #9

Merged
merged 1 commit into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions padmin/compaction_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package padmin

type NamespaceCompaction interface {
// GetMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered.
GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error)
// SetMaximumUnCompactedBytes Set maximum number of uncompacted bytes in a topic before compaction is triggered.
SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error
// RemoveMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered.
RemoveMaximumUnCompactedBytes(tenant, namespace string) error
}

type TopicCompaction interface {
// GetTopicCompactionThreshold Get compaction threshold configuration for specified topic.
GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
// SetTopicCompactionThreshold Set compaction threshold configuration for specified topic.
SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
// RemoveTopicCompactionThreshold Remove compaction threshold configuration for specified topic.
RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
// GetTopicCompactionStatus Get the status of a compaction operation for a topic.
GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
// TriggerTopicCompaction Trigger a compaction operation on a topic.
TriggerTopicCompaction(tenant, namespace, topic string) error
}

type CompactionStatus string

const (
NotRun CompactionStatus = "NOT_RUN"
Running CompactionStatus = "RUNNING"
Success CompactionStatus = "SUCCESS"
ERROR CompactionStatus = "ERROR"
)

type LongRunningProcessStatus struct {
Status CompactionStatus `json:"status"`
LastError string `json:"lastError"`
}
7 changes: 6 additions & 1 deletion padmin/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package padmin
import (
"bytes"
"encoding/json"
"io"
"net/http"
"time"
)
Expand Down Expand Up @@ -48,7 +49,11 @@ func (h *HttpClientImpl) Put(path string, body any) (*http.Response, error) {
if err != nil {
return nil, err
}
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(data))
var putData io.Reader
if body != nil {
putData = bytes.NewBuffer(data)
}
req, err := http.NewRequest("PUT", url, putData)
if err != nil {
return nil, err
}
Expand Down
38 changes: 38 additions & 0 deletions padmin/message_ttl_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package padmin

// NamespaceMessageTTL Discard data after some time (by automatically acknowledging)
type NamespaceMessageTTL interface {
// GetNamespaceMessageTTL Get the message TTL for the namespace
GetNamespaceMessageTTL(tenant, namespace string) (int64, error)
// SetNamespaceMessageTTL Set the message TTL for the namespace
SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error
// RemoveNamespaceMessageTTL Remove the message TTL for the namespace
RemoveNamespaceMessageTTL(tenant, namespace string) error
}

// TopicMessageTTL Discard data after some time (by automatically acknowledging)
type TopicMessageTTL interface {
// GetTopicMessageTTL Get the message TTL for the topic
GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
// SetTopicMessageTTL Set the message TTL for the topic
SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
// RemoveTopicMessageTTL Remove the message TTL for the topic
RemoveTopicMessageTTL(tenant, namespace, topic string) error
}
79 changes: 79 additions & 0 deletions padmin/namespaces_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
"strconv"
)

type Namespaces interface {
Expand All @@ -29,6 +30,8 @@ type Namespaces interface {
List(tenant string) ([]string, error)
NamespaceRetention
NamespaceBacklog
NamespaceMessageTTL
NamespaceCompaction
}

type NamespacesImpl struct {
Expand Down Expand Up @@ -201,3 +204,79 @@ func (n *NamespacesImpl) RemoveNamespaceRetention(tenant, namespace string) erro
}
return HttpCheck(resp)
}

func (n *NamespacesImpl) GetNamespaceMessageTTL(tenant, namespace string) (int64, error) {
url := fmt.Sprintf(UrlNamespaceMessageTTLFormat, tenant, namespace)
resp, err := n.cli.Get(url)
if err != nil {
return 0, err
}
res, err := ReadAll(resp.Body)
if err != nil {
return 0, err
}
if len(res) == 0 {
return 0, nil
}
i, err := strconv.ParseInt(res, 10, 64)
if err != nil {
return 0, err
}
return i, nil
}

func (n *NamespacesImpl) SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error {
url := fmt.Sprintf(UrlNamespaceMessageTTLFormat, tenant, namespace)
resp, err := n.cli.Post(url, seconds)
if err != nil {
return err
}
return HttpCheck(resp)
}

func (n *NamespacesImpl) RemoveNamespaceMessageTTL(tenant, namespace string) error {
url := fmt.Sprintf(UrlNamespaceMessageTTLFormat, tenant, namespace)
resp, err := n.cli.Delete(url)
if err != nil {
return err
}
return HttpCheck(resp)
}

func (n *NamespacesImpl) GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error) {
url := fmt.Sprintf(UrlNamespaceCompactionThresholdFormat, tenant, namespace)
resp, err := n.cli.Get(url)
if err != nil {
return 0, err
}
res, err := ReadAll(resp.Body)
if err != nil {
return 0, err
}
if len(res) == 0 {
return 0, nil
}
i, err := strconv.ParseInt(res, 10, 64)
if err != nil {
return 0, err
}
return i, nil
}

func (n *NamespacesImpl) SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error {
url := fmt.Sprintf(UrlNamespaceCompactionThresholdFormat, tenant, namespace)
resp, err := n.cli.Put(url, threshold)
if err != nil {
return err
}
return HttpCheck(resp)
}

func (n *NamespacesImpl) RemoveMaximumUnCompactedBytes(tenant, namespace string) error {
url := fmt.Sprintf(UrlNamespaceCompactionThresholdFormat, tenant, namespace)
resp, err := n.cli.Delete(url)
if err != nil {
return err
}
return HttpCheck(resp)
}
123 changes: 106 additions & 17 deletions padmin/namespaces_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,45 @@ func TestNamespaces(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
namespaces, err := admin.Namespaces.List("public")
namespaces, err := admin.Namespaces.List(testTenant)
require.Nil(t, err)
assert.Contains(t, namespaces, fmt.Sprintf("public/%s", testNs))
err = admin.Namespaces.Delete("public", testNs)
assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs))
err = admin.Namespaces.Delete(testTenant, testNs)
require.Nil(t, err)
}

func TestNamespacesImpl_OperateNamespaceRetention(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.SetNamespaceRetention("public", testNs, &RetentionConfiguration{
err = admin.Namespaces.SetNamespaceRetention(testTenant, testNs, &RetentionConfiguration{
RetentionSizeInMB: 100,
RetentionTimeInMinutes: 10,
})
require.Nil(t, err)
cfg, err := admin.Namespaces.GetNamespaceRetention("public", testNs)
cfg, err := admin.Namespaces.GetNamespaceRetention(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 100, cfg.RetentionSizeInMB)
require.EqualValues(t, 10, cfg.RetentionTimeInMinutes)
err = admin.Namespaces.RemoveNamespaceRetention("public", testNs)
err = admin.Namespaces.RemoveNamespaceRetention(testTenant, testNs)
require.Nil(t, err)
cfg, err = admin.Namespaces.GetNamespaceRetention("public", testNs)
cfg, err = admin.Namespaces.GetNamespaceRetention(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 0, cfg.RetentionSizeInMB)
require.EqualValues(t, 0, cfg.RetentionTimeInMinutes)
Expand All @@ -67,10 +77,15 @@ func TestNamespacesImpl_GetBacklogQuota(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
info, err := admin.Namespaces.GetNamespaceBacklogQuota("public", testNs)
info, err := admin.Namespaces.GetNamespaceBacklogQuota(testTenant, testNs)
require.Nil(t, err)
t.Logf("get quota info: %+v", info)
}
Expand All @@ -79,10 +94,15 @@ func TestNamespacesImpl_SetNamespaceBacklogQuota(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.SetNamespaceBacklogQuota("public", testNs, &BacklogQuota{
err = admin.Namespaces.SetNamespaceBacklogQuota(testTenant, testNs, &BacklogQuota{
Limit: 100,
LimitSize: 100,
LimitTime: 30,
Expand All @@ -95,20 +115,89 @@ func TestNamespacesImpl_RemoveNamespaceBacklogQuota(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.RemoveNamespaceBacklogQuota("public", testNs)
err = admin.Namespaces.RemoveNamespaceBacklogQuota(testTenant, testNs)
require.Nil(t, err)
}

func TestNamespacesImpl_ClearNamespaceAllTopicsBacklog(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.ClearNamespaceAllTopicsBacklog(testTenant, testNs)
require.Nil(t, err)
}

func TestNamespacesImpl_OperateMessageTTL(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
namespaces, err := admin.Namespaces.List(testTenant)
require.Nil(t, err)
assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs))
_, err = admin.Namespaces.GetNamespaceMessageTTL(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.SetNamespaceMessageTTL(testTenant, testNs, 30)
require.Nil(t, err)
ttl, err := admin.Namespaces.GetNamespaceMessageTTL(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 30, ttl)
err = admin.Namespaces.RemoveNamespaceMessageTTL(testTenant, testNs)
require.Nil(t, err)
ttl, err = admin.Namespaces.GetNamespaceMessageTTL(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 0, ttl)
}

func TestNamespacesImpl_OperateCompaction(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err := admin.Namespaces.Create("public", testNs)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
namespaces, err := admin.Namespaces.List(testTenant)
require.Nil(t, err)
assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs))
threshold, err := admin.Namespaces.GetMaximumUnCompactedBytes(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 0, threshold)
err = admin.Namespaces.SetMaximumUnCompactedBytes(testTenant, testNs, 10)
require.Nil(t, err)
threshold, err = admin.Namespaces.GetMaximumUnCompactedBytes(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 10, threshold)
err = admin.Namespaces.RemoveMaximumUnCompactedBytes(testTenant, testNs)
require.Nil(t, err)
err = admin.Namespaces.ClearNamespaceAllTopicsBacklog("public", testNs)
threshold, err = admin.Namespaces.GetMaximumUnCompactedBytes(testTenant, testNs)
require.Nil(t, err)
require.EqualValues(t, 0, threshold)
}
Loading