Skip to content

Commit

Permalink
Fix: change group id in the file coordinator service
Browse files Browse the repository at this point in the history
  • Loading branch information
liel-almog committed Mar 17, 2024
1 parent d81a81a commit 09a0d89
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
2 changes: 1 addition & 1 deletion storage-coordinator/file-delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func fileDelete(connString string) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(brokers, ","),
Topic: fileDeleteTopic,
GroupID: "upload-permanent-backup",
GroupID: "delete-permanent-backup",
})

for {
Expand Down
8 changes: 4 additions & 4 deletions storage-coordinator/file-upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func fileUpload(connString string) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(brokers, ","),
Topic: fileUplaodFinilizationTopic,
GroupID: "delete-permanent-backup",
GroupID: "upload-permanent-backup",
})

for {
Expand Down Expand Up @@ -148,12 +148,12 @@ func fileUpload(connString string) {
wg.Done()
}()

wg.Wait()
r.CommitMessages(context.Background(), m)
url := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", stgAccountName, permanentContainerName, fileName)

_, err = db.Pool.Exec(context.Background(), "UPDATE files SET status = 'uploaded', url = $1 WHERE file_id = $2", url, *payload.FileId)

wg.Wait()
r.CommitMessages(context.Background(), m)

if err != nil {
panic(err)
}
Expand Down

0 comments on commit 09a0d89

Please sign in to comment.