Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Antivirus #3730

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/antivirus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Antivirus

Support antivirus functionality (needs seperate antivirus service)

https://github.com/cs3org/reva/pull/3730
109 changes: 109 additions & 0 deletions pkg/bytesize/bytesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2018-2022 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package bytesize provides easy conversions from human readable strings (eg. 10MB) to bytes
package bytesize

import (
"fmt"
"strconv"
"strings"
)

// ByteSize is the size in bytes
type ByteSize uint64

// List of available byte sizes
// NOTE: max is exabyte as we convert to uint64
const (
KB ByteSize = 1000
MB ByteSize = 1000 * KB
GB ByteSize = 1000 * MB
TB ByteSize = 1000 * GB
PB ByteSize = 1000 * TB
EB ByteSize = 1000 * PB

KiB ByteSize = 1024
MiB ByteSize = 1024 * KiB
GiB ByteSize = 1024 * MiB
TiB ByteSize = 1024 * GiB
PiB ByteSize = 1024 * TiB
EiB ByteSize = 1024 * PiB
)

// Parse parses a Bytesize from a string
func Parse(s string) (ByteSize, error) {
sanitized := strings.TrimSpace(s)
if !strings.HasSuffix(sanitized, "B") {
u, err := strconv.Atoi(sanitized)
return ByteSize(u), err
}

var (
value int
unit string
)

template := "%d%s"
_, err := fmt.Sscanf(sanitized, template, &value, &unit)
if err != nil {
return 0, err
}

bytes := ByteSize(value)
switch unit {
case "KB":
bytes *= KB
case "MB":
bytes *= MB
case "GB":
bytes *= GB
case "TB":
bytes *= TB
case "PB":
bytes *= PB
case "EB":
bytes *= EB
case "KiB":
bytes *= KiB
case "MiB":
bytes *= MiB
case "GiB":
bytes *= GiB
case "TiB":
bytes *= TiB
case "PiB":
bytes *= PiB
case "EiB":
bytes *= EiB
default:
return 0, fmt.Errorf("unknown unit '%s'. Use common abbreviations such as KB, MiB, GB", unit)
}

return bytes, nil
}

// Bytes converts the ByteSize to an uint64
func (b ByteSize) Bytes() uint64 {
return uint64(b)
}

// String converts the ByteSize to a string
func (b ByteSize) String() string {
return strconv.FormatUint(uint64(b), 10)
}
124 changes: 124 additions & 0 deletions pkg/bytesize/bytesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2018-2022 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package bytesize_test

import (
"fmt"
"testing"

"github.com/cs3org/reva/v2/pkg/bytesize"
"github.com/test-go/testify/require"
)

func TestParseSpecial(t *testing.T) {
testCases := []struct {
Alias string
Input string
ExpectedOutput uint64
ExpectError bool
}{
{
Alias: "it assumes bytes",
Input: "100",
ExpectedOutput: 100,
},
{
Alias: "it accepts a space between value and unit",
Input: "1 MB",
ExpectedOutput: 1000000,
},
{
Alias: "it accepts also more spaces between value and unit",
Input: "1 MB",
ExpectedOutput: 1000000,
},
{
Alias: "it ignores trailing and leading spaces",
Input: " 1MB ",
ExpectedOutput: 1000000,
},
{
Alias: "it errors on unknown units",
Input: "1SB",
ExpectedOutput: 0,
ExpectError: true,
},
{
Alias: "it multiplies correctly",
Input: "16MB",
ExpectedOutput: 16000000,
},
{
Alias: "it errors when no value is given",
Input: "GB",
ExpectedOutput: 0,
ExpectError: true,
},
{
Alias: "it errors when bad input is given",
Input: ",as!@@delta",
ExpectedOutput: 0,
ExpectError: true,
},
{
Alias: "it errors when using floats",
Input: "1.024GB",
ExpectedOutput: 0,
ExpectError: true,
},
}

for _, tc := range testCases {
actual, err := bytesize.Parse(tc.Input)
if tc.ExpectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}

require.Equal(t, tc.ExpectError, err != nil, tc.Alias)
require.Equal(t, int(tc.ExpectedOutput), int(actual), tc.Alias)
}
}

