Skip to content

Commit

Permalink
Merge pull request #4039 from kobergj/RestartPostprocessing
Browse files Browse the repository at this point in the history
Allow to restart postprocessing
  • Loading branch information
kobergj authored Jul 6, 2023
2 parents 838071e + a2c5dad commit 3626d94
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/restart-postprocessing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Restart Postprocessing

Resend the `BytesReady` event if instructed.

https://github.com/cs3org/reva/pull/4039
13 changes: 13 additions & 0 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,16 @@ func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) {
err := json.Unmarshal(v, &e)
return e, err
}

// RestartPostprocessing will be emitted by postprocessing service if it doesn't know about an upload
type RestartPostprocessing struct {
UploadID string
Timestamp *types.Timestamp
}

// Unmarshal to fulfill umarshaller interface
func (RestartPostprocessing) Unmarshal(v []byte) (interface{}, error) {
e := RestartPostprocessing{}
err := json.Unmarshal(v, &e)
return e, err
}
42 changes: 39 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"strings"
"time"

user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
Expand Down Expand Up @@ -65,7 +66,15 @@ import (
"golang.org/x/sync/errgroup"
)

var tracer trace.Tracer
var (
tracer trace.Tracer

_registeredEvents = []events.Unmarshaller{
events.PostprocessingFinished{},
events.PostprocessingStepFinished{},
events.RestartPostprocessing{},
}
)

func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs")
Expand Down Expand Up @@ -206,7 +215,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
return nil, errors.New("need nats for async file processing")
}

ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{})
ch, err := events.Consume(fs.stream, "dcfs", _registeredEvents...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,7 +313,34 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}

case events.RestartPostprocessing:
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload")
continue
}
n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node")
continue
}
s, err := up.URL(up.Ctx)
if err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
continue
}
// restart postprocessing
if err := events.Publish(fs.stream, events.BytesReceived{
UploadID: up.Info.ID,
URL: s,
SpaceOwner: n.SpaceOwnerOrManager(up.Ctx),
ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead?
ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
Filename: up.Info.Storage["NodeName"],
Filesize: uint64(up.Info.Size),
}); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event")
}
case events.PostprocessingStepFinished:
if ev.FinishedStep != events.PPStepAntivirus {
// atm we are only interested in antivirus results
Expand Down

0 comments on commit 3626d94

Please sign in to comment.