Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallel downloads for COM scenarios #1588

Merged
merged 14 commits into from
Nov 17, 2021
6 changes: 4 additions & 2 deletions src/AppInstallerCLICore/Commands/COMInstallCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace AppInstaller::CLI
// IMPORTANT: To use this command, the caller should have already retrieved the package manifest (GetManifest()) and added it to the Context Data
struct COMDownloadCommand final : public Command
{
COMDownloadCommand(std::string_view parent) : Command("download", parent) {}
constexpr static std::string_view CommandName = "download"sv;
COMDownloadCommand(std::string_view parent) : Command(CommandName, parent) {}

protected:
void ExecuteInternal(Execution::Context& context) const override;
Expand All @@ -17,7 +18,8 @@ namespace AppInstaller::CLI
// IMPORTANT: To use this command, the caller should have already retrieved the package manifest (GetManifest()) and added it to the Context Data
struct COMInstallCommand final : public Command
{
COMInstallCommand(std::string_view parent) : Command("install", parent) {}
constexpr static std::string_view CommandName = "install"sv;
COMInstallCommand(std::string_view parent) : Command(CommandName, parent) {}

protected:
void ExecuteInternal(Execution::Context& context) const override;
Expand Down
189 changes: 126 additions & 63 deletions src/AppInstallerCLICore/ContextOrchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,89 +21,159 @@ namespace AppInstaller::CLI::Execution
::AppInstaller::ProgressCallback progress;
std::shared_ptr<::AppInstaller::Repository::ISource> installingSource = ::AppInstaller::Repository::OpenPredefinedSource(::AppInstaller::Repository::PredefinedSource::Installing, progress);
m_installingWriteableSource = std::dynamic_pointer_cast<::AppInstaller::Repository::IMutablePackageSource>(installingSource);

AddCommandQueue(COMDownloadCommand::CommandName, 5);
florelis marked this conversation as resolved.
Show resolved Hide resolved
AddCommandQueue(COMInstallCommand::CommandName, 1);
}

_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator ContextOrchestrator::FindIteratorById(const OrchestratorQueueItemId& comparisonQueueItemId)
void ContextOrchestrator::AddCommandQueue(std::string_view commandName, UINT32 allowedThreads)
{
return std::find_if(m_queueItems.begin(), m_queueItems.end(), [&comparisonQueueItemId](const std::shared_ptr<OrchestratorQueueItem>& item) {return (item->GetId().IsSame(comparisonQueueItemId)); });

m_commandQueues.emplace(commandName, std::make_unique<OrchestratorQueue>(commandName, allowedThreads));
}

_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::FindById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
for (const auto& queue : m_commandQueues)
{
auto item = queue.second->FindById(comparisonQueueItemId);
if (item)
{
return item;
}
}

return {};
}

void ContextOrchestrator::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
THROW_HR_IF(HRESULT_FROM_WIN32(ERROR_INSTALL_ALREADY_RUNNING), FindById(item->GetId()));
m_commandQueues.at(std::string(item->GetNextCommand().Name()))->EnqueueAndRunItem(item, true);
}

void ContextOrchestrator::RequeueItem(std::shared_ptr<OrchestratorQueueItem> item)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
m_commandQueues.at(std::string(item->GetNextCommand().Name()))->EnqueueAndRunItem(item, false);
}

void ContextOrchestrator::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
for (const auto& queue : m_commandQueues)
{
if (queue.second->RemoveItemInState(item, state, true))
{
return;
}
}
}

void ContextOrchestrator::CancelQueueItem(const OrchestratorQueueItem& item)
{
// Always cancel the item, even if it isn't running yet, to get the terminationHR set correctly.
item.GetContext().Cancel(false, true);

RemoveItemInState(item, OrchestratorQueueItemState::Queued);
}

std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetQueueItem(const OrchestratorQueueItemId& queueItemId)
{
std::lock_guard<std::mutex> lock{ m_queueLock };

return FindById(queueItemId);
}

void ContextOrchestrator::AddItemManifestToInstallingSource(const OrchestratorQueueItem& queueItem)
{
const auto& manifest = queueItem.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource->AddPackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-existing issue, but there is the potential for Id collisions from different sources here. Not an immediate concern but we might want to consider how we can store them all in the same index, or use a separate index per source. Most options are probably annoying and relatively expensive to implement, and the current potential for problems is low. Just leaving this here as a reminder to you and me both.

}

void ContextOrchestrator::RemoveItemManifestFromInstallingSource(const OrchestratorQueueItem& queueItem)
{
const auto& manifest = queueItem.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource->RemovePackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
}

_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator OrchestratorQueue::FindIteratorById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
return std::find_if(m_queueItems.begin(), m_queueItems.end(), [&comparisonQueueItemId](const std::shared_ptr<OrchestratorQueueItem>& item) {return (item->GetId().IsSame(comparisonQueueItemId)); });
}

