Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow generating legal hold bundles in filestore directly #70

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 133 additions & 1 deletion server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"path/filepath"
"time"

"github.com/gorilla/mux"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-legal-hold/server/model"
"github.com/mattermost/mattermost-plugin-legal-hold/server/store/filebackend"
)

const requestBodyMaxSizeBytes = 1024 * 1024 // 1MB
Expand Down Expand Up @@ -44,6 +46,7 @@ func (p *Plugin) ServeHTTP(_ *plugin.Context, w http.ResponseWriter, r *http.Req
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/release", p.releaseLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/update", p.updateLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/download", p.downloadLegalHold).Methods(http.MethodGet)
router.HandleFunc("/api/v1/legalhold/{legalhold_id:[A-Za-z0-9]+}/bundle", p.bundleLegalHold).Methods(http.MethodPost)
router.HandleFunc("/api/v1/test_amazon_s3_connection", p.testAmazonS3Connection).Methods(http.MethodPost)

// Other routes
Expand Down Expand Up @@ -245,8 +248,8 @@ func (p *Plugin) downloadLegalHold(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", "legalholddata.zip"))
w.WriteHeader(http.StatusOK)

// Write the files to the download on-the-fly.
zipWriter := zip.NewWriter(w)
// Write the files to the download on-the-fly.
for _, entry := range files {
header := &zip.FileHeader{
Name: entry,
Expand Down Expand Up @@ -295,6 +298,135 @@ func (p *Plugin) downloadLegalHold(w http.ResponseWriter, r *http.Request) {
}
}

// bundleLegalHold handles the POST .../legalhold/{legalhold_id}/bundle route.
// It looks up the legal hold, lists the files under its base path, posts a
// direct message from the bot to the requesting user, and then builds a zip
// of those files inside the filestore from a background goroutine. The HTTP
// response (201 with a JSON message) is written without waiting for the
// goroutine to finish.
func (p *Plugin) bundleLegalHold(w http.ResponseWriter, r *http.Request) {
// Get the LegalHold.
legalholdID, err := RequireLegalHoldID(r)
if err != nil {
http.Error(w, "failed to parse LegalHold ID", http.StatusBadRequest)
p.Client.Log.Error(err.Error())
return
}

legalHold, err := p.KVStore.GetLegalHoldByID(legalholdID)
if err != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(err.Error())
return
}

// Get the list of files to include in the download.
files, err := p.FileBackend.ListDirectoryRecursively(legalHold.BasePath())
if err != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(err.Error())
return
}

// The requesting user is taken from the request header and is not checked for
// emptiness here.
// NOTE(review): presumably the Mattermost server sets this header for
// authenticated plugin requests — confirm.
mattermostUserID := r.Header.Get("Mattermost-User-Id")

// Direct-message channel between the bot and the requesting user; both the
// initial notice and the completion notice are posted here.
channel, appErr := p.API.GetDirectChannel(p.botUserID, mattermostUserID)
if appErr != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(appErr.Error())
return
}

initialPost, appErr := p.API.CreatePost(&mattermostModel.Post{
UserId: p.botUserID,
ChannelId: channel.Id,
Message: "Generating legal hold bundle in the background as per the request. You will be notified once the download is ready.",
})
if appErr != nil {
http.Error(w, "failed to download legal hold", http.StatusInternalServerError)
p.Client.Log.Error(appErr.Error())
return
}

p.Client.Log.Info("Generating legal hold bundle on S3")
// The bundle is generated in the background; the handler does not wait for
// this goroutine. Every failure path below only logs and returns, so the
// completion post is the only follow-up the user receives.
go func() {
errGoro := p.KVStore.LockLegalHold(legalholdID, "bundle")
if errGoro != nil {
p.Client.Log.Error("failed to lock legal hold before download task", errGoro.Error())
// NOTE(review): w belongs to the request handler, which writes its own
// 201 response below and returns without waiting for this goroutine.
// Touching w from here conflicts with that write or happens after the
// handler has returned; this call should be removed.
w.WriteHeader(http.StatusInternalServerError)
return
}

// Release the "bundle" lock whichever way the goroutine exits from here on.
defer func() {
if errGoro := p.KVStore.UnlockLegalHold(legalholdID, "bundle"); errGoro != nil {
p.Client.Log.Error("failed to unlock legal hold after download task", errGoro.Error())
}
}()

// Use a custom writer to use the Write/Append functions while we generate the object on
fmartingr marked this conversation as resolved.
Show resolved Hide resolved
// the fly, avoiding storing it on the local disk.
// Also using a buffer to avoid writing to the S3 object in small chunks, since the minimal
fmartingr marked this conversation as resolved.
Show resolved Hide resolved
// size for a source in the underlying `ComposeObject` call is 5MB, so using 5MB as buffer size.
// The bundle name is <legal hold ID>_<unix timestamp>.zip under model.FilestoreBundlePath.
filename := filepath.Join(model.FilestoreBundlePath, fmt.Sprintf("%s_%d.zip", legalholdID, time.Now().Unix()))
zipWriter := zip.NewWriter(
bufio.NewWriterSize(
filebackend.NewFileBackendWritter(p.FileBackend, filename),
1024*1024*5, // 5MB
))

// NOTE(review): bytesWritten is accumulated below but never read in this function.
bytesWritten := int64(0)
// Each listed file becomes one zip entry, stored under its filestore path.
for _, entry := range files {
header := &zip.FileHeader{
Name: entry,
Method: zip.Deflate, // deflate also works, but at a cost
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment makes it seem like we're not using Deflate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this verbatim from the original code, it's also confusing to me tbh.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a comment that outlasted a code change that it shouldn't have. I'd remove it.

Modified: time.Now(),
}

entryWriter, errGoro := zipWriter.CreateHeader(header)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
return
}

// NOTE(review): backendReader is not closed anywhere in this function; if
// Reader returns a closer, one handle is leaked per entry — confirm and
// close it after the copy.
backendReader, errGoro := p.FileBackend.Reader(entry)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
return
}

fileReader := bufio.NewReader(backendReader)

// Stream the file into the zip entry without loading it fully in memory.
loopBytesWritten, errGoro := io.Copy(entryWriter, fileReader)
if errGoro != nil {
p.Client.Log.Error(errGoro.Error())
return
}
bytesWritten += loopBytesWritten
}

