Skip to content

Commit

Permalink
Refacor: Change the way we delete files. Create a transaction and onl…
Browse files Browse the repository at this point in the history
…y commit after a message has been sent to the kafka topic.
  • Loading branch information
liel-almog committed Mar 17, 2024
1 parent 78884a6 commit 896aae8
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 25 deletions.
39 changes: 34 additions & 5 deletions backend/repositories/file_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/lielalmog/SkyArchive/backend/database"
"github.com/lielalmog/SkyArchive/backend/models"
"gopkg.in/errgo.v2/errors"
)

type FileRepository interface {
Expand All @@ -16,7 +17,7 @@ type FileRepository interface {
UpdateFavorite(ctx context.Context, fileId *int64, userId *int64, updateFavorite *models.UpdateFavoriteDTO) (int64, error)
UpdateDisplayName(ctx context.Context, fileId *int64, userId *int64, updateDisplayName *models.UpdateDisplayNameDTO) (int64, error)
GetFileByUser(ctx context.Context, fileId *int64, userId *int64) (*models.File, error)
DeleteFile(ctx context.Context, fileId *int64, userId *int64) (int64, error)
DeleteFile(ctx context.Context, ch <-chan error, fileId *int64, userId *int64) (int64, error)
}

type fileRepositoryImpl struct {
Expand Down Expand Up @@ -121,19 +122,47 @@ func (u *fileRepositoryImpl) GetFileByUser(ctx context.Context, fileId *int64, u
return file, nil
}

func (u *fileRepositoryImpl) DeleteFile(ctx context.Context, fileId *int64, userId *int64) (int64, error) {
func (u *fileRepositoryImpl) DeleteFile(ctx context.Context, ch <-chan error, fileId *int64, userId *int64) (int64, error) {
queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

commandTag, err := u.db.Pool.Exec(queryCtx,
`DELETE FROM files
tx, err := u.db.Pool.Begin(queryCtx)
if err != nil {
return 0, err
}

commandTag, err := tx.Exec(queryCtx,
`DELETE FROM files
WHERE file_id = $1 AND user_id = $2`,
fileId, userId)

if err != nil {
tx.Rollback(queryCtx)
return 0, err
}

return commandTag.RowsAffected(), nil
select {
case err := <-ch:
if err != nil {
tx.Rollback(queryCtx)
return 0, err
}

err = tx.Commit(queryCtx)
if err != nil {
return 0, err
}

return commandTag.RowsAffected(), nil

case <-time.After(5 * time.Second):
tx.Rollback(queryCtx)
return 0, errors.New("timeout")

case <-queryCtx.Done():
tx.Rollback(queryCtx)
return 0, errors.New("timeout")
}
}

func newFileRepository() *fileRepositoryImpl {
Expand Down
50 changes: 30 additions & 20 deletions backend/services/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,36 +160,46 @@ func (u *fileServiceImpl) UpdateDisplayName(ctx context.Context, fileId *int64,
}

func (u *fileServiceImpl) DeleteFile(ctx context.Context, fileId *int64, userId *int64) error {
n, err := u.fileRepository.DeleteFile(ctx, fileId, userId)
if err != nil {
return err
}
writer := kafka.GetKafkaProducer()

if n == 0 {
return fiber.NewError(fiber.StatusNotFound, "file not found")
}
kafkaResult := make(chan error)

writer := kafka.GetKafkaProducer()
go func() {
defer close(kafkaResult)

payload, err := json.Marshal(models.KafkaFileUploadFinalizationMessage{
FileId: fileId,
})
if err != nil {
return err
}
payload, err := json.Marshal(models.KafkaFileUploadFinalizationMessage{
FileId: fileId,
})
if err != nil {
kafkaResult <- err
return
}

writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err = writer.WriteMessages(writeCtx, segKafka.Message{
Value: payload,
Topic: kafka.FileDeleteTopic,
})
err = writer.WriteMessages(writeCtx, segKafka.Message{
Value: payload,
Topic: kafka.FileDeleteTopic,
})

if err != nil {
kafkaResult <- err
return
}

kafkaResult <- nil
}()

n, err := u.fileRepository.DeleteFile(ctx, kafkaResult, fileId, userId)
if err != nil {
return err
}

if n == 0 {
return fiber.NewError(fiber.StatusNotFound, "file not found")
}

return nil
}

Expand Down

0 comments on commit 896aae8

Please sign in to comment.