_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> OrchestratorQueue::FindById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
auto itr = FindIteratorById(comparisonQueueItemId);
if (itr != m_queueItems.end())
{
return *itr;
}

return {};
}
void ContextOrchestrator::EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item)

void OrchestratorQueue::EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item, bool isFirstCommand)
{
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };

THROW_HR_IF(HRESULT_FROM_WIN32(ERROR_INSTALL_ALREADY_RUNNING), FindById(item->GetId()));
m_queueItems.push_back(item);
}

// Add the package to the Installing source so that it can be queried using the ISource interface.
const auto& manifest = item->GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource->AddPackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
// Only do this the first time the item is queued.
if (isFirstCommand)
florelis marked this conversation as resolved.
Show resolved Hide resolved
{
ContextOrchestrator::Instance().AddItemManifestToInstallingSource(*item);
}

{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
item->SetState(OrchestratorQueueItemState::Queued);
}
}

void ContextOrchestrator::RequeueItem(OrchestratorQueueItem& item)
void OrchestratorQueue::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item, bool isFirstCommand)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };

item.SetState(OrchestratorQueueItemState::Queued);
}
EnqueueItem(item, isFirstCommand);

void ContextOrchestrator::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item)
{
EnqueueItem(item);

std::thread runnerThread(&ContextOrchestrator::RunItems, this);
runnerThread.detach();
{
// Start a runner for this item if there is capacity for it
std::lock_guard<std::mutex> lockQueue{ m_threadsLock };
if (m_runningThreads < m_allowedThreads)
florelis marked this conversation as resolved.
Show resolved Hide resolved
{
++m_runningThreads;
std::thread runnerThread(&OrchestratorQueue::RunItems, this);
runnerThread.detach();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably be keeping the std::thread objects because I don't think that keeping tracking of running threads with a counter will work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically I think that the use of a counter here is fragile; if the thread terminates abnormally it won't decrement. With proper handling you can make it safer and more robust, but you could also put that effort into using the kernel to know the answer. And it has just a bit more mileage than any code we write here 😄

So I would write a thread wrapper type and hold onto them. You can WaitForSingleObject on the native handle (probably) as suggested https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-getexitcodethread

That way we could say for a fact how many threads were still running at this point.

}
}
}

std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetNextItem()
std::shared_ptr<OrchestratorQueueItem> OrchestratorQueue::GetNextItem()
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };

if (m_queueItems.empty())
// Find next item that can be dequeued.
for (const auto& item : m_queueItems)
{
return {};
}

std::shared_ptr<OrchestratorQueueItem> item = m_queueItems.front();

// Check if item can be dequeued.
// Since only one item can be installed at a time currently the logic is very simple,
// and can just check if the first item is ready to run. This logic will need to become
// more complicated if multiple operation types (e.g. Download & Install) are added that can
// run simultaneously.
if (item->GetState() != OrchestratorQueueItemState::Queued)
{
return {};
if (item->GetState() == OrchestratorQueueItemState::Queued)
{
// Running state must be set inside the queueLock so that multiple threads don't try to run the same item.
item->SetState(OrchestratorQueueItemState::Running);
return item;
}
}

// Running state must be set inside the queueLock so that multiple threads don't try to run the same item.
item->SetState(OrchestratorQueueItemState::Running);
return item;
return {};
}

void ContextOrchestrator::RunItems()
void OrchestratorQueue::RunItems()
florelis marked this conversation as resolved.
Show resolved Hide resolved
{
std::shared_ptr<OrchestratorQueueItem> item = GetNextItem();
while(item != nullptr)
while (item != nullptr)
{
HRESULT terminationHR = S_OK;
try
Expand Down Expand Up @@ -133,21 +203,29 @@ namespace AppInstaller::CLI::Execution

if (FAILED(terminationHR) || item->IsComplete())
{
RemoveItemInState(*item, OrchestratorQueueItemState::Running);
RemoveItemInState(*item, OrchestratorQueueItemState::Running, true);
}
else
{
RequeueItem(*item);
// Remove item from this queue and add it to the queue for the next command.
RemoveItemInState(*item, OrchestratorQueueItemState::Running, false);
ContextOrchestrator::Instance().RequeueItem(item);
}

item = GetNextItem();
}

{
std::lock_guard<std::mutex> lockQueue{ m_threadsLock };
--m_runningThreads;
}
}

void ContextOrchestrator::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state)

bool OrchestratorQueue::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state, bool isGlobalRemove)
{
// OrchestratorQueueItemState::Running items should only be removed by the thread that ran the item.
// Queued items can be removed by any thread.
// Queued items can be removed by any thread.
// NotQueued items should not be removed since, if found in the queue, they are in the process of being queued by another thread.
bool foundItem = false;

Expand All @@ -163,28 +241,13 @@ namespace AppInstaller::CLI::Execution
}
}

