diff --git a/changelog/unreleased/restart-postprocessing.md b/changelog/unreleased/restart-postprocessing.md
new file mode 100644
index 0000000000..85dcb268fe
--- /dev/null
+++ b/changelog/unreleased/restart-postprocessing.md
@@ -0,0 +1,5 @@
+Bugfix: Restart Postprocessing
+
+Resend the `BytesReceived` event when a `RestartPostprocessing` event requests it.
+
+https://github.com/cs3org/reva/pull/4039
diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go
index 0b01050112..c6ab698321 100644
--- a/pkg/events/postprocessing.go
+++ b/pkg/events/postprocessing.go
@@ -184,3 +184,16 @@ func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) {
 	err := json.Unmarshal(v, &e)
 	return e, err
 }
+
+// RestartPostprocessing will be emitted by the postprocessing service if it doesn't know about an upload
+type RestartPostprocessing struct {
+	UploadID  string
+	Timestamp *types.Timestamp
+}
+
+// Unmarshal to fulfill unmarshaller interface
+func (RestartPostprocessing) Unmarshal(v []byte) (interface{}, error) {
+	e := RestartPostprocessing{}
+	err := json.Unmarshal(v, &e)
+	return e, err
+}
diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go
index f1b82da867..2340aa4a0c 100644
--- a/pkg/storage/utils/decomposedfs/decomposedfs.go
+++ b/pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -34,6 +34,7 @@ import (
 	"strings"
 	"time"
 
+	user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
 	rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
 	ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
@@ -65,7 +66,15 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-var tracer trace.Tracer
+var (
+	tracer trace.Tracer
+
+	_registeredEvents = []events.Unmarshaller{
+		events.PostprocessingFinished{},
+		events.PostprocessingStepFinished{},
+		events.RestartPostprocessing{},
+	}
+)
 
 func init() {
 	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs")
@@ -206,7 +215,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es events.Stream) (storage.FS, error) {
 			return nil, errors.New("need nats for async file processing")
 		}
 
-		ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{})
+		ch, err := events.Consume(fs.stream, "dcfs", _registeredEvents...)
 		if err != nil {
 			return nil, err
 		}
@@ -304,7 +313,34 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
 			); err != nil {
 				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
 			}
-
+		case events.RestartPostprocessing:
+			up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
+			if err != nil {
+				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
+				continue
+			}
+			n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true)
+			if err != nil {
+				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
+				continue
+			}
+			s, err := up.URL(up.Ctx)
+			if err != nil {
+				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
+				continue
+			}
+			// restart postprocessing by republishing BytesReceived for this upload
+			if err := events.Publish(fs.stream, events.BytesReceived{
+				UploadID:      up.Info.ID,
+				URL:           s,
+				SpaceOwner:    n.SpaceOwnerOrManager(up.Ctx),
+				ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
+				ResourceID:    &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
+				Filename:      up.Info.Storage["NodeName"],
+				Filesize:      uint64(up.Info.Size),
+			}); err != nil {
+				log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
+			}
 		case events.PostprocessingStepFinished:
 			if ev.FinishedStep != events.PPStepAntivirus {
 				// atm we are only interested in antivirus results
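
For context, a minimal emitter-side sketch, not part of the patch above: a service that has lost track of an upload can trigger the new code path by publishing `RestartPostprocessing` on the same stream that decomposedfs consumes, after which decomposedfs re-emits `BytesReceived` for that upload. Only `events.Publish`, the `events.Publisher` interface, and the `RestartPostprocessing` payload are taken from this diff and the reva events package; the `requestRestart` helper, its package name, and the inline timestamp construction are illustrative assumptions.

// Hypothetical helper (illustrative only): ask decomposedfs to restart
// postprocessing for an upload this service no longer knows about.
package restartexample

import (
	"time"

	types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
	"github.com/cs3org/reva/v2/pkg/events"
)

// requestRestart publishes RestartPostprocessing over the given publisher
// (e.g. the NATS-backed events.Stream the service is already connected to).
// decomposedfs looks up the upload by ID and re-publishes BytesReceived.
func requestRestart(s events.Publisher, uploadID string) error {
	return events.Publish(s, events.RestartPostprocessing{
		UploadID: uploadID,
		// timestamp of the restart request, built inline to stay self-contained
		Timestamp: &types.Timestamp{Seconds: uint64(time.Now().Unix())},
	})
}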