func TestParseHappy(t *testing.T) {
testCases := []struct {
Input string
Expected uint64
}{
{Input: "1", Expected: 1},
{Input: "1KB", Expected: 1000},
{Input: "1MB", Expected: 1000000},
{Input: "1GB", Expected: 1000000000},
{Input: "1TB", Expected: 1000000000000},
{Input: "1PB", Expected: 1000000000000000},
{Input: "1EB", Expected: 1000000000000000000},
{Input: "1MiB", Expected: 1048576},
{Input: "1GiB", Expected: 1073741824},
{Input: "1TiB", Expected: 1099511627776},
{Input: "1PiB", Expected: 1125899906842624},
{Input: "1EiB", Expected: 1152921504606846976},
}

for _, tc := range testCases {
actual, err := bytesize.Parse(tc.Input)
require.NoError(t, err)
require.Equal(t, int(tc.Expected), int(actual), fmt.Sprintf("case %s", tc.Input))
}
}
35 changes: 12 additions & 23 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type (
var (
// PPStepAntivirus is the step that scans for viruses
PPStepAntivirus Postprocessingstep = "virusscan"
// PPStepFTS is the step that indexes files for full text search
PPStepFTS Postprocessingstep = "fts"
// PPStepPolicies is the step the step that enforces policies
PPStepPolicies Postprocessingstep = "policies"
// PPStepDelay is the step that processing. Useful for testing or user annoyment
PPStepDelay Postprocessingstep = "delay"

Expand Down Expand Up @@ -68,26 +68,6 @@ func (BytesReceived) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanFinished is emitted by the server when it has completed an antivirus scan
type VirusscanFinished struct {
Infected bool
Outcome PostprocessingOutcome
UploadID string
Filename string
ExecutingUser *user.User
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// Unmarshal to fulfill umarshaller interface
func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) {
e := VirusscanFinished{}
err := json.Unmarshal(v, &e)
return e, err
}

// StartPostprocessingStep can be issued by the server to start a postprocessing step
type StartPostprocessingStep struct {
UploadID string
Expand Down Expand Up @@ -116,7 +96,7 @@ type PostprocessingStepFinished struct {
Filename string

FinishedStep Postprocessingstep // name of the step
Result interface{} // result information
Result interface{} // result information see VirusscanResult for example
Error error // possible error of the step
Outcome PostprocessingOutcome // some services may cause postprocessing to stop
}
Expand All @@ -128,6 +108,15 @@ func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanResult is the Result of a PostprocessingStepFinished event from the antivirus
type VirusscanResult struct {
Infected bool
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// PostprocessingFinished is emitted by *some* service which can decide that
type PostprocessingFinished struct {
UploadID string
Expand Down
6 changes: 4 additions & 2 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/utils"
Expand Down Expand Up @@ -156,7 +155,10 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
case "DELETE":
handler.DelFile(w, r)
case "GET":
download.GetOrHeadFile(w, r, fs, "")
// NOTE: this is breaking change - allthought it does not seem to be used
// We can make a switch here depending on some header value if that is needed
// download.GetOrHeadFile(w, r, fs, "")
handler.GetFile(w, r)
default:
w.WriteHeader(http.StatusNotImplemented)
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
return nil, errors.New("need nats for async file processing")
}

ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{})
ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -266,9 +266,14 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}

/* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED
case events.VirusscanFinished:
if ev.ErrorMsg != "" {
case events.PostprocessingStepFinished:
if ev.FinishedStep != events.PPStepAntivirus {
// atm we are only interested in antivirus results
continue
}

res := ev.Result.(events.VirusscanResult)
if res.ErrorMsg != "" {
// scan failed somehow
// Should we handle this here?
continue
Expand All @@ -278,6 +283,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
switch ev.UploadID {
case "":
// uploadid is empty -> this was an on-demand scan
/* ON DEMAND SCANNING NOT SUPPORTED ATM
ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser)
ref := &provider.Reference{ResourceId: ev.ResourceID}

Expand Down Expand Up @@ -352,6 +358,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
continue
}
*/
default:
// uploadid is not empty -> this is an async upload
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
Expand All @@ -360,7 +367,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}

no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false)
no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, false)
if err != nil {
log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan")
continue
Expand All @@ -369,14 +376,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
n = no
}

if err := n.SetScanData(ev.Description, ev.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results")
if err := n.SetScanData(res.Description, res.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results")
continue
}

// remove cache entry in gateway
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
*/
default:
log.Error().Interface("event", ev).Msg("Unknown event")
}
Expand Down