// Close writes the zip central directory; the archive is incomplete without it.
if errGoro := zipWriter.Close(); errGoro != nil {
p.Client.Log.Error(errGoro.Error())
return
}

// Reply in the thread of the initial post so the user sees where the bundle is.
// Note that this assigns to the handler's appErr variable captured by the closure.
_, appErr = p.API.CreatePost(&mattermostModel.Post{
UserId: p.botUserID,
ChannelId: channel.Id,
RootId: initialPost.Id,
Message: fmt.Sprintf("Legal hold bundle is ready for download. You can find it under `%s` in your storage provider.", filename),
})
if appErr != nil {
p.Client.Log.Error(appErr.Error())
return
}
fmartingr marked this conversation as resolved.
Show resolved Hide resolved
}()

// Acknowledge the request immediately; the bundle itself is produced by the
// goroutine started above.
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(struct {
Message string `json:"message"`
}{
Message: "Legal hold bundle is being generated. You will be notified once it is ready.",
})
if err != nil {
p.API.LogError("failed to write http response", err.Error())
}
}

func (p *Plugin) runJobFromAPI(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte("Processing all Legal Holds. Please check the MM server logs for more details."))
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions server/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"path/filepath"
"strconv"
"time"

"github.com/mattermost/mattermost-plugin-legal-hold/server/model"
)

