Skip to content

Commit

Permalink
Add SecureDeleteMsg method to JetStreamManager
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Aug 4, 2022
1 parent d4eeb20 commit 2a932dd
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 7 deletions.
37 changes: 30 additions & 7 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ type JetStreamManager interface {
// The stream must have been created/updated with the AllowDirect boolean.
GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

// DeleteMsg erases a message from a stream.
// DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten.
DeleteMsg(name string, seq uint64, opts ...JSOpt) error

// SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data
// As a result, this operation is slower than DeleteMsg()
SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error

// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

Expand Down Expand Up @@ -1012,7 +1016,8 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
}

type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
}

// msgDeleteResponse is the response for a Stream delete request.
Expand All @@ -1022,6 +1027,7 @@ type msgDeleteResponse struct {
}

// DeleteMsg deletes a message from a stream.
// The message is marked as erased, but not overwritten
func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
Expand All @@ -1031,17 +1037,34 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
defer cancel()
}

if name == _EMPTY_ {
return ErrStreamNameRequired
return js.deleteMsg(o.ctx, name, &msgDeleteRequest{Seq: seq, NoErase: true})
}

// SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data
// As a result, this operation is slower than DeleteMsg()
func (js *js) SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
}
if cancel != nil {
defer cancel()
}

req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
return js.deleteMsg(o.ctx, name, &msgDeleteRequest{Seq: seq})
}

func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteRequest) error {
if err := checkStreamName(stream); err != nil {
return err
}
reqJSON, err := json.Marshal(req)
if err != nil {
return err
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, stream))
r, err := js.apiRequestWithContext(ctx, dsSubj, reqJSON)
if err != nil {
return err
}
Expand Down
131 changes: 131 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2207,10 +2207,141 @@ func TestJetStreamManagement_DeleteMsg(t *testing.T) {
}
originalSeq := meta.Sequence.Stream

// create a subscription on delete message API subject to verify the content of delete operation
apiSub, err := nc.SubscribeSync("$JS.API.STREAM.MSG.DELETE.foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
err = js.DeleteMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}
msg, err = apiSub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if str := string(msg.Data); !strings.Contains(str, "no_erase\":true") {
t.Fatalf("Request should not have no_erase field set: %s", str)
}

si, err = js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
total = 14
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

// There should be only 4 messages since one deleted.
expected = 4
msgs = make([]*nats.Msg, 0)
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err = js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)

if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

msg = msgs[0]
meta, err = msg.Metadata()
if err != nil {
t.Fatal(err)
}
newSeq := meta.Sequence.Stream

// First message removed
if newSeq <= originalSeq {
t.Errorf("Expected %d to be higher sequence than %d", newSeq, originalSeq)
}
}

func TestJetStreamManagement_SecureDeleteMsg(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
js.Publish("foo.A", []byte("A"))
js.Publish("foo.B", []byte("B"))
js.Publish("foo.C", []byte("C"))
}

si, err := js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
var total uint64 = 15
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

expected := 5
msgs := make([]*nats.Msg, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)
if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

got := len(msgs)
if got != expected {
t.Fatalf("Expected %d, got %d", expected, got)
}

msg := msgs[0]
meta, err := msg.Metadata()
if err != nil {
t.Fatal(err)
}
originalSeq := meta.Sequence.Stream

// create a subscription on delete message API subject to verify the content of delete operation
apiSub, err := nc.SubscribeSync("$JS.API.STREAM.MSG.DELETE.foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
err = js.SecureDeleteMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}
msg, err = apiSub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if str := string(msg.Data); strings.Contains(str, "no_erase\":true") {
t.Fatalf("Request should not have no_erase field set: %s", str)
}

si, err = js.StreamInfo("foo")
if err != nil {
Expand Down

0 comments on commit 2a932dd

Please sign in to comment.