From 5bab0978874974f2697acae46da7b01da6481f4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Sun, 12 Jun 2022 12:51:43 +0200 Subject: [PATCH] module: added support for xep-0313 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miguel Ángel Ortuño --- config/example.config.yaml | 33 ++- helm/sql/postgres.up.psql | 22 ++ helm/values.yaml | 4 + pkg/c2s/in.go | 22 +- pkg/hook/c2s.go | 8 +- pkg/hook/hooks.go | 10 +- pkg/hook/s2s.go | 14 +- pkg/jackal/config.go | 5 + pkg/jackal/jackal.go | 3 +- pkg/jackal/modules.go | 9 +- pkg/model/archive/archive.pb.go | 393 ++++++++++++++++++++++++++ pkg/model/archive/codec.go | 37 +++ pkg/module/offline/interface.go | 6 + pkg/module/offline/offline.go | 54 ++-- pkg/module/offline/offline_test.go | 29 +- pkg/module/xep0198/stream.go | 16 +- pkg/module/xep0313/mam.go | 269 ++++++++++++++++++ pkg/module/xep0313/mam_test.go | 15 + pkg/s2s/in.go | 31 +- pkg/storage/boltdb/archive.go | 144 ++++++++++ pkg/storage/boltdb/archive_test.go | 182 ++++++++++++ pkg/storage/boltdb/op.go | 3 +- pkg/storage/boltdb/repository.go | 1 + pkg/storage/boltdb/repository_test.go | 36 +++ pkg/storage/boltdb/tx.go | 2 + pkg/storage/cached/cached.go | 2 + pkg/storage/cached/tx.go | 2 + pkg/storage/measured/archive.go | 56 ++++ pkg/storage/measured/archive_test.go | 84 ++++++ pkg/storage/measured/measured.go | 2 + pkg/storage/measured/tx.go | 2 + pkg/storage/pgsql/archive.go | 114 ++++++++ pkg/storage/pgsql/archive_test.go | 118 ++++++++ pkg/storage/pgsql/repository.go | 2 + pkg/storage/pgsql/tx.go | 2 + pkg/storage/repository/archive.go | 36 +++ pkg/storage/repository/repository.go | 1 + pkg/util/xmpp/xmpp.go | 16 ++ proto/model/v1/archive.proto | 64 +++++ scripts/genproto.sh | 1 + sql/postgres.up.psql | 22 ++ 41 files changed, 1779 insertions(+), 93 deletions(-) create mode 100644 pkg/model/archive/archive.pb.go create mode 100644 pkg/model/archive/codec.go create mode 100644 pkg/module/xep0313/mam.go create mode 100644 pkg/module/xep0313/mam_test.go create mode 100644 pkg/storage/boltdb/archive.go create mode 100644 pkg/storage/boltdb/archive_test.go create mode 100644 pkg/storage/boltdb/repository_test.go create mode 100644 pkg/storage/measured/archive.go create mode 100644 pkg/storage/measured/archive_test.go create mode 100644 pkg/storage/pgsql/archive.go create mode 100644 pkg/storage/pgsql/archive_test.go create mode 100644 pkg/storage/repository/archive.go create mode 100644 proto/model/v1/archive.proto diff --git a/config/example.config.yaml b/config/example.config.yaml index 34e9f18b9..7dbf2c3f9 100644 --- a/config/example.config.yaml +++ b/config/example.config.yaml @@ -24,20 +24,20 @@ # cert_file: "" # privkey_file: "" -#storage: -# type: pgsql -# pgsql: -# host: 127.0.0.1:5432 -# user: jackal -# password: a-secret-key -# database: jackal -# max_open_conns: 16 -# -# cache: -# type: redis -# redis: -# addresses: -# - localhost:6379 +storage: + type: pgsql + pgsql: + host: 127.0.0.1:5432 + user: jackal + password: a-secret-key + database: jackal + max_open_conns: 16 + + cache: + type: redis + redis: + addresses: + - localhost:6379 #cluster: # type: kv @@ -128,6 +128,7 @@ modules: # - ping # XEP-0199: XMPP Ping # - time # XEP-0202: Entity Time # - carbons # XEP-0280: Message Carbons +# - mam # XEP-0313: Message Archive Management # # version: # show_os: true @@ -140,6 +141,10 @@ modules: # interval: 3m # send_pings: true # timeout_action: kill +# +# mam: +# queue_size: 1500 +# components: secret: a-super-secret-key diff --git a/helm/sql/postgres.up.psql b/helm/sql/postgres.up.psql index 3be5cf8da..39d0ad1c1 100644 --- a/helm/sql/postgres.up.psql +++ b/helm/sql/postgres.up.psql @@ -170,3 +170,25 @@ CREATE TABLE IF NOT EXISTS vcards ( ); SELECT enable_updated_at('vcards'); + +-- archives + +CREATE TABLE IF NOT EXISTS archives ( + serial SERIAL PRIMARY KEY, + archive_id VARCHAR(1023), + id VARCHAR(255) NOT NULL, + "from" TEXT NOT NULL, + from_bare TEXT NOT NULL, + "to" TEXT NOT NULL, + to_bare TEXT NOT NULL, + message BYTEA NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS i_archives_archive_id ON archives(archive_id); +CREATE INDEX IF NOT EXISTS i_archives_id ON archives(id); +CREATE INDEX IF NOT EXISTS i_archives_to ON archives("to"); +CREATE INDEX IF NOT EXISTS i_archives_to_bare ON archives(to_bare); +CREATE INDEX IF NOT EXISTS i_archives_from ON archives("from"); +CREATE INDEX IF NOT EXISTS i_archives_from_bare ON archives(from_bare); +CREATE INDEX IF NOT EXISTS i_archives_created_at ON archives(created_at); diff --git a/helm/values.yaml b/helm/values.yaml index 8f9e21369..d28dc9c05 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -125,6 +125,7 @@ jackal: - ping # XEP-0199: XMPP Ping - time # XEP-0202: Entity Time - carbons # XEP-0280: Message Carbons + - mam # XEP-0313: Message Archive Management version: show_os: true @@ -138,6 +139,9 @@ jackal: send_pings: true timeout_action: kill + mam: + queue_size: 1500 + components: # listeners: # - port: 5275 diff --git a/pkg/c2s/in.go b/pkg/c2s/in.go index 226bf5724..dcd011f37 100644 --- a/pkg/c2s/in.go +++ b/pkg/c2s/in.go @@ -601,8 +601,8 @@ func (s *inC2S) processIQ(ctx context.Context, iq *stravaganza.IQ) error { case router.ErrRemoteServerTimeout: return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, iq).Element()) - case nil: - _, err := s.runHook(ctx, hook.C2SStreamIQRouted, &hook.C2SStreamInfo{ + case nil, router.ErrUserNotAvailable: + _, err = s.runHook(ctx, hook.C2SStreamIQRouted, &hook.C2SStreamInfo{ ID: s.ID().String(), JID: s.JID(), Presence: s.Presence(), @@ -647,7 +647,7 @@ func (s *inC2S) processPresence(ctx context.Context, presence *stravaganza.Prese } targets, err := s.router.Route(ctx, outPr) switch err { - case nil: + case nil, router.ErrUserNotAvailable: _, err = s.runHook(ctx, hook.C2SStreamPresenceRouted, &hook.C2SStreamInfo{ ID: s.ID().String(), JID: s.JID(), @@ -718,18 +718,21 @@ sendMsg: case router.ErrRemoteServerTimeout: return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, message).Element()) - case router.ErrUserNotAvailable: - return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element()) - - case nil: - _, err = s.runHook(ctx, hook.C2SStreamMessageRouted, &hook.C2SStreamInfo{ + case nil, router.ErrUserNotAvailable: + halted, hErr := s.runHook(ctx, hook.C2SStreamMessageRouted, &hook.C2SStreamInfo{ ID: s.ID().String(), JID: s.JID(), Presence: s.Presence(), Targets: targets, Element: msg, }) - return err + if halted { + return nil + } + if errors.Is(err, router.ErrUserNotAvailable) { + return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element()) + } + return hErr default: return err @@ -1105,6 +1108,7 @@ func (s *inC2S) close(ctx context.Context, disconnectErr error) error { halted, err := s.runHook(ctx, hook.C2SStreamDisconnected, &hook.C2SStreamInfo{ ID: s.ID().String(), JID: s.JID(), + Presence: s.Presence(), DisconnectError: disconnectErr, }) if halted { diff --git a/pkg/hook/c2s.go b/pkg/hook/c2s.go index 744361b1f..16d763b40 100644 --- a/pkg/hook/c2s.go +++ b/pkg/hook/c2s.go @@ -47,16 +47,16 @@ const ( // C2SStreamWillRouteElement hook runs when an XMPP element is about to be routed over a C2S stream. C2SStreamWillRouteElement = "c2s.stream.will_route_element" - // C2SStreamIQRouted hook runs when an iq stanza is successfully routed to one ore more C2S streams. + // C2SStreamIQRouted hook runs when an iq stanza is successfully routed to zero or more C2S streams. C2SStreamIQRouted = "c2s.stream.iq_routed" - // C2SStreamPresenceRouted hook runs when a presence stanza is successfully routed to one ore more C2S streams. + // C2SStreamPresenceRouted hook runs when a presence stanza is successfully routed to zero or more C2S streams. C2SStreamPresenceRouted = "c2s.stream.presence_routed" - // C2SStreamMessageRouted hook runs when a message stanza is successfully routed to one ore more C2S streams. + // C2SStreamMessageRouted hook runs when a message stanza is successfully routed to zero or more C2S streams. C2SStreamMessageRouted = "c2s.stream.message_routed" - // C2SStreamElementSent hook runs when a XMPP element is sent over a C2S stream. + // C2SStreamElementSent hook runs when an XMPP element is sent over a C2S stream. C2SStreamElementSent = "c2s.stream.element_sent" ) diff --git a/pkg/hook/hooks.go b/pkg/hook/hooks.go index 51b445bac..4a53b8e71 100644 --- a/pkg/hook/hooks.go +++ b/pkg/hook/hooks.go @@ -28,13 +28,19 @@ type Priority int32 const ( // LowestPriority defines lowest hook execution priority. - LowestPriority = Priority(math.MinInt32 + 100) + LowestPriority = Priority(math.MinInt32) + + // LowPriority defines low hook execution priority. + LowPriority = Priority(math.MinInt32 + 1000) // DefaultPriority defines default hook execution priority. DefaultPriority = Priority(0) + // HighPriority defines high hook execution priority. + HighPriority = Priority(math.MaxInt32 - 1000) + // HighestPriority defines highest hook execution priority. - HighestPriority = Priority(math.MaxInt32 - 100) + HighestPriority = Priority(math.MaxInt32) ) // Handler defines a generic hook handler function. diff --git a/pkg/hook/s2s.go b/pkg/hook/s2s.go index 0eab5ed2f..6b13c4d6a 100644 --- a/pkg/hook/s2s.go +++ b/pkg/hook/s2s.go @@ -16,6 +16,7 @@ package hook import ( "github.com/jackal-xmpp/stravaganza" + "github.com/jackal-xmpp/stravaganza/jid" ) const ( @@ -25,7 +26,7 @@ const ( // S2SOutStreamDisconnected hook runs when an outgoing S2S connection is unregistered. S2SOutStreamDisconnected = "s2s.out.stream.disconnected" - // S2SOutStreamElementSent hook runs whenever a XMPP element is sent over an outgoing S2S stream. + // S2SOutStreamElementSent hook runs whenever an XMPP element is sent over an outgoing S2S stream. S2SOutStreamElementSent = "s2s.out.stream.element_sent" // S2SInStreamRegistered hook runs when an incoming S2S connection is registered. @@ -34,7 +35,7 @@ const ( // S2SInStreamUnregistered hook runs when an incoming S2S connection is unregistered. S2SInStreamUnregistered = "s2s.in.stream.unregistered" - // S2SInStreamElementReceived hook runs when a XMPP element is received over an incoming S2S stream. + // S2SInStreamElementReceived hook runs when an XMPP element is received over an incoming S2S stream. S2SInStreamElementReceived = "s2s.in.stream.stanza_received" // S2SInStreamIQReceived hook runs when an iq stanza is received over an incoming S2S stream. @@ -49,13 +50,13 @@ const ( // S2SInStreamWillRouteElement hook runs when an XMPP element is about to be routed on an incoming S2S stream. S2SInStreamWillRouteElement = "s2s.in.stream.will_route_element" - // S2SInStreamIQRouted hook runs when an iq stanza is successfully routed to one ore more S2S streams. + // S2SInStreamIQRouted hook runs when an iq stanza is successfully routed to zero or more C2S streams. S2SInStreamIQRouted = "s2s.in.stream.iq_routed" - // S2SInStreamPresenceRouted hook runs when a presence stanza is successfully routed to one ore more S2S streams. + // S2SInStreamPresenceRouted hook runs when a presence stanza is successfully routed to zero or more C2S streams. S2SInStreamPresenceRouted = "s2s.in.stream.presence_routed" - // S2SInStreamMessageRouted hook runs when a message stanza is successfully routed to one ore more S2S streams. + // S2SInStreamMessageRouted hook runs when a message stanza is successfully routed to zero or more C2S streams. S2SInStreamMessageRouted = "s2s.in.stream.message_routed" ) @@ -70,6 +71,9 @@ type S2SStreamInfo struct { // Target is the S2S target domain. Target string + // Targets contains all JIDs to which event stanza was routed. + Targets []jid.JID + // Element is the event associated XMPP element. Element stravaganza.Element } diff --git a/pkg/jackal/config.go b/pkg/jackal/config.go index ae4c95173..95452d224 100644 --- a/pkg/jackal/config.go +++ b/pkg/jackal/config.go @@ -17,6 +17,8 @@ package jackal import ( "path/filepath" + "github.com/ortuman/jackal/pkg/module/xep0313" + "github.com/kkyr/fig" adminserver "github.com/ortuman/jackal/pkg/admin/server" "github.com/ortuman/jackal/pkg/auth/pepper" @@ -95,6 +97,9 @@ type ModulesConfig struct { // XEP-0199: XMPP Ping Ping xep0199.Config `fig:"ping"` + + // XEP-0313: Message Archive Management + Mam xep0313.Config `fig:"mam"` } // Config defines jackal application configuration. diff --git a/pkg/jackal/jackal.go b/pkg/jackal/jackal.go index 6ccdac89a..1eeb6dfff 100644 --- a/pkg/jackal/jackal.go +++ b/pkg/jackal/jackal.go @@ -26,8 +26,6 @@ import ( "syscall" "time" - streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue" - kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -47,6 +45,7 @@ import ( "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/log" "github.com/ortuman/jackal/pkg/module" + streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue" "github.com/ortuman/jackal/pkg/router" "github.com/ortuman/jackal/pkg/s2s" "github.com/ortuman/jackal/pkg/shaper" diff --git a/pkg/jackal/modules.go b/pkg/jackal/modules.go index d03b87fff..d6b11bcda 100644 --- a/pkg/jackal/modules.go +++ b/pkg/jackal/modules.go @@ -30,6 +30,7 @@ import ( "github.com/ortuman/jackal/pkg/module/xep0199" "github.com/ortuman/jackal/pkg/module/xep0202" "github.com/ortuman/jackal/pkg/module/xep0280" + "github.com/ortuman/jackal/pkg/module/xep0313" ) var defaultModules = []string{ @@ -45,6 +46,7 @@ var defaultModules = []string{ xep0198.ModuleName, xep0199.ModuleName, xep0280.ModuleName, + xep0313.ModuleName, } var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{ @@ -56,7 +58,7 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{ // Offline // (https://xmpp.org/extensions/xep-0160.html) offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { - return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.hk, j.logger) + return offline.New(cfg.Offline, j.router, j.hosts, j.rep, j.hk, j.logger) }, // XEP-0012: Last Activity // (https://xmpp.org/extensions/xep-0012.html) @@ -114,4 +116,9 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{ xep0280.ModuleName: func(j *Jackal, _ *ModulesConfig) module.Module { return xep0280.New(j.router, j.hosts, j.resMng, j.hk, j.logger) }, + // XEP-0313: Message Archive Management + // (https://xmpp.org/extensions/xep-0313.html) + xep0313.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { + return xep0313.New(cfg.Mam, j.router, j.hosts, j.rep, j.hk, j.logger) + }, } diff --git a/pkg/model/archive/archive.pb.go b/pkg/model/archive/archive.pb.go new file mode 100644 index 000000000..139e92487 --- /dev/null +++ b/pkg/model/archive/archive.pb.go @@ -0,0 +1,393 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.19.4 +// source: proto/model/v1/archive.proto + +package archivemodel + +import ( + stravaganza "github.com/jackal-xmpp/stravaganza" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Message represents an archive message entity. +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // archived_id is the message archive identifier. + ArchiveId string `protobuf:"bytes,1,opt,name=archive_id,json=archiveId,proto3" json:"archive_id,omitempty"` + // id is the message archive unique identifier. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + // from_jid is the message from jid value. + FromJid string `protobuf:"bytes,3,opt,name=from_jid,json=fromJid,proto3" json:"from_jid,omitempty"` + // to_jid is the message from jid value. + ToJid string `protobuf:"bytes,4,opt,name=to_jid,json=toJid,proto3" json:"to_jid,omitempty"` + // message is the archived message. + Message *stravaganza.PBElement `protobuf:"bytes,5,opt,name=message,proto3" json:"message,omitempty"` + // stamp is the timestamp in which the message was archived. + Stamp *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=stamp,proto3" json:"stamp,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_model_v1_archive_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_proto_model_v1_archive_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_proto_model_v1_archive_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetArchiveId() string { + if x != nil { + return x.ArchiveId + } + return "" +} + +func (x *Message) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Message) GetFromJid() string { + if x != nil { + return x.FromJid + } + return "" +} + +func (x *Message) GetToJid() string { + if x != nil { + return x.ToJid + } + return "" +} + +func (x *Message) GetMessage() *stravaganza.PBElement { + if x != nil { + return x.Message + } + return nil +} + +func (x *Message) GetStamp() *timestamppb.Timestamp { + if x != nil { + return x.Stamp + } + return nil +} + +// Messages represents a set of archive messages. +type Messages struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ArchiveMessages []*Message `protobuf:"bytes,1,rep,name=archive_messages,json=archiveMessages,proto3" json:"archive_messages,omitempty"` +} + +func (x *Messages) Reset() { + *x = Messages{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_model_v1_archive_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Messages) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Messages) ProtoMessage() {} + +func (x *Messages) ProtoReflect() protoreflect.Message { + mi := &file_proto_model_v1_archive_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Messages.ProtoReflect.Descriptor instead. +func (*Messages) Descriptor() ([]byte, []int) { + return file_proto_model_v1_archive_proto_rawDescGZIP(), []int{1} +} + +func (x *Messages) GetArchiveMessages() []*Message { + if x != nil { + return x.ArchiveMessages + } + return nil +} + +// Metadata represents an archive metadata information. +type Metadata struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // start_timestamp is the identifier of the first archive message. + StartId string `protobuf:"bytes,1,opt,name=start_id,json=startId,proto3" json:"start_id,omitempty"` + // start_timestamp is the timestamp value of the first archive message. + StartTimestamp string `protobuf:"bytes,2,opt,name=start_timestamp,json=startTimestamp,proto3" json:"start_timestamp,omitempty"` + // end_id is the identifier of the last archive message. + EndId string `protobuf:"bytes,3,opt,name=end_id,json=endId,proto3" json:"end_id,omitempty"` + // end_timestamp is the timestamp value of the last archive message. + EndTimestamp string `protobuf:"bytes,4,opt,name=end_timestamp,json=endTimestamp,proto3" json:"end_timestamp,omitempty"` +} + +func (x *Metadata) Reset() { + *x = Metadata{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_model_v1_archive_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Metadata) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Metadata) ProtoMessage() {} + +func (x *Metadata) ProtoReflect() protoreflect.Message { + mi := &file_proto_model_v1_archive_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Metadata.ProtoReflect.Descriptor instead. +func (*Metadata) Descriptor() ([]byte, []int) { + return file_proto_model_v1_archive_proto_rawDescGZIP(), []int{2} +} + +func (x *Metadata) GetStartId() string { + if x != nil { + return x.StartId + } + return "" +} + +func (x *Metadata) GetStartTimestamp() string { + if x != nil { + return x.StartTimestamp + } + return "" +} + +func (x *Metadata) GetEndId() string { + if x != nil { + return x.EndId + } + return "" +} + +func (x *Metadata) GetEndTimestamp() string { + if x != nil { + return x.EndTimestamp + } + return "" +} + +var File_proto_model_v1_archive_proto protoreflect.FileDescriptor + +var file_proto_model_v1_archive_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x31, + 0x2f, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, + 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x76, 0x31, + 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x1a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6a, 0x61, + 0x63, 0x6b, 0x61, 0x6c, 0x2d, 0x78, 0x6d, 0x70, 0x70, 0x2f, 0x73, 0x74, 0x72, 0x61, 0x76, 0x61, + 0x67, 0x61, 0x6e, 0x7a, 0x61, 0x2f, 0x73, 0x74, 0x72, 0x61, 0x76, 0x61, 0x67, 0x61, 0x6e, 0x7a, + 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xce, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, + 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6a, 0x69, 0x64, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x66, 0x72, 0x6f, 0x6d, 0x4a, 0x69, 0x64, 0x12, 0x15, 0x0a, + 0x06, 0x74, 0x6f, 0x5f, 0x6a, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, + 0x6f, 0x4a, 0x69, 0x64, 0x12, 0x30, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x73, 0x74, 0x72, 0x61, 0x76, 0x61, 0x67, 0x61, + 0x6e, 0x7a, 0x61, 0x2e, 0x50, 0x42, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x50, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x12, 0x44, 0x0a, 0x10, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x5f, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, + 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x0f, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x8a, 0x01, 0x0a, 0x08, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x49, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x15, 0x0a, 0x06, 0x65, + 0x6e, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x6e, 0x64, + 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x6e, 0x64, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x21, 0x5a, 0x1f, 0x70, 0x6b, 0x67, 0x2f, 0x6d, + 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2f, 0x3b, 0x61, 0x72, + 0x63, 0x68, 0x69, 0x76, 0x65, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_proto_model_v1_archive_proto_rawDescOnce sync.Once + file_proto_model_v1_archive_proto_rawDescData = file_proto_model_v1_archive_proto_rawDesc +) + +func file_proto_model_v1_archive_proto_rawDescGZIP() []byte { + file_proto_model_v1_archive_proto_rawDescOnce.Do(func() { + file_proto_model_v1_archive_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_model_v1_archive_proto_rawDescData) + }) + return file_proto_model_v1_archive_proto_rawDescData +} + +var file_proto_model_v1_archive_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_proto_model_v1_archive_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: model.archive.v1.Message + (*Messages)(nil), // 1: model.archive.v1.Messages + (*Metadata)(nil), // 2: model.archive.v1.Metadata + (*stravaganza.PBElement)(nil), // 3: stravaganza.PBElement + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp +} +var file_proto_model_v1_archive_proto_depIdxs = []int32{ + 3, // 0: model.archive.v1.Message.message:type_name -> stravaganza.PBElement + 4, // 1: model.archive.v1.Message.stamp:type_name -> google.protobuf.Timestamp + 0, // 2: model.archive.v1.Messages.archive_messages:type_name -> model.archive.v1.Message + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_proto_model_v1_archive_proto_init() } +func file_proto_model_v1_archive_proto_init() { + if File_proto_model_v1_archive_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_model_v1_archive_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_model_v1_archive_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Messages); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_model_v1_archive_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Metadata); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_model_v1_archive_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_model_v1_archive_proto_goTypes, + DependencyIndexes: file_proto_model_v1_archive_proto_depIdxs, + MessageInfos: file_proto_model_v1_archive_proto_msgTypes, + }.Build() + File_proto_model_v1_archive_proto = out.File + file_proto_model_v1_archive_proto_rawDesc = nil + file_proto_model_v1_archive_proto_goTypes = nil + file_proto_model_v1_archive_proto_depIdxs = nil +} diff --git a/pkg/model/archive/codec.go b/pkg/model/archive/codec.go new file mode 100644 index 000000000..e3d126426 --- /dev/null +++ b/pkg/model/archive/codec.go @@ -0,0 +1,37 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package archivemodel + +import "google.golang.org/protobuf/proto" + +// MarshalBinary satisfies encoding.BinaryMarshaler interface. +func (x *Message) MarshalBinary() (data []byte, err error) { + return proto.Marshal(x) +} + +// UnmarshalBinary satisfies encoding.BinaryUnmarshaler interface. +func (x *Message) UnmarshalBinary(data []byte) error { + return proto.Unmarshal(data, x) +} + +// MarshalBinary satisfies encoding.BinaryMarshaler interface. +func (x *Messages) MarshalBinary() (data []byte, err error) { + return proto.Marshal(x) +} + +// UnmarshalBinary satisfies encoding.BinaryUnmarshaler interface. +func (x *Messages) UnmarshalBinary(data []byte) error { + return proto.Unmarshal(data, x) +} diff --git a/pkg/module/offline/interface.go b/pkg/module/offline/interface.go index 1b8962dae..388731e3a 100644 --- a/pkg/module/offline/interface.go +++ b/pkg/module/offline/interface.go @@ -17,6 +17,7 @@ package offline import ( "github.com/ortuman/jackal/pkg/cluster/resourcemanager" "github.com/ortuman/jackal/pkg/router" + "github.com/ortuman/jackal/pkg/router/stream" "github.com/ortuman/jackal/pkg/storage/repository" ) @@ -39,3 +40,8 @@ type hosts interface { type resourceManager interface { resourcemanager.Manager } + +//go:generate moq -out stream.mock_test.go . c2sStream +type c2sStream interface { + stream.C2S +} diff --git a/pkg/module/offline/offline.go b/pkg/module/offline/offline.go index 5c3b9bc03..36180698b 100644 --- a/pkg/module/offline/offline.go +++ b/pkg/module/offline/offline.go @@ -19,11 +19,14 @@ import ( "fmt" "time" + "github.com/ortuman/jackal/pkg/router/stream" + + "github.com/jackal-xmpp/stravaganza/jid" + kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/jackal-xmpp/stravaganza" stanzaerror "github.com/jackal-xmpp/stravaganza/errors/stanza" - "github.com/ortuman/jackal/pkg/cluster/resourcemanager" "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/router" @@ -51,7 +54,6 @@ type Offline struct { cfg Config hosts hosts router router.Router - resMng resourcemanager.Manager rep repository.Repository hk *hook.Hooks logger kitlog.Logger @@ -62,7 +64,6 @@ func New( cfg Config, router router.Router, hosts *host.Hosts, - resMng resourcemanager.Manager, rep repository.Repository, hk *hook.Hooks, logger kitlog.Logger, @@ -71,7 +72,6 @@ func New( cfg: cfg, router: router, hosts: hosts, - resMng: resMng, rep: rep, hk: hk, logger: kitlog.With(logger, "module", ModuleName), @@ -96,8 +96,8 @@ func (m *Offline) AccountFeatures(_ context.Context) ([]string, error) { return // Start starts offline module. func (m *Offline) Start(_ context.Context) error { - m.hk.AddHook(hook.C2SStreamWillRouteElement, m.onWillRouteElement, hook.LowestPriority) - m.hk.AddHook(hook.S2SInStreamWillRouteElement, m.onWillRouteElement, hook.LowestPriority) + m.hk.AddHook(hook.C2SStreamMessageRouted, m.onMessageRouted, hook.LowestPriority) + m.hk.AddHook(hook.S2SInStreamMessageRouted, m.onMessageRouted, hook.LowestPriority) m.hk.AddHook(hook.C2SStreamPresenceReceived, m.onC2SPresenceRecv, hook.DefaultPriority) m.hk.AddHook(hook.UserDeleted, m.onUserDeleted, hook.DefaultPriority) @@ -108,8 +108,8 @@ func (m *Offline) Start(_ context.Context) error { // Stop stops offline module. func (m *Offline) Stop(_ context.Context) error { - m.hk.RemoveHook(hook.C2SStreamWillRouteElement, m.onWillRouteElement) - m.hk.RemoveHook(hook.S2SInStreamWillRouteElement, m.onWillRouteElement) + m.hk.RemoveHook(hook.C2SStreamMessageRouted, m.onMessageRouted) + m.hk.RemoveHook(hook.S2SInStreamMessageRouted, m.onMessageRouted) m.hk.RemoveHook(hook.C2SStreamPresenceReceived, m.onC2SPresenceRecv) m.hk.RemoveHook(hook.UserDeleted, m.onUserDeleted) @@ -118,15 +118,23 @@ func (m *Offline) Stop(_ context.Context) error { return nil } -func (m *Offline) onWillRouteElement(ctx context.Context, execCtx *hook.ExecutionContext) error { +func (m *Offline) onMessageRouted(ctx context.Context, execCtx *hook.ExecutionContext) error { var elem stravaganza.Element + var targets []jid.JID switch inf := execCtx.Info.(type) { case *hook.C2SStreamInfo: + targets = inf.Targets elem = inf.Element case *hook.S2SStreamInfo: + targets = inf.Targets elem = inf.Element } + // message was successufully routed to one of the available resources + if len(targets) > 0 { + return nil + } + msg, ok := elem.(*stravaganza.Message) if !ok || !isMessageArchievable(msg) { return nil @@ -135,18 +143,12 @@ func (m *Offline) onWillRouteElement(ctx context.Context, execCtx *hook.Executio if !m.hosts.IsLocalHost(toJID.Domain()) { return nil } - rss, err := m.resMng.GetResources(ctx, toJID.Node()) - if err != nil { - return err - } - if len(rss) > 0 { - return nil - } return m.archiveMessage(ctx, msg) } func (m *Offline) onC2SPresenceRecv(ctx context.Context, execCtx *hook.ExecutionContext) error { inf := execCtx.Info.(*hook.C2SStreamInfo) + stm := execCtx.Sender.(stream.C2S) pr := inf.Element.(*stravaganza.Presence) toJID := pr.ToJID() @@ -156,7 +158,7 @@ func (m *Offline) onC2SPresenceRecv(ctx context.Context, execCtx *hook.Execution if !pr.IsAvailable() || pr.Priority() < 0 { return nil } - return m.deliverOfflineMessages(ctx, toJID.Node()) + return m.deliverOfflineMessages(ctx, stm) } func (m *Offline) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionContext) error { @@ -167,18 +169,20 @@ func (m *Offline) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionCont if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = m.rep.Unlock(ctx, lockID) }() + defer m.releaseLock(ctx, lockID) return m.rep.DeleteOfflineMessages(ctx, inf.Username) } -func (m *Offline) deliverOfflineMessages(ctx context.Context, username string) error { +func (m *Offline) deliverOfflineMessages(ctx context.Context, stm stream.C2S) error { + username := stm.Username() + lockID := offlineQueueLockID(username) if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = m.rep.Unlock(ctx, lockID) }() + defer m.releaseLock(ctx, lockID) ms, err := m.rep.FetchOfflineMessages(ctx, username) if err != nil { @@ -193,7 +197,7 @@ func (m *Offline) deliverOfflineMessages(ctx context.Context, username string) e } // route offline messages for _, msg := range ms { - _, _ = m.router.Route(ctx, msg) + stm.SendElement(msg) } level.Info(m.logger).Log("msg", "delivered offline messages", "queue_size", len(ms), "username", username) @@ -209,7 +213,7 @@ func (m *Offline) archiveMessage(ctx context.Context, msg *stravaganza.Message) if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = m.rep.Unlock(ctx, lockID) }() + defer m.releaseLock(ctx, lockID) qSize, err := m.rep.CountOfflineMessages(ctx, username) if err != nil { @@ -241,6 +245,12 @@ func (m *Offline) archiveMessage(ctx context.Context, msg *stravaganza.Message) return hook.ErrStopped // already handled } +func (m *Offline) releaseLock(ctx context.Context, lockID string) { + if err := m.rep.Unlock(ctx, lockID); err != nil { + level.Warn(m.logger).Log("msg", "failed to release lock", "err", err) + } +} + func isMessageArchievable(msg *stravaganza.Message) bool { if msg.ChildNamespace("no-store", hintsNamespace) != nil { return false diff --git a/pkg/module/offline/offline_test.go b/pkg/module/offline/offline_test.go index f79238ee6..1a7f6ea53 100644 --- a/pkg/module/offline/offline_test.go +++ b/pkg/module/offline/offline_test.go @@ -43,15 +43,10 @@ func TestOffline_ArchiveOfflineMessage(t *testing.T) { hostsMock := &hostsMock{} hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" } - resManagerMock := &resourceManagerMock{} - resManagerMock.GetResourcesFunc = func(ctx context.Context, username string) ([]c2smodel.ResourceDesc, error) { - return nil, nil - } hk := hook.NewHooks() m := &Offline{ cfg: Config{QueueSize: 100}, hosts: hostsMock, - resMng: resManagerMock, rep: repMock, hk: hk, logger: kitlog.NewNopLogger(), @@ -70,7 +65,7 @@ func TestOffline_ArchiveOfflineMessage(t *testing.T) { _ = m.Start(context.Background()) defer func() { _ = m.Stop(context.Background()) }() - _, _ = hk.Run(context.Background(), hook.C2SStreamWillRouteElement, &hook.ExecutionContext{ + _, _ = hk.Run(context.Background(), hook.C2SStreamMessageRouted, &hook.ExecutionContext{ Info: &hook.C2SStreamInfo{ Element: msg, }, @@ -113,7 +108,6 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) { cfg: Config{QueueSize: 100}, router: routerMock, hosts: hostsMock, - resMng: resManagerMock, rep: repMock, hk: hk, logger: kitlog.NewNopLogger(), @@ -132,7 +126,7 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) { _ = m.Start(context.Background()) defer func() { _ = m.Stop(context.Background()) }() - halted, err := hk.Run(context.Background(), hook.C2SStreamWillRouteElement, &hook.ExecutionContext{ + halted, err := hk.Run(context.Background(), hook.C2SStreamMessageRouted, &hook.ExecutionContext{ Info: &hook.C2SStreamInfo{ Element: msg, }, @@ -152,11 +146,6 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) { // given routerMock := &routerMock{} - output := bytes.NewBuffer(nil) - routerMock.RouteFunc = func(ctx context.Context, stanza stravaganza.Stanza) ([]jid.JID, error) { - _ = stanza.ToXML(output, true) - return nil, nil - } hostsMock := &hostsMock{} hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" } @@ -184,6 +173,19 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) { return nil } + stmMock := &c2sStreamMock{} + stmMock.UsernameFunc = func() string { + return "ortuman" + } + + output := bytes.NewBuffer(nil) + stmMock.SendElementFunc = func(elem stravaganza.Element) <-chan error { + _ = elem.ToXML(output, true) + ch := make(chan error) + close(ch) + return ch + } + hk := hook.NewHooks() m := &Offline{ cfg: Config{QueueSize: 100}, @@ -206,6 +208,7 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) { Info: &hook.C2SStreamInfo{ Element: pr, }, + Sender: stmMock, }) // then diff --git a/pkg/module/xep0198/stream.go b/pkg/module/xep0198/stream.go index fbea2b856..b9d9abd9b 100644 --- a/pkg/module/xep0198/stream.go +++ b/pkg/module/xep0198/stream.go @@ -25,20 +25,17 @@ import ( "sync" "time" - "github.com/ortuman/jackal/pkg/cluster/instance" - - clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager" - - streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue" - kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/jackal-xmpp/stravaganza" streamerror "github.com/jackal-xmpp/stravaganza/errors/stream" "github.com/jackal-xmpp/stravaganza/jid" + clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager" + "github.com/ortuman/jackal/pkg/cluster/instance" "github.com/ortuman/jackal/pkg/cluster/resourcemanager" "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" + streamqueue "github.com/ortuman/jackal/pkg/module/xep0198/queue" xmppparser "github.com/ortuman/jackal/pkg/parser" "github.com/ortuman/jackal/pkg/router" "github.com/ortuman/jackal/pkg/router/stream" @@ -234,8 +231,11 @@ func (m *Stream) onDisconnect(_ context.Context, execCtx *hook.ExecutionContext) inf := execCtx.Info.(*hook.C2SStreamInfo) discErr := inf.DisconnectError - _, ok := discErr.(*streamerror.Error) - if ok || errors.Is(discErr, xmppparser.ErrStreamClosedByPeer) { + _, isStreamErr := discErr.(*streamerror.Error) + + shouldHibernate := inf.Presence.IsAvailable() && !isStreamErr && !errors.Is(discErr, xmppparser.ErrStreamClosedByPeer) + + if !shouldHibernate { return nil } // schedule stream termination diff --git a/pkg/module/xep0313/mam.go b/pkg/module/xep0313/mam.go new file mode 100644 index 000000000..65ce318b0 --- /dev/null +++ b/pkg/module/xep0313/mam.go @@ -0,0 +1,269 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xep0313 + +import ( + "context" + + stanzaerror "github.com/jackal-xmpp/stravaganza/errors/stanza" + "github.com/ortuman/jackal/pkg/router" + + "google.golang.org/protobuf/types/known/timestamppb" + + kitlog "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" + "github.com/jackal-xmpp/stravaganza" + "github.com/ortuman/jackal/pkg/hook" + "github.com/ortuman/jackal/pkg/host" + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + "github.com/ortuman/jackal/pkg/storage/repository" + xmpputil "github.com/ortuman/jackal/pkg/util/xmpp" +) + +const ( + mamNamespace = "urn:xmpp:mam:2" + extendedMamNamespace = "urn:xmpp:mam:2#extended" + + stanzaIDNamespace = "urn:xmpp:sid:0" +) + +const ( + // ModuleName represents mam module name. + ModuleName = "mam" + + // XEPNumber represents mam XEP number. + XEPNumber = "0313" +) + +// Config contains mam module configuration options. +type Config struct { + // QueueSize defines maximum number of archive messages stanzas. + // When the limit is reached, the oldest message will be purged to make room for the new one. + QueueSize int `fig:"queue_size" default:"1000"` +} + +// Mam represents a mam (XEP-0313) module type. +type Mam struct { + cfg Config + hosts *host.Hosts + router router.Router + hk *hook.Hooks + rep repository.Repository + logger kitlog.Logger +} + +// New returns a new initialized mam instance. +func New( + cfg Config, + router router.Router, + hosts *host.Hosts, + rep repository.Repository, + hk *hook.Hooks, + logger kitlog.Logger, +) *Mam { + return &Mam{ + cfg: cfg, + router: router, + hosts: hosts, + rep: rep, + hk: hk, + logger: kitlog.With(logger, "module", ModuleName, "xep", XEPNumber), + } +} + +// Name returns mam module name. +func (m *Mam) Name() string { return ModuleName } + +// StreamFeature returns mam module stream feature. +func (m *Mam) StreamFeature(_ context.Context, _ string) (stravaganza.Element, error) { + return nil, nil +} + +// ServerFeatures returns mam server disco features. +func (m *Mam) ServerFeatures(_ context.Context) ([]string, error) { + return nil, nil +} + +// AccountFeatures returns mam account disco features. +func (m *Mam) AccountFeatures(_ context.Context) ([]string, error) { + return []string{mamNamespace, extendedMamNamespace}, nil +} + +// Start starts stream module. +func (m *Mam) Start(_ context.Context) error { + m.hk.AddHook(hook.C2SStreamElementReceived, m.onMessageRecv, hook.HighestPriority) + m.hk.AddHook(hook.S2SInStreamElementReceived, m.onMessageRecv, hook.HighestPriority) + + m.hk.AddHook(hook.C2SStreamMessageRouted, m.onMessageRouted, hook.LowestPriority+1) + m.hk.AddHook(hook.S2SInStreamMessageRouted, m.onMessageRouted, hook.LowestPriority+1) + + m.hk.AddHook(hook.UserDeleted, m.onUserDeleted, hook.DefaultPriority) + + level.Info(m.logger).Log("msg", "started mam module") + return nil +} + +// Stop stops stream module. +func (m *Mam) Stop(_ context.Context) error { + m.hk.RemoveHook(hook.C2SStreamElementReceived, m.onMessageRecv) + m.hk.RemoveHook(hook.S2SInStreamElementReceived, m.onMessageRecv) + m.hk.RemoveHook(hook.C2SStreamMessageRouted, m.onMessageRouted) + m.hk.RemoveHook(hook.S2SInStreamMessageRouted, m.onMessageRouted) + m.hk.RemoveHook(hook.UserDeleted, m.onUserDeleted) + + level.Info(m.logger).Log("msg", "stopped mam module") + return nil +} + +// MatchesNamespace tells whether namespace matches blocklist module. +func (m *Mam) MatchesNamespace(namespace string, serverTarget bool) bool { + if serverTarget { + return false + } + return namespace == mamNamespace +} + +// ProcessIQ process a mam iq. +func (m *Mam) ProcessIQ(ctx context.Context, iq *stravaganza.IQ) error { + fromJID := iq.FromJID() + toJID := iq.ToJID() + if fromJID.Node() != toJID.Node() { + _, _ = m.router.Route(ctx, xmpputil.MakeErrorStanza(iq, stanzaerror.Forbidden)) + return nil + } + switch { + case iq.IsGet() && iq.ChildNamespace("metadata", mamNamespace) != nil: + return m.sendArchiveMetadata(ctx, iq) + } + return nil +} + +func (m *Mam) sendArchiveMetadata(ctx context.Context, iq *stravaganza.IQ) error { + metadata, err := m.rep.FetchArchiveMetadata(ctx, iq.FromJID().Node()) + if err != nil { + _, _ = m.router.Route(ctx, xmpputil.MakeErrorStanza(iq, stanzaerror.InternalServerError)) + return err + } + // send reply + metadataBuilder := stravaganza.NewBuilder("metadata").WithAttribute(stravaganza.Namespace, mamNamespace) + + startBuilder := stravaganza.NewBuilder("start") + if metadata != nil { + startBuilder.WithAttribute("id", metadata.StartId) + startBuilder.WithAttribute("timestamp", metadata.StartTimestamp) + } + endBuilder := stravaganza.NewBuilder("end") + if metadata != nil { + endBuilder.WithAttribute("id", metadata.EndId) + endBuilder.WithAttribute("timestamp", metadata.EndTimestamp) + } + + metadataBuilder.WithChildren(startBuilder.Build(), endBuilder.Build()) + + resIQ := xmpputil.MakeResultIQ(iq, metadataBuilder.Build()) + _, _ = m.router.Route(ctx, resIQ) + + return nil +} + +func (m *Mam) onMessageRecv(_ context.Context, execCtx *hook.ExecutionContext) error { + switch inf := execCtx.Info.(type) { + case *hook.C2SStreamInfo: + inf.Element = m.addMessageStanzaID(inf.Element) + case *hook.S2SStreamInfo: + inf.Element = m.addMessageStanzaID(inf.Element) + } + return nil +} + +func (m *Mam) onMessageRouted(ctx context.Context, execCtx *hook.ExecutionContext) error { + var elem stravaganza.Element + + switch inf := execCtx.Info.(type) { + case *hook.C2SStreamInfo: + elem = inf.Element + case *hook.S2SStreamInfo: + elem = inf.Element + } + return m.handleRoutedElement(ctx, elem) +} + +func (m *Mam) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionContext) error { + inf := execCtx.Info.(*hook.UserInfo) + return m.rep.DeleteArchive(ctx, inf.Username) +} + +func (m *Mam) addMessageStanzaID(elem stravaganza.Element) stravaganza.Element { + msg, ok := elem.(*stravaganza.Message) + if !ok { + return elem + } + if !isMessageArchievable(msg) { + return elem + } + + if !m.hosts.IsLocalHost(msg.ToJID().Domain()) { + return elem + } + return xmpputil.MakeStanzaIDMessage(msg) +} + +func (m *Mam) handleRoutedElement(ctx context.Context, elem stravaganza.Element) error { + msg, ok := elem.(*stravaganza.Message) + if !ok { + return nil + } + if !isMessageArchievable(msg) { + return nil + } + + fromJID := msg.FromJID() + if m.hosts.IsLocalHost(fromJID.Domain()) { + if err := m.archiveMessage(ctx, msg, fromJID.Node(), uuid.New().String()); err != nil { + return err + } + } + toJID := msg.ToJID() + if !m.hosts.IsLocalHost(toJID.Domain()) { + return nil + } + stanzaID := msg.ChildNamespace("stanza-id", stanzaIDNamespace) + if stanzaID == nil { + return nil + } + return m.archiveMessage(ctx, msg, toJID.Node(), stanzaID.Attribute("id")) +} + +func (m *Mam) archiveMessage(ctx context.Context, message *stravaganza.Message, archiveID, id string) error { + return m.rep.InTransaction(ctx, func(ctx context.Context, tx repository.Transaction) error { + err := tx.InsertArchiveMessage(ctx, &archivemodel.Message{ + ArchiveId: archiveID, + Id: id, + FromJid: message.FromJID().String(), + ToJid: message.ToJID().String(), + Message: message.Proto(), + Stamp: timestamppb.Now(), + }) + if err != nil { + return err + } + return tx.DeleteArchiveOldestMessages(ctx, archiveID, m.cfg.QueueSize) + }) +} + +func isMessageArchievable(msg *stravaganza.Message) bool { + return (msg.IsNormal() || msg.IsChat()) && msg.IsMessageWithBody() +} diff --git a/pkg/module/xep0313/mam_test.go b/pkg/module/xep0313/mam_test.go new file mode 100644 index 000000000..323f2e91d --- /dev/null +++ b/pkg/module/xep0313/mam_test.go @@ -0,0 +1,15 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xep0313 diff --git a/pkg/s2s/in.go b/pkg/s2s/in.go index eb38ca7d5..c7fe80b24 100644 --- a/pkg/s2s/in.go +++ b/pkg/s2s/in.go @@ -17,6 +17,7 @@ package s2s import ( "context" "crypto/tls" + "errors" "sync" "sync/atomic" "time" @@ -401,7 +402,7 @@ func (s *inS2S) processIQ(ctx context.Context, iq *stravaganza.IQ) error { if !ok { return nil } - _, err = s.router.Route(ctx, outIQ) + targets, err := s.router.Route(ctx, outIQ) switch err { case router.ErrResourceNotFound: return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, iq).Element()) @@ -412,11 +413,12 @@ func (s *inS2S) processIQ(ctx context.Context, iq *stravaganza.IQ) error { case router.ErrRemoteServerTimeout: return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, iq).Element()) - case nil: + case nil, router.ErrUserNotAvailable: _, err = s.runHook(ctx, hook.S2SInStreamIQRouted, &hook.S2SStreamInfo{ ID: s.ID().String(), Sender: s.sender, Target: s.target, + Targets: targets, Element: iq, }) return err @@ -456,7 +458,7 @@ sendMsg: if !ok { return nil } - _, err = s.router.Route(ctx, outMsg) + targets, err := s.router.Route(ctx, outMsg) switch err { case router.ErrResourceNotFound: // treat the stanza as if it were addressed to @@ -475,19 +477,25 @@ sendMsg: case router.ErrRemoteServerTimeout: return s.sendElement(ctx, stanzaerror.E(stanzaerror.RemoteServerTimeout, message).Element()) - case router.ErrUserNotAvailable: - return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element()) - - case nil: - _, err = s.runHook(ctx, hook.S2SInStreamMessageRouted, &hook.S2SStreamInfo{ + case nil, router.ErrUserNotAvailable: + halted, hErr := s.runHook(ctx, hook.S2SInStreamMessageRouted, &hook.S2SStreamInfo{ ID: s.ID().String(), Sender: s.sender, Target: s.target, + Targets: targets, Element: msg, }) + if halted { + return nil + } + if errors.Is(err, router.ErrUserNotAvailable) { + return s.sendElement(ctx, stanzaerror.E(stanzaerror.ServiceUnavailable, message).Element()) + } + return hErr + + default: return err } - return nil } func (s *inS2S) processPresence(ctx context.Context, presence *stravaganza.Presence) error { @@ -520,13 +528,14 @@ func (s *inS2S) processPresence(ctx context.Context, presence *stravaganza.Prese if !ok { return nil } - _, err = s.router.Route(ctx, outPr) + targets, err := s.router.Route(ctx, outPr) switch err { - case nil: + case nil, router.ErrUserNotAvailable: _, err := s.runHook(ctx, hook.S2SInStreamPresenceRouted, &hook.S2SStreamInfo{ ID: s.ID().String(), Sender: s.sender, Target: s.target, + Targets: targets, Element: presence, }) return err diff --git a/pkg/storage/boltdb/archive.go b/pkg/storage/boltdb/archive.go new file mode 100644 index 000000000..0421408d3 --- /dev/null +++ b/pkg/storage/boltdb/archive.go @@ -0,0 +1,144 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package boltdb + +import ( + "context" + "fmt" + + "github.com/golang/protobuf/proto" + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + bolt "go.etcd.io/bbolt" +) + +const archiveStampFormat = "2006-01-02T15:04:05Z" + +type boltDBArchiveRep struct { + tx *bolt.Tx +} + +func newArchiveRep(tx *bolt.Tx) *boltDBArchiveRep { + return &boltDBArchiveRep{tx: tx} +} + +func (r *boltDBArchiveRep) InsertArchiveMessage(_ context.Context, message *archivemodel.Message) error { + op := insertSeqOp{ + tx: r.tx, + bucket: archiveBucket(message.ArchiveId), + obj: message, + } + return op.do() +} + +func (r *boltDBArchiveRep) FetchArchiveMetadata(ctx context.Context, archiveID string) (metadata *archivemodel.Metadata, err error) { + bucketID := archiveBucket(archiveID) + + b := r.tx.Bucket([]byte(bucketID)) + if b == nil { + return nil, nil + } + var retVal archivemodel.Metadata + + c := b.Cursor() + _, val := c.First() + + var msg archivemodel.Message + if err := proto.Unmarshal(val, &msg); err != nil { + return nil, err + } + retVal.StartId = msg.Id + retVal.StartTimestamp = msg.Stamp.AsTime().UTC().Format(archiveStampFormat) + + _, val = c.Last() + if err := proto.Unmarshal(val, &msg); err != nil { + return nil, err + } + retVal.EndId = msg.Id + retVal.EndTimestamp = msg.Stamp.AsTime().UTC().Format(archiveStampFormat) + + return &retVal, nil +} + +func (r *boltDBArchiveRep) DeleteArchiveOldestMessages(_ context.Context, archiveID string, maxElements int) error { + bucketID := archiveBucket(archiveID) + + b := r.tx.Bucket([]byte(bucketID)) + if b == nil { + return nil + } + // count items + var count int + + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + count++ + } + if count < maxElements { + return nil + } + // store old value keys + var oldKeys [][]byte + + c = b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + if count <= maxElements { + break + } + count-- + oldKeys = append(oldKeys, k) + } + // delete old values + for _, k := range oldKeys { + if err := b.Delete(k); err != nil { + return err + } + } + return nil +} + +func (r *boltDBArchiveRep) DeleteArchive(_ context.Context, archiveID string) error { + op := delBucketOp{ + tx: r.tx, + bucket: archiveBucket(archiveID), + } + return op.do() +} + +func archiveBucket(archiveID string) string { + return fmt.Sprintf("archive:%s", archiveID) +} + +// InsertArchiveMessage inserts a new message element into an archive queue. +func (r *Repository) InsertArchiveMessage(ctx context.Context, message *archivemodel.Message) error { + return r.db.Update(func(tx *bolt.Tx) error { + return newArchiveRep(tx).InsertArchiveMessage(ctx, message) + }) +} + +// FetchArchiveMetadata returns the metadata value associated to an archive. +func (r *Repository) FetchArchiveMetadata(ctx context.Context, archiveID string) (metadata *archivemodel.Metadata, err error) { + err = r.db.View(func(tx *bolt.Tx) error { + metadata, err = newArchiveRep(tx).FetchArchiveMetadata(ctx, archiveID) + return err + }) + return +} + +// DeleteArchive clears an archive queue. +func (r *Repository) DeleteArchive(ctx context.Context, archiveID string) error { + return r.db.Update(func(tx *bolt.Tx) error { + return newArchiveRep(tx).DeleteArchive(ctx, archiveID) + }) +} diff --git a/pkg/storage/boltdb/archive_test.go b/pkg/storage/boltdb/archive_test.go new file mode 100644 index 000000000..9cebd8f25 --- /dev/null +++ b/pkg/storage/boltdb/archive_test.go @@ -0,0 +1,182 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package boltdb + +import ( + "context" + "testing" + "time" + + "google.golang.org/protobuf/types/known/timestamppb" + + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" +) + +func TestBoltDB_InsertArchiveMessage(t *testing.T) { + t.Parallel() + + db := setupDB(t) + t.Cleanup(func() { cleanUp(db) }) + + err := db.Update(func(tx *bolt.Tx) error { + rep := boltDBArchiveRep{tx: tx} + + m0 := testMessageStanza("message 0") + + err := rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m0.Proto(), + }) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) +} + +func TestBoltDB_FetchArchiveMetadata(t *testing.T) { + t.Parallel() + + db := setupDB(t) + t.Cleanup(func() { cleanUp(db) }) + + err := db.Update(func(tx *bolt.Tx) error { + rep := boltDBArchiveRep{tx: tx} + + m0 := testMessageStanza("message 0") + m1 := testMessageStanza("message 1") + m2 := testMessageStanza("message 2") + + now0 := time.Now() + now1 := now0.Add(time.Hour) + now2 := now1.Add(time.Hour) + + err := rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Id: "id0", + Message: m0.Proto(), + Stamp: timestamppb.New(now0), + }) + require.NoError(t, err) + + err = rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Id: "id1", + Message: m1.Proto(), + Stamp: timestamppb.New(now1), + }) + require.NoError(t, err) + + err = rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Id: "id2", + Message: m2.Proto(), + Stamp: timestamppb.New(now2), + }) + require.NoError(t, err) + + metadata, err := rep.FetchArchiveMetadata(context.Background(), "a1234") + require.NoError(t, err) + + require.Equal(t, "id0", metadata.StartId) + require.Equal(t, now0.UTC().Format(archiveStampFormat), metadata.StartTimestamp) + require.Equal(t, "id2", metadata.EndId) + require.Equal(t, now2.UTC().Format(archiveStampFormat), metadata.EndTimestamp) + + return nil + }) + require.NoError(t, err) +} + +func TestBoltDB_DeleteArchiveOldestMessages(t *testing.T) { + t.Parallel() + + db := setupDB(t) + t.Cleanup(func() { cleanUp(db) }) + + err := db.Update(func(tx *bolt.Tx) error { + rep := boltDBArchiveRep{tx: tx} + + m0 := testMessageStanza("message 0") + m1 := testMessageStanza("message 1") + m2 := testMessageStanza("message 2") + + err := rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m0.Proto(), + }) + require.NoError(t, err) + + err = rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m1.Proto(), + }) + require.NoError(t, err) + + err = rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m2.Proto(), + }) + require.NoError(t, err) + + require.Equal(t, 3, countBucketElements(t, tx, archiveBucket("a1234"))) + + err = rep.DeleteArchiveOldestMessages(context.Background(), "a1234", 2) + require.NoError(t, err) + + require.Equal(t, 2, countBucketElements(t, tx, archiveBucket("a1234"))) + + return nil + }) + require.NoError(t, err) +} + +func TestBoltDB_DeleteArchive(t *testing.T) { + t.Parallel() + + db := setupDB(t) + t.Cleanup(func() { cleanUp(db) }) + + err := db.Update(func(tx *bolt.Tx) error { + rep := boltDBArchiveRep{tx: tx} + + m0 := testMessageStanza("message 0") + m1 := testMessageStanza("message 1") + + err := rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m0.Proto(), + }) + require.NoError(t, err) + + err = rep.InsertArchiveMessage(context.Background(), &archivemodel.Message{ + ArchiveId: "a1234", + Message: m1.Proto(), + }) + require.NoError(t, err) + + require.Equal(t, 2, countBucketElements(t, tx, archiveBucket("a1234"))) + + err = rep.DeleteArchive(context.Background(), "a1234") + require.NoError(t, err) + + require.Equal(t, 0, countBucketElements(t, tx, archiveBucket("a1234"))) + + return nil + }) + require.NoError(t, err) +} diff --git a/pkg/storage/boltdb/op.go b/pkg/storage/boltdb/op.go index f92bd3257..dac939cbf 100644 --- a/pkg/storage/boltdb/op.go +++ b/pkg/storage/boltdb/op.go @@ -60,8 +60,7 @@ func (op insertSeqOp) do() error { if err != nil { return err } - k := fmt.Sprintf("%d", seq) - return b.Put([]byte(k), p) + return b.Put([]byte(fmt.Sprintf("%d", seq)), p) } type delBucketOp struct { diff --git a/pkg/storage/boltdb/repository.go b/pkg/storage/boltdb/repository.go index aaa9faa58..dedf3dcaf 100644 --- a/pkg/storage/boltdb/repository.go +++ b/pkg/storage/boltdb/repository.go @@ -39,6 +39,7 @@ type Repository struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker cfg Config diff --git a/pkg/storage/boltdb/repository_test.go b/pkg/storage/boltdb/repository_test.go new file mode 100644 index 000000000..6b576966c --- /dev/null +++ b/pkg/storage/boltdb/repository_test.go @@ -0,0 +1,36 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package boltdb + +import ( + "testing" + + bolt "go.etcd.io/bbolt" +) + +func countBucketElements(t *testing.T, tx *bolt.Tx, bucket string) int { + t.Helper() + + b := tx.Bucket([]byte(bucket)) + if b == nil { + return 0 + } + var count int + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + count++ + } + return count +} diff --git a/pkg/storage/boltdb/tx.go b/pkg/storage/boltdb/tx.go index 2d7cbc29e..8988ffeb1 100644 --- a/pkg/storage/boltdb/tx.go +++ b/pkg/storage/boltdb/tx.go @@ -28,6 +28,7 @@ type repTx struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker } @@ -41,6 +42,7 @@ func newRepTx(tx *bolt.Tx) *repTx { Private: newPrivateRep(tx), Roster: newRosterRep(tx), VCard: newVCardRep(tx), + Archive: newArchiveRep(tx), Locker: newLockerRep(), } } diff --git a/pkg/storage/cached/cached.go b/pkg/storage/cached/cached.go index 5a150b395..798fe99a4 100644 --- a/pkg/storage/cached/cached.go +++ b/pkg/storage/cached/cached.go @@ -68,6 +68,7 @@ type CachedRepository struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker rep repository.Repository @@ -91,6 +92,7 @@ func New(cfg Config, rep repository.Repository, logger kitlog.Logger) (repositor BlockList: &cachedBlockListRep{c: c, rep: rep, logger: logger}, Roster: &cachedRosterRep{c: c, rep: rep, logger: logger}, VCard: &cachedVCardRep{c: c, rep: rep, logger: logger}, + Archive: rep, Offline: rep, Locker: rep, rep: rep, diff --git a/pkg/storage/cached/tx.go b/pkg/storage/cached/tx.go index 5c2084a01..230af7c7d 100644 --- a/pkg/storage/cached/tx.go +++ b/pkg/storage/cached/tx.go @@ -27,6 +27,7 @@ type cachedTx struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker } @@ -39,6 +40,7 @@ func newCacheTx(c Cache, tx repository.Transaction) *cachedTx { BlockList: &cachedBlockListRep{c: c, rep: tx}, Roster: &cachedRosterRep{c: c, rep: tx}, VCard: &cachedVCardRep{c: c, rep: tx}, + Archive: tx, Offline: tx, Locker: tx, } diff --git a/pkg/storage/measured/archive.go b/pkg/storage/measured/archive.go new file mode 100644 index 000000000..9f513703a --- /dev/null +++ b/pkg/storage/measured/archive.go @@ -0,0 +1,56 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package measuredrepository + +import ( + "context" + "time" + + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + "github.com/ortuman/jackal/pkg/storage/repository" +) + +type measuredArchiveRep struct { + rep repository.Archive + inTx bool +} + +func (m *measuredArchiveRep) InsertArchiveMessage(ctx context.Context, message *archivemodel.Message) error { + t0 := time.Now() + err := m.rep.InsertArchiveMessage(ctx, message) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return err +} + +func (m *measuredArchiveRep) FetchArchiveMetadata(ctx context.Context, archiveID string) (metadata *archivemodel.Metadata, err error) { + t0 := time.Now() + metadata, err = m.rep.FetchArchiveMetadata(ctx, archiveID) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return +} + +func (m *measuredArchiveRep) DeleteArchiveOldestMessages(ctx context.Context, archiveID string, maxElements int) error { + t0 := time.Now() + err := m.rep.DeleteArchiveOldestMessages(ctx, archiveID, maxElements) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return err +} + +func (m *measuredArchiveRep) DeleteArchive(ctx context.Context, archiveID string) error { + t0 := time.Now() + err := m.rep.DeleteArchive(ctx, archiveID) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return err +} diff --git a/pkg/storage/measured/archive_test.go b/pkg/storage/measured/archive_test.go new file mode 100644 index 000000000..a2f262aab --- /dev/null +++ b/pkg/storage/measured/archive_test.go @@ -0,0 +1,84 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package measuredrepository + +import ( + "context" + "testing" + + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + "github.com/stretchr/testify/require" +) + +func TestMeasuredArchiveRep_InsertArchiveMessage(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.InsertArchiveMessageFunc = func(ctx context.Context, message *archivemodel.Message) error { + return nil + } + m := &measuredArchiveRep{rep: repMock} + + // when + _ = m.InsertArchiveMessage(context.Background(), &archivemodel.Message{ArchiveId: "a1234"}) + + // then + require.Len(t, repMock.InsertArchiveMessageCalls(), 1) +} + +func TestMeasuredArchiveRep_FetchArchiveMetadata(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.FetchArchiveMetadataFunc = func(ctx context.Context, archiveID string) (*archivemodel.Metadata, error) { + return nil, nil + } + m := &measuredArchiveRep{rep: repMock} + + // when + _, _ = m.FetchArchiveMetadata(context.Background(), "a1234") + + // then + require.Len(t, repMock.FetchArchiveMetadataCalls(), 1) +} + +func TestMeasuredArchiveRep_DeleteArchiveOldestMessages(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.DeleteArchiveOldestMessagesFunc = func(ctx context.Context, archiveID string, maxElements int) error { + return nil + } + m := &measuredArchiveRep{rep: repMock} + + // when + err := m.DeleteArchiveOldestMessages(context.Background(), "a1234", 10) + + // then + require.Len(t, repMock.DeleteArchiveOldestMessagesCalls(), 1) + require.NoError(t, err) +} + +func TestMeasuredArchiveRep_DeleteArchive(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.DeleteArchiveFunc = func(ctx context.Context, archiveId string) error { + return nil + } + m := &measuredArchiveRep{rep: repMock} + + // when + _ = m.DeleteArchive(context.Background(), "a1234") + + // then + require.Len(t, repMock.DeleteArchiveCalls(), 1) +} diff --git a/pkg/storage/measured/measured.go b/pkg/storage/measured/measured.go index 01a7b3d1b..914613f24 100644 --- a/pkg/storage/measured/measured.go +++ b/pkg/storage/measured/measured.go @@ -40,6 +40,7 @@ type Measured struct { measuredPrivateRep measuredRosterRep measuredVCardRep + measuredArchiveRep measuredLocker rep repository.Repository } @@ -55,6 +56,7 @@ func New(rep repository.Repository) repository.Repository { measuredPrivateRep: measuredPrivateRep{rep: rep}, measuredRosterRep: measuredRosterRep{rep: rep}, measuredVCardRep: measuredVCardRep{rep: rep}, + measuredArchiveRep: measuredArchiveRep{rep: rep}, measuredLocker: measuredLocker{rep: rep}, rep: rep, } diff --git a/pkg/storage/measured/tx.go b/pkg/storage/measured/tx.go index 992b0379d..316287f56 100644 --- a/pkg/storage/measured/tx.go +++ b/pkg/storage/measured/tx.go @@ -25,6 +25,7 @@ type measuredTx struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker } @@ -38,6 +39,7 @@ func newMeasuredTx(tx repository.Transaction) *measuredTx { Private: &measuredPrivateRep{rep: tx, inTx: true}, Roster: &measuredRosterRep{rep: tx, inTx: true}, VCard: &measuredVCardRep{rep: tx, inTx: true}, + Archive: &measuredArchiveRep{rep: tx, inTx: true}, Locker: &measuredLocker{rep: tx, inTx: true}, } } diff --git a/pkg/storage/pgsql/archive.go b/pkg/storage/pgsql/archive.go new file mode 100644 index 000000000..3328670ab --- /dev/null +++ b/pkg/storage/pgsql/archive.go @@ -0,0 +1,114 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsqlrepository + +import ( + "context" + "database/sql" + "time" + + sq "github.com/Masterminds/squirrel" + kitlog "github.com/go-kit/log" + "github.com/jackal-xmpp/stravaganza/jid" + archivemodel "github.com/ortuman/jackal/pkg/model/archive" +) + +const ( + archiveTableName = "archives" + + archiveStampFormat = "2006-01-02T15:04:05Z" +) + +type pgSQLArchiveRep struct { + conn conn + logger kitlog.Logger +} + +func (r *pgSQLArchiveRep) InsertArchiveMessage(ctx context.Context, message *archivemodel.Message) error { + b, err := message.MarshalBinary() + if err != nil { + return err + } + fromJID, _ := jid.NewWithString(message.FromJid, true) + toJID, _ := jid.NewWithString(message.ToJid, true) + + q := sq.Insert(archiveTableName). + Prefix(noLoadBalancePrefix). + Columns("archive_id", "id", `"from"`, "from_bare", `"to"`, "to_bare", "message"). + Values( + message.ArchiveId, + message.Id, + fromJID.String(), + fromJID.ToBareJID().String(), + toJID.String(), + toJID.ToBareJID().String(), + b, + ) + + _, err = q.RunWith(r.conn).ExecContext(ctx) + return err +} + +func (r *pgSQLArchiveRep) FetchArchiveMetadata(ctx context.Context, archiveID string) (*archivemodel.Metadata, error) { + fromExpr := `FROM ` + fromExpr += `(SELECT "id", created_at FROM archives WHERE serial = (SELECT MIN(serial) FROM archives WHERE archive_id = $1)) AS min,` + fromExpr += `(SELECT "id", created_at FROM archives WHERE serial = (SELECT MAX(serial) FROM archives WHERE archive_id = $1)) AS max` + + q := sq.Select("min.id, min.created_at, max.id, max.created_at").Suffix(fromExpr, archiveID) + + var start, end time.Time + var metadata archivemodel.Metadata + + err := q.RunWith(r.conn). + QueryRowContext(ctx). + Scan( + &metadata.StartId, + &start, + &metadata.EndId, + &end, + ) + + switch err { + case nil: + metadata.StartTimestamp = start.UTC().Format(archiveStampFormat) + metadata.EndTimestamp = end.UTC().Format(archiveStampFormat) + return &metadata, nil + + case sql.ErrNoRows: + return nil, nil + + default: + return nil, err + } +} + +func (r *pgSQLArchiveRep) DeleteArchiveOldestMessages(ctx context.Context, archiveID string, maxElements int) error { + q := sq.Delete(archiveTableName). + Prefix(noLoadBalancePrefix). + Where(sq.And{ + sq.Eq{"archive_id": archiveID}, + sq.Expr(`"id" NOT IN (SELECT "id" FROM archives WHERE archive_id = $2 ORDER BY created_at DESC LIMIT $3 OFFSET 0)`, archiveID, maxElements), + }) + _, err := q.RunWith(r.conn).ExecContext(ctx) + return err +} + +func (r *pgSQLArchiveRep) DeleteArchive(ctx context.Context, archiveID string) error { + q := sq.Delete(archiveTableName). + Prefix(noLoadBalancePrefix). + Where(sq.Eq{"archive_id": archiveID}) + _, err := q.RunWith(r.conn).ExecContext(ctx) + return err +} diff --git a/pkg/storage/pgsql/archive_test.go b/pkg/storage/pgsql/archive_test.go new file mode 100644 index 000000000..dfca2a786 --- /dev/null +++ b/pkg/storage/pgsql/archive_test.go @@ -0,0 +1,118 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsqlrepository + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/jackal-xmpp/stravaganza" + archivemodel "github.com/ortuman/jackal/pkg/model/archive" + "github.com/stretchr/testify/require" +) + +func TestPgSQLArchive_InsertArchiveMessage(t *testing.T) { + // given + b := stravaganza.NewMessageBuilder() + b.WithAttribute("from", "noelia@jackal.im/yard") + b.WithAttribute("to", "ortuman@jackal.im/balcony") + b.WithChild( + stravaganza.NewBuilder("body"). + WithText("I'll give thee a wind."). + Build(), + ) + msg, _ := b.BuildMessage() + + aMsg := &archivemodel.Message{ + ArchiveId: "ortuman", + Id: "id1234", + FromJid: "ortuman@jackal.im/local", + ToJid: "ortuman@jabber.org/remote", + Message: msg.Proto(), + } + msgBytes, _ := aMsg.MarshalBinary() + + s, mock := newArchiveMock() + mock.ExpectExec(`INSERT INTO archives \(archive_id,id,"from",from_bare,"to",to_bare,message\) VALUES \(\$1,\$2,\$3,\$4,\$5,\$6,\$7\)`). + WithArgs("ortuman", "id1234", "ortuman@jackal.im/local", "ortuman@jackal.im", "ortuman@jabber.org/remote", "ortuman@jabber.org", msgBytes). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // when + err := s.InsertArchiveMessage(context.Background(), aMsg) + + // then + require.Nil(t, err) + require.Nil(t, mock.ExpectationsWereMet()) +} + +func TestPgSQLArchive_FetchArchiveMetadata(t *testing.T) { + // given + s, mock := newArchiveMock() + mock.ExpectQuery(`SELECT min.id, min.created_at, max.id, max.created_at FROM \(SELECT "id", created_at FROM archives WHERE serial = \(SELECT MIN\(serial\) FROM archives WHERE archive_id = \$1\)\) AS min,\(SELECT "id", created_at FROM archives WHERE serial = \(SELECT MAX\(serial\) FROM archives WHERE archive_id = \$1\)\) AS max`). + WithArgs("ortuman"). + WillReturnRows( + sqlmock.NewRows([]string{"min.id", "min.created_at", "max.id", "max.created_at"}).AddRow("YWxwaGEg", "2008-08-22T21:09:04Z", "b21lZ2Eg", "2020-04-20T14:34:21Z"), + ) + + // when + metadata, err := s.FetchArchiveMetadata(context.Background(), "ortuman") + + // then + require.Nil(t, err) + require.NotNil(t, metadata) + + require.Equal(t, "YWxwaGEg", metadata.StartId) + require.Equal(t, "2008-08-22T21:09:04Z", metadata.StartTimestamp) + require.Equal(t, "b21lZ2Eg", metadata.EndId) + require.Equal(t, "2020-04-20T14:34:21Z", metadata.EndTimestamp) + + require.Nil(t, mock.ExpectationsWereMet()) +} + +func TestPgSQLArchive_DeleteArchiveOldestMessages(t *testing.T) { + // given + s, mock := newArchiveMock() + mock.ExpectExec(`DELETE FROM archives WHERE \(archive_id = \$1 AND "id" NOT IN \(SELECT "id" FROM archives WHERE archive_id = \$2 ORDER BY created_at DESC LIMIT \$3 OFFSET 0\)\)`). + WithArgs("ortuman", "ortuman", 1234). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // when + err := s.DeleteArchiveOldestMessages(context.Background(), "ortuman", 1234) + + // then + require.Nil(t, err) + require.Nil(t, mock.ExpectationsWereMet()) +} + +func TestPgSQLArchive_DeleteArchive(t *testing.T) { + // given + s, mock := newArchiveMock() + mock.ExpectExec(`DELETE FROM archives WHERE archive_id = \$1`). + WithArgs("ortuman"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // when + err := s.DeleteArchive(context.Background(), "ortuman") + + // then + require.Nil(t, err) + require.Nil(t, mock.ExpectationsWereMet()) +} + +func newArchiveMock() (*pgSQLArchiveRep, sqlmock.Sqlmock) { + s, sqlMock := newPgSQLMock() + return &pgSQLArchiveRep{conn: s}, sqlMock +} diff --git a/pkg/storage/pgsql/repository.go b/pkg/storage/pgsql/repository.go index 3a0acb9f9..32a665e83 100644 --- a/pkg/storage/pgsql/repository.go +++ b/pkg/storage/pgsql/repository.go @@ -57,6 +57,7 @@ type Repository struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker host string @@ -120,6 +121,7 @@ func (r *Repository) Start(ctx context.Context) error { r.Private = &pgSQLPrivateRep{conn: db, logger: r.logger} r.Roster = &pgSQLRosterRep{conn: db, logger: r.logger} r.VCard = &pgSQLVCardRep{conn: db, logger: r.logger} + r.Archive = &pgSQLArchiveRep{conn: db, logger: r.logger} r.Locker = &pgSQLLocker{conn: db} return nil } diff --git a/pkg/storage/pgsql/tx.go b/pkg/storage/pgsql/tx.go index 99d6be102..18ccde9a5 100644 --- a/pkg/storage/pgsql/tx.go +++ b/pkg/storage/pgsql/tx.go @@ -29,6 +29,7 @@ type repTx struct { repository.Private repository.Roster repository.VCard + repository.Archive repository.Locker } @@ -42,6 +43,7 @@ func newRepTx(tx *sql.Tx) *repTx { Private: &pgSQLPrivateRep{conn: tx}, Roster: &pgSQLRosterRep{conn: tx}, VCard: &pgSQLVCardRep{conn: tx}, + Archive: &pgSQLArchiveRep{conn: tx}, Locker: &pgSQLLocker{conn: tx}, } } diff --git a/pkg/storage/repository/archive.go b/pkg/storage/repository/archive.go new file mode 100644 index 000000000..6f85c7bd7 --- /dev/null +++ b/pkg/storage/repository/archive.go @@ -0,0 +1,36 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package repository + +import ( + "context" + + archivemodel "github.com/ortuman/jackal/pkg/model/archive" +) + +// Archive defines storage operations for message archive +type Archive interface { + // InsertArchiveMessage inserts a new message element into an archive queue. + InsertArchiveMessage(ctx context.Context, message *archivemodel.Message) error + + // FetchArchiveMetadata returns the metadata value associated to an archive. + FetchArchiveMetadata(ctx context.Context, archiveID string) (*archivemodel.Metadata, error) + + // DeleteArchiveOldestMessages trims archive oldest messages up to a maxElements total count. + DeleteArchiveOldestMessages(ctx context.Context, archiveID string, maxElements int) error + + // DeleteArchive clears an archive queue. + DeleteArchive(ctx context.Context, archiveID string) error +} diff --git a/pkg/storage/repository/repository.go b/pkg/storage/repository/repository.go index 51b9c913b..e5a6fe100 100644 --- a/pkg/storage/repository/repository.go +++ b/pkg/storage/repository/repository.go @@ -38,6 +38,7 @@ type Transaction interface { } type baseRepository interface { + Archive User Last Capabilities diff --git a/pkg/util/xmpp/xmpp.go b/pkg/util/xmpp/xmpp.go index 8aca645d3..bde89ecf1 100644 --- a/pkg/util/xmpp/xmpp.go +++ b/pkg/util/xmpp/xmpp.go @@ -17,6 +17,8 @@ package xmpputil import ( "time" + "github.com/google/uuid" + "github.com/jackal-xmpp/stravaganza" stanzaerror "github.com/jackal-xmpp/stravaganza/errors/stanza" "github.com/jackal-xmpp/stravaganza/jid" @@ -64,3 +66,17 @@ func MakeDelayMessage(stanza stravaganza.Stanza, stamp time.Time, from, text str dMsg, _ := sb.BuildMessage() return dMsg } + +// MakeStanzaIDMessage creates a new message adding stanza-id element. +func MakeStanzaIDMessage(message *stravaganza.Message) *stravaganza.Message { + msg, _ := stravaganza.NewBuilderFromElement(message). + WithChild( + stravaganza.NewBuilder("stanza-id"). + WithAttribute(stravaganza.Namespace, "urn:xmpp:sid:0"). + WithAttribute("by", message.ToJID().ToBareJID().String()). + WithAttribute("id", uuid.New().String()). + Build(), + ). + BuildMessage() + return msg +} diff --git a/proto/model/v1/archive.proto b/proto/model/v1/archive.proto new file mode 100644 index 000000000..faa2ada21 --- /dev/null +++ b/proto/model/v1/archive.proto @@ -0,0 +1,64 @@ +// Copyright 2022 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax="proto3"; + +import "google/protobuf/timestamp.proto"; + +import "github.com/jackal-xmpp/stravaganza/stravaganza.proto"; + +package model.archive.v1; + +option go_package = "pkg/model/archive/;archivemodel"; + +// Message represents an archive message entity. +message Message { + // archived_id is the message archive identifier. + string archive_id = 1; + + // id is the message archive unique identifier. + string id = 2; + + // from_jid is the message from jid value. + string from_jid = 3; + + // to_jid is the message from jid value. + string to_jid = 4; + + // message is the archived message. + stravaganza.PBElement message = 5; + + // stamp is the timestamp in which the message was archived. + google.protobuf.Timestamp stamp = 9; +} + +// Messages represents a set of archive messages. +message Messages { + repeated Message archive_messages = 1; +} + +// Metadata represents an archive metadata information. +message Metadata { + // start_timestamp is the identifier of the first archive message. + string start_id = 1; + + // start_timestamp is the timestamp value of the first archive message. + string start_timestamp = 2; + + // end_id is the identifier of the last archive message. + string end_id = 3; + + // end_timestamp is the timestamp value of the last archive message. + string end_timestamp = 4; +} diff --git a/scripts/genproto.sh b/scripts/genproto.sh index bf7be5bfd..dca80d4d0 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -15,6 +15,7 @@ FILES=( "admin/v1/users.proto" "c2s/v1/resourceinfo.proto" "cluster/v1/cluster.proto" + "model/v1/archive.proto" "model/v1/user.proto" "model/v1/last.proto" "model/v1/blocklist.proto" diff --git a/sql/postgres.up.psql b/sql/postgres.up.psql index 3be5cf8da..39d0ad1c1 100644 --- a/sql/postgres.up.psql +++ b/sql/postgres.up.psql @@ -170,3 +170,25 @@ CREATE TABLE IF NOT EXISTS vcards ( ); SELECT enable_updated_at('vcards'); + +-- archives + +CREATE TABLE IF NOT EXISTS archives ( + serial SERIAL PRIMARY KEY, + archive_id VARCHAR(1023), + id VARCHAR(255) NOT NULL, + "from" TEXT NOT NULL, + from_bare TEXT NOT NULL, + "to" TEXT NOT NULL, + to_bare TEXT NOT NULL, + message BYTEA NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS i_archives_archive_id ON archives(archive_id); +CREATE INDEX IF NOT EXISTS i_archives_id ON archives(id); +CREATE INDEX IF NOT EXISTS i_archives_to ON archives("to"); +CREATE INDEX IF NOT EXISTS i_archives_to_bare ON archives(to_bare); +CREATE INDEX IF NOT EXISTS i_archives_from ON archives("from"); +CREATE INDEX IF NOT EXISTS i_archives_from_bare ON archives(from_bare); +CREATE INDEX IF NOT EXISTS i_archives_created_at ON archives(created_at);