Skip to content

Commit

Permalink
set scan data after scan is finished
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Mar 17, 2023
1 parent eb1b232 commit 2e7e415
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
35 changes: 12 additions & 23 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type (
var (
// PPStepAntivirus is the step that scans for viruses
PPStepAntivirus Postprocessingstep = "virusscan"
// PPStepFTS is the step that indexes files for full text search
PPStepFTS Postprocessingstep = "fts"
// PPStepPolicies is the step the step that enforces policies
PPStepPolicies Postprocessingstep = "policies"
// PPStepDelay is the step that processing. Useful for testing or user annoyment
PPStepDelay Postprocessingstep = "delay"

Expand Down Expand Up @@ -68,26 +68,6 @@ func (BytesReceived) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanFinished is emitted by the server when it has completed an antivirus scan
type VirusscanFinished struct {
Infected bool
Outcome PostprocessingOutcome
UploadID string
Filename string
ExecutingUser *user.User
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// Unmarshal to fulfill umarshaller interface
func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) {
e := VirusscanFinished{}
err := json.Unmarshal(v, &e)
return e, err
}

// StartPostprocessingStep can be issued by the server to start a postprocessing step
type StartPostprocessingStep struct {
UploadID string
Expand Down Expand Up @@ -116,7 +96,7 @@ type PostprocessingStepFinished struct {
Filename string

FinishedStep Postprocessingstep // name of the step
Result interface{} // result information
Result interface{} // result information see VirusscanResult for example
Error error // possible error of the step
Outcome PostprocessingOutcome // some services may cause postprocessing to stop
}
Expand All @@ -128,6 +108,15 @@ func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanResult is the Result of a PostprocessingStepFinished event from the antivirus
type VirusscanResult struct {
Infected bool
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// PostprocessingFinished is emitted by *some* service which can decide that
type PostprocessingFinished struct {
UploadID string
Expand Down
4 changes: 3 additions & 1 deletion pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
case "DELETE":
handler.DelFile(w, r)
case "GET":
// NOTE: this is breaking change - allthought it does not seem to be used
// We can make a switch here depending on some header value if that is needed
// download.GetOrHeadFile(w, r, fs, "")
handler.GetFile(w, r)
//download.GetOrHeadFile(w, r, fs, "")
default:
w.WriteHeader(http.StatusNotImplemented)
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
return nil, errors.New("need nats for async file processing")
}

ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{})
ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -266,9 +266,14 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}

/* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED
case events.VirusscanFinished:
if ev.ErrorMsg != "" {
case events.PostprocessingStepFinished:
if ev.FinishedStep != events.PPStepAntivirus {
// atm we are only interested in antivirus results
continue
}

res := ev.Result.(events.VirusscanResult)
if res.ErrorMsg != "" {
// scan failed somehow
// Should we handle this here?
continue
Expand All @@ -278,6 +283,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
switch ev.UploadID {
case "":
// uploadid is empty -> this was an on-demand scan
/* ON DEMAND SCANNING NOT SUPPORTED ATM
ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser)
ref := &provider.Reference{ResourceId: ev.ResourceID}
Expand Down Expand Up @@ -352,6 +358,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
continue
}
*/
default:
// uploadid is not empty -> this is an async upload
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
Expand All @@ -360,7 +367,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}

no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false)
no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, false)
if err != nil {
log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan")
continue
Expand All @@ -369,14 +376,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
n = no
}

if err := n.SetScanData(ev.Description, ev.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results")
if err := n.SetScanData(res.Description, res.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results")
continue
}

// remove cache entry in gateway
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
*/
default:
log.Error().Interface("event", ev).Msg("Unknown event")
}
Expand Down

0 comments on commit 2e7e415

Please sign in to comment.