// jobCleanupOldBundlesFromFilestore deletes stale legal hold bundles from the
// filestore. It lists model.FilestoreBundlePath, reads the unix timestamp
// embedded in each bundle's file name, and removes every bundle whose
// timestamp is more than 24 hours in the past. Files whose names do not match
// model.FilestoreBundleRegex, or whose timestamp cannot be parsed, are logged
// and left untouched.
func (p *Plugin) jobCleanupOldBundlesFromFilestore() {
	p.API.LogDebug("Starting legal hold cleanup job")

	bundles, listErr := p.FileBackend.ListDirectory(model.FilestoreBundlePath)
	if listErr != nil {
		p.Client.Log.Error("failed to list directory", "err", listErr)
		return
	}

	// Bundles older than this are removed.
	const maxBundleAge = 24 * time.Hour

	for _, bundle := range bundles {
		// A full match yields three elements: the whole name, the ID and the timestamp.
		matches := model.FilestoreBundleRegex.FindStringSubmatch(filepath.Base(bundle))
		if len(matches) != 3 {
			p.Client.Log.Error("Skipping file", "file", bundle, "reason", "does not match regex", "parts", matches)
			continue
		}

		// The second capture group is the creation time as unix seconds.
		createdUnix, parseErr := strconv.ParseInt(matches[2], 10, 64)
		if parseErr != nil {
			p.Client.Log.Error("Skipping file", "file", bundle, "reason", "failed to parse timestamp", "err", parseErr)
			continue
		}

		if age := time.Since(time.Unix(createdUnix, 0)); age > maxBundleAge {
			p.Client.Log.Debug("Deleting file", "file", bundle)
			if removeErr := p.FileBackend.RemoveFile(bundle); removeErr != nil {
				p.Client.Log.Error("Failed to delete file", "file", bundle, "err", removeErr)
			}
		}

		p.Client.Log.Debug("Checking file", "file", bundle, "parts", matches)
	}

	p.API.LogDebug("Finished legal hold cleanup job")
}
14 changes: 14 additions & 0 deletions server/model/legal_hold.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package model

import (
"fmt"
"path/filepath"
"regexp"

mattermostModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-legal-hold/server/utils"
)

const (
// baseFilestorePath is the filestore path prefix that FilestoreBundlePath is built from.
baseFilestorePath = "legal_hold"
)

// FilestoreBundlePath is the filestore directory for generated legal hold
// bundles: baseFilestorePath joined with "download".
var FilestoreBundlePath = filepath.Join(baseFilestorePath, "download")

// FilestoreBundleRegex is a regular expression that matches the filename of a
// legal hold bundle: <bundle_id>_<unix timestamp>.zip
// Submatch 1 is the lowercase alphanumeric ID and submatch 2 is the decimal
// timestamp, so a successful FindStringSubmatch returns three elements.
var FilestoreBundleRegex = regexp.MustCompile(`^([a-z0-9]+)_([0-9]+)\.zip$`)

