Skip to content

Commit

Permalink
Transfer - Snapshot may be incomplete (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi authored Nov 5, 2023
1 parent 13b1588 commit d18fb59
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
40 changes: 23 additions & 17 deletions artifactory/commands/transferfiles/fulltransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (m *fullTransferPhase) run() error {
if ShouldStop(&m.phaseBase, &delayHelper, errorsChannelMng) {
return nil
}
folderHandler := m.createFolderFullTransferHandlerFunc(*pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err := pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: "."}), pcWrapper.errorsQueue.AddError)

// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exists.
node, done, err := m.getAndHandleDirectoryNode(".")
if err != nil || done {
return err
}

folderHandler := m.createFolderFullTransferHandlerFunc(node, *pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: "."}), pcWrapper.errorsQueue.AddError)
return err
}
delayAction := func(phase phaseBase, addedDelayFiles []string) error {
Expand All @@ -117,17 +124,17 @@ type folderParams struct {
relativePath string
}

func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(pcWrapper producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
func (m *fullTransferPhase) createFolderFullTransferHandlerFunc(node *reposnapshot.Node, pcWrapper producerConsumerWrapper, uploadChunkChan chan UploadedChunk,
delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) folderFullTransferHandlerFunc {
return func(params folderParams) parallel.TaskFunc {
return func(threadId int) error {
logMsgPrefix := clientUtils.GetLogMsgPrefix(threadId, false)
return m.transferFolder(params, logMsgPrefix, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
return m.transferFolder(node, params, logMsgPrefix, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
}
}
}

func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix string, pcWrapper producerConsumerWrapper,
func (m *fullTransferPhase) transferFolder(node *reposnapshot.Node, params folderParams, logMsgPrefix string, pcWrapper producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) (err error) {
log.Debug(logMsgPrefix+"Handling folder:", path.Join(m.repoKey, params.relativePath))

Expand All @@ -139,12 +146,6 @@ func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix str
return
}

// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exist.
node, done, err := m.getAndHandleDirectoryNode(params, logMsgPrefix)
if err != nil || done {
return err
}

curUploadChunk, err := m.searchAndHandleFolderContents(params, pcWrapper,
uploadChunkChan, delayHelper, errorsChannelMng, node)
if err != nil {
Expand Down Expand Up @@ -227,7 +228,13 @@ func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrappe
item servicesUtils.ResultItem) (err error) {
newRelativePath := getFolderRelativePath(item.Name, params.relativePath)

folderHandler := m.createFolderFullTransferHandlerFunc(pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exists.
node, done, err := m.getAndHandleDirectoryNode(newRelativePath)
if err != nil || done {
return err
}

folderHandler := m.createFolderFullTransferHandlerFunc(node, pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: newRelativePath}), pcWrapper.errorsQueue.AddError)
return
}
Expand Down Expand Up @@ -289,15 +296,14 @@ func generateFolderContentAqlQuery(repoKey, relativePath string, paginationOffse
// node - A node in the repository snapshot tree, which represents the current directory.
// completed - Whether handling the node directory was completed. If it wasn't fully transferred, we start exploring and transferring it from scratch.
// previousChildren - If the directory requires exploring, previously known children will be added from this map in order to preserve their states and references.
func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMsgPrefix string) (node *reposnapshot.Node, completed bool, err error) {
node, err = m.stateManager.LookUpNode(params.relativePath)
func (m *fullTransferPhase) getAndHandleDirectoryNode(relativePath string) (node *reposnapshot.Node, completed bool, err error) {
node, err = m.stateManager.LookUpNode(relativePath)
if err != nil {
return
}

// If data was not loaded from snapshot, we know that the node is visited for the first time and was not explored.
loadedFromSnapshot, err := m.stateManager.WasSnapshotLoaded()
if err != nil || !loadedFromSnapshot {
if !m.stateManager.WasSnapshotLoaded() {
return
}

Expand All @@ -306,7 +312,7 @@ func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMs
return
}
if completed {
log.Debug(logMsgPrefix+"Skipping completed folder:", path.Join(m.repoKey, params.relativePath))
log.Debug("Skipping completed folder:", path.Join(m.repoKey, relativePath))
return
}
// If the node was not completed, we will start exploring it from the beginning.
Expand Down
8 changes: 2 additions & 6 deletions artifactory/commands/transferfiles/state/transfersnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ func (ts *TransferStateManager) LookUpNode(relativePath string) (requestedNode *
return
}

func (ts *TransferStateManager) WasSnapshotLoaded() (wasLoaded bool, err error) {
err = ts.snapshotAction(func(rts *RepoTransferSnapshot) error {
wasLoaded = rts.loadedFromSnapshot
return nil
})
return
func (ts *TransferStateManager) WasSnapshotLoaded() bool {
return ts.repoTransferSnapshot.loadedFromSnapshot
}

func (ts *TransferStateManager) GetDirectorySnapshotNodeWithLru(relativePath string) (node *reposnapshot.Node, err error) {
Expand Down
5 changes: 3 additions & 2 deletions utils/reposnapshot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ func (node *Node) IncrementFilesCount(fileSize uint64) error {

func (node *Node) DecrementFilesCount() error {
return node.action(func(node *Node) error {
if node.filesCount > 0 {
node.filesCount--
if node.filesCount == 0 {
return errorutils.CheckErrorf("attempting to decrease file count in node '%s', but the files count is already 0", node.name)
}
node.filesCount--
return nil
})
}
Expand Down

0 comments on commit d18fb59

Please sign in to comment.