if (foundItem)
if (foundItem && isGlobalRemove)
{
const auto& manifest = item.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource->RemovePackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });

ContextOrchestrator::Instance().RemoveItemManifestFromInstallingSource(item);
item.GetCompletedEvent().SetEvent();
}
}

void ContextOrchestrator::CancelQueueItem(const OrchestratorQueueItem& item)
{
// Always cancel the item, even if it isn't running yet, to get the terminationHR set correctly.
item.GetContext().Cancel(false, true);

RemoveItemInState(item, OrchestratorQueueItemState::Queued);
}

std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetQueueItem(const OrchestratorQueueItemId& queueItemId)
{
std::lock_guard<std::mutex> lock{ m_queueLock };

return FindById(queueItemId);
return foundItem;
}

bool OrchestratorQueueItemId::IsSame(const OrchestratorQueueItemId& comparedId) const
Expand Down
60 changes: 54 additions & 6 deletions src/AppInstallerCLICore/ContextOrchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ namespace AppInstaller::CLI::Execution
const wil::unique_event& GetCompletedEvent() const { return m_completedEvent; }
const OrchestratorQueueItemId& GetId() const { return m_id; }
void AddCommand(std::unique_ptr<Command> command) { m_commands.push_back(std::move(command)); }
const Command& GetNextCommand() const { return *m_commands.front(); }
std::unique_ptr<Command> PopNextCommand()
{
std::unique_ptr<Command> command = std::move(m_commands.front());
m_commands.pop_front();
return command;
}
bool IsComplete() const { return m_commands.empty(); }

private:
OrchestratorQueueItemState m_state = OrchestratorQueueItemState::NotQueued;
std::unique_ptr<COMContext> m_context;
Expand All @@ -62,6 +64,8 @@ namespace AppInstaller::CLI::Execution
static std::unique_ptr<OrchestratorQueueItem> CreateItemForInstall(std::wstring packageId, std::wstring sourceId, std::unique_ptr<COMContext> context);
};

struct OrchestratorQueue;

struct ContextOrchestrator
{
ContextOrchestrator();
Expand All @@ -72,20 +76,64 @@ namespace AppInstaller::CLI::Execution

std::shared_ptr<OrchestratorQueueItem> GetQueueItem(const OrchestratorQueueItemId& queueItemId);

void RequeueItem(std::shared_ptr<OrchestratorQueueItem> queueItem);
void AddItemManifestToInstallingSource(const OrchestratorQueueItem& queueItem);
void RemoveItemManifestFromInstallingSource(const OrchestratorQueueItem& queueItem);

private:
std::mutex m_queueLock;
void RunItems();
std::shared_ptr<OrchestratorQueueItem> GetNextItem();
void EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item);
void RequeueItem(OrchestratorQueueItem& item);
void AddCommandQueue(std::string_view commandName, UINT32 allowedThreads);
void RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state);

_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator FindIteratorById(const OrchestratorQueueItemId& queueItemId);
_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> FindById(const OrchestratorQueueItemId& queueItemId);

std::shared_ptr<::AppInstaller::Repository::IMutablePackageSource> m_installingWriteableSource = nullptr;
std::map<std::string, std::unique_ptr<OrchestratorQueue>> m_commandQueues;
};

// One of the queues used by the orchestrator.
// All items in the queue execute the same command.
// The queue allows multiple items to run at the same time, up to a limit.
struct OrchestratorQueue
{
OrchestratorQueue(std::string_view commandName, UINT32 allowedThreads) : m_commandName(commandName), m_allowedThreads(allowedThreads) {}

// Name of the command this queue can execute
std::string_view CommandName() const { return m_commandName; }

// Enqueues an item. If the queue has capacity, immediately starts running it.
void EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item, bool isFirstCommand);

// Removes an item by id, provided that it is in the given state.
// Returns true if an item was removed.
// The item can be removed globally from the orchestrator, or from just this queue.
bool RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state, bool isGlobalRemove);

// Finds an item by id, if it is in the queue.
_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> FindById(const OrchestratorQueueItemId& queueItemId);

private:
// Enqueues an item.
void EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item, bool isFirstCommand);

// Runs items while there are more in the queue.
void RunItems();

// Gets the next item to run.
std::shared_ptr<OrchestratorQueueItem> GetNextItem();

_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator FindIteratorById(const OrchestratorQueueItemId& comparisonQueueItemId);

std::string_view m_commandName;

std::mutex m_threadsLock;
const UINT32 m_allowedThreads;
UINT32 m_runningThreads = 0;

std::mutex m_queueLock;
std::deque<std::shared_ptr<OrchestratorQueueItem>> m_queueItems;
};
}
Loading