// LegalHold represents one legal hold.
type LegalHold struct {
ID string `json:"id"`
Expand All @@ -23,6 +35,7 @@ type LegalHold struct {
LastExecutionEndedAt int64 `json:"last_execution_ended_at"`
ExecutionLength int64 `json:"execution_length"`
Secret string `json:"secret"`
Locks []string `json:"locks"`
}

// DeepCopy creates a deep copy of the LegalHold.
Expand All @@ -43,6 +56,7 @@ func (lh *LegalHold) DeepCopy() LegalHold {
LastExecutionEndedAt: lh.LastExecutionEndedAt,
ExecutionLength: lh.ExecutionLength,
Secret: lh.Secret,
Locks: lh.Locks,
}

if len(lh.UserIDs) > 0 {
Expand Down
31 changes: 27 additions & 4 deletions server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/gorilla/mux"
pluginapi "github.com/mattermost/mattermost-plugin-api"
"github.com/mattermost/mattermost-server/v6/model"
"github.com/mattermost/mattermost-plugin-api/cluster"
mattermostModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/mattermost/mattermost-server/v6/plugin"
"github.com/mattermost/mattermost-server/v6/shared/filestore"
"github.com/pkg/errors"
Expand Down Expand Up @@ -53,6 +55,12 @@ type Plugin struct {

// router holds the HTTP router for the plugin's rest API
router *mux.Router

// Bot user ID
botUserID string

// cleanupJob is the job that cleans up old legal hold bundles from the filestore
cleanupJob *cluster.Job
}

func (p *Plugin) OnActivate() error {
Expand Down Expand Up @@ -84,6 +92,15 @@ func (p *Plugin) OnActivate() error {

p.KVStore = kvstore.NewKVStore(p.Client)

// Create bot user
p.botUserID, err = p.API.EnsureBotUser(&mattermostModel.Bot{
Username: "legal_hold_bot",
DisplayName: "Legal Hold Bot",
})
if err != nil {
return err
}

// Create job manager
p.jobManager = jobs.NewJobManager(&p.Client.Log)

Expand Down Expand Up @@ -195,16 +212,22 @@ func (p *Plugin) Reconfigure() error {
if err != nil {
return fmt.Errorf("cannot create legal hold job: %w", err)
}
if err := p.jobManager.AddJob(p.legalHoldJob); err != nil {
if err = p.jobManager.AddJob(p.legalHoldJob); err != nil {
return fmt.Errorf("cannot add legal hold job: %w", err)
}
_ = p.jobManager.OnConfigurationChange(p.getConfiguration())

// Setup cluster cleanup job
p.cleanupJob, err = cluster.Schedule(p.API, "legal_hold_bundle_cleanup", cluster.MakeWaitForRoundedInterval(time.Minute), p.jobCleanupOldBundlesFromFilestore)
if err != nil {
return fmt.Errorf("error setting up cluster cleanup job: %w", err)
}

return nil
}

func FixedFileSettingsToFileBackendSettings(fileSettings model.FileSettings, enableComplianceFeature bool, skipVerify bool) filestore.FileBackendSettings {
if *fileSettings.DriverName == model.ImageDriverLocal {
func FixedFileSettingsToFileBackendSettings(fileSettings mattermostModel.FileSettings, enableComplianceFeature bool, skipVerify bool) filestore.FileBackendSettings {
if *fileSettings.DriverName == mattermostModel.ImageDriverLocal {
return filestore.FileBackendSettings{
DriverName: *fileSettings.DriverName,
Directory: *fileSettings.Directory,
Expand Down
37 changes: 37 additions & 0 deletions server/store/filebackend/writter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package filebackend

import (
"bytes"
"io"
"log"

"github.com/mattermost/mattermost-server/v6/shared/filestore"
)

// fileBackendWritter is a simple io.Writer that writes to a file using a filestore.FileBackend
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is spelled Writter rather than Writer?

// fileBackendWritter adapts a filestore.FileBackend to io.Writer for a single
// target path.
type fileBackendWritter struct {
// filePath is the path handed to the backend's WriteFile/AppendFile calls.
filePath string
// fileBackend is the backend that receives every chunk passed to Write.
fileBackend filestore.FileBackend
// created is used to know if the file has been created or not, to use either WriteFile or AppendFile
created bool
}

func (s *fileBackendWritter) Write(p []byte) (n int, err error) {
var written int64
if !s.created {
s.created = true
log.Println("writeFile")
fmartingr marked this conversation as resolved.
Show resolved Hide resolved
written, err = s.fileBackend.WriteFile(bytes.NewReader(p), s.filePath)
} else {
log.Println("appendFile")
fmartingr marked this conversation as resolved.
Show resolved Hide resolved
written, err = s.fileBackend.AppendFile(bytes.NewReader(p), s.filePath)
}
return int(written), err
}

// NewFileBackendWritter returns an io.Writer that stores everything written to
// it at filePath through fileBackend. The file has not been created yet when
// the writer is returned; creation happens on the first Write.
func NewFileBackendWritter(fileBackend filestore.FileBackend, filePath string) io.Writer {
	w := new(fileBackendWritter)
	w.fileBackend = fileBackend
	w.filePath = filePath
	return w
}
3 changes: 3 additions & 0 deletions server/store/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ type KVStore interface {
GetLegalHoldByID(id string) (*model.LegalHold, error)
UpdateLegalHold(lh, oldValue model.LegalHold) (*model.LegalHold, error)
DeleteLegalHold(id string) error
LockLegalHold(id, lockType string) error
UnlockLegalHold(id, lockType string) error
IsLockedLegalHold(id, lockType string) (bool, error)
}
Loading
Loading