Skip to content

Commit

Permalink
Implement parallel downloads for COM scenarios (#1588)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chacón authored Nov 17, 2021
1 parent 7d178a6 commit 39034e1
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ Testrun
testsettingname
TEXTFORMAT
TEXTINCLUDE
there're
Threadpool
Timeline
todo
tokenizer
Expand Down
6 changes: 4 additions & 2 deletions src/AppInstallerCLICore/Commands/COMInstallCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace AppInstaller::CLI
// IMPORTANT: To use this command, the caller should have already retrieved the package manifest (GetManifest()) and added it to the Context Data
struct COMDownloadCommand final : public Command
{
COMDownloadCommand(std::string_view parent) : Command("download", parent) {}
constexpr static std::string_view CommandName = "download"sv;
COMDownloadCommand(std::string_view parent) : Command(CommandName, parent) {}

protected:
void ExecuteInternal(Execution::Context& context) const override;
Expand All @@ -17,7 +18,8 @@ namespace AppInstaller::CLI
// IMPORTANT: To use this command, the caller should have already retrieved the package manifest (GetManifest()) and added it to the Context Data
struct COMInstallCommand final : public Command
{
COMInstallCommand(std::string_view parent) : Command("install", parent) {}
constexpr static std::string_view CommandName = "install"sv;
COMInstallCommand(std::string_view parent) : Command(CommandName, parent) {}

protected:
void ExecuteInternal(Execution::Context& context) const override;
Expand Down
255 changes: 187 additions & 68 deletions src/AppInstallerCLICore/ContextOrchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@

namespace AppInstaller::CLI::Execution
{
namespace
{
// Callback function used by worker threads in the queue.
// context must be a pointer to a queue item.
void CALLBACK OrchestratorQueueWorkCallback(PTP_CALLBACK_INSTANCE, PVOID context, PTP_WORK)
{
auto queueItem = reinterpret_cast<OrchestratorQueueItem*>(context);
auto queue = queueItem->GetCurrentQueue();
if (queue)
{
queue->RunItem(queueItem->GetId());
}
}
}

ContextOrchestrator& ContextOrchestrator::Instance()
{
static ContextOrchestrator s_instance;
Expand All @@ -21,90 +36,195 @@ namespace AppInstaller::CLI::Execution
ProgressCallback progress;
m_installingWriteableSource = Repository::Source(Repository::PredefinedSource::Installing);
m_installingWriteableSource.Open(progress);

// Decide how many threads to use for each command.
// We always allow only one install at a time.
// For download, if we can find the number of supported concurrent threads,
// use that as the maximum (up to 3); otherwise use a single thread.
const auto supportedConcurrentThreads = std::thread::hardware_concurrency();
const UINT32 maxDownloadThreads = 3;
const UINT32 installThreads = 1;
const UINT32 downloadThreads = std::min(supportedConcurrentThreads ? supportedConcurrentThreads - 1 : 1, maxDownloadThreads);

AddCommandQueue(COMDownloadCommand::CommandName, downloadThreads);
AddCommandQueue(COMInstallCommand::CommandName, installThreads);
}

_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator ContextOrchestrator::FindIteratorById(const OrchestratorQueueItemId& comparisonQueueItemId)
void ContextOrchestrator::AddCommandQueue(std::string_view commandName, UINT32 allowedThreads)
{
return std::find_if(m_queueItems.begin(), m_queueItems.end(), [&comparisonQueueItemId](const std::shared_ptr<OrchestratorQueueItem>& item) {return (item->GetId().IsSame(comparisonQueueItemId)); });

m_commandQueues.emplace(commandName, std::make_unique<OrchestratorQueue>(commandName, allowedThreads));
}

_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::FindById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
auto itr = FindIteratorById(comparisonQueueItemId);
if (itr != m_queueItems.end())
for (const auto& queue : m_commandQueues)
{
return *itr;
auto item = queue.second->FindById(comparisonQueueItemId);
if (item)
{
return item;
}
}

return {};
}
void ContextOrchestrator::EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item)

void ContextOrchestrator::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item)
{
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
std::lock_guard<std::mutex> lockQueue{ m_queueLock };

if (item->IsOnFirstCommand())
{
THROW_HR_IF(HRESULT_FROM_WIN32(ERROR_INSTALL_ALREADY_RUNNING), FindById(item->GetId()));
m_queueItems.push_back(item);
}

// Add the package to the Installing source so that it can be queried using the Source interface.
const auto& manifest = item->GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource.AddPackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
m_commandQueues.at(std::string(item->GetNextCommand().Name()))->EnqueueAndRunItem(item);
}

void ContextOrchestrator::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
for (const auto& queue : m_commandQueues)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
item->SetState(OrchestratorQueueItemState::Queued);
if (queue.second->RemoveItemInState(item, state, true))
{
return;
}
}
}

void ContextOrchestrator::RequeueItem(OrchestratorQueueItem& item)
void ContextOrchestrator::CancelQueueItem(const OrchestratorQueueItem& item)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
// Always cancel the item, even if it isn't running yet, to get the terminationHR set correctly.
item.GetContext().Cancel(false, true);

item.SetState(OrchestratorQueueItemState::Queued);
RemoveItemInState(item, OrchestratorQueueItemState::Queued);
}

void ContextOrchestrator::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item)
std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetQueueItem(const OrchestratorQueueItemId& queueItemId)
{
EnqueueItem(item);
std::lock_guard<std::mutex> lock{ m_queueLock };

std::thread runnerThread(&ContextOrchestrator::RunItems, this);
runnerThread.detach();
return FindById(queueItemId);
}

std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetNextItem()
void ContextOrchestrator::AddItemManifestToInstallingSource(const OrchestratorQueueItem& queueItem)
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
const auto& manifest = queueItem.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource.AddPackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
}

void ContextOrchestrator::RemoveItemManifestFromInstallingSource(const OrchestratorQueueItem& queueItem)
{
const auto& manifest = queueItem.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource.RemovePackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });
}

if (m_queueItems.empty())
_Requires_lock_held_(m_queueLock)
std::deque<std::shared_ptr<OrchestratorQueueItem>>::iterator OrchestratorQueue::FindIteratorById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
return std::find_if(m_queueItems.begin(), m_queueItems.end(), [&comparisonQueueItemId](const std::shared_ptr<OrchestratorQueueItem>& item) {return (item->GetId().IsSame(comparisonQueueItemId)); });
}

_Requires_lock_held_(m_queueLock)
std::shared_ptr<OrchestratorQueueItem> OrchestratorQueue::FindById(const OrchestratorQueueItemId& comparisonQueueItemId)
{
auto itr = FindIteratorById(comparisonQueueItemId);
if (itr != m_queueItems.end())
{
return {};
return *itr;
}

std::shared_ptr<OrchestratorQueueItem> item = m_queueItems.front();
return {};
}

void OrchestratorQueue::EnqueueItem(std::shared_ptr<OrchestratorQueueItem> item)
{
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
m_queueItems.push_back(item);
}

// Check if item can be dequeued.
// Since only one item can be installed at a time currently the logic is very simple,
// and can just check if the first item is ready to run. This logic will need to become
// more complicated if multiple operation types (e.g. Download & Install) are added that can
// run simultaneously.
if (item->GetState() != OrchestratorQueueItemState::Queued)
// Add the package to the Installing source so that it can be queried using the Source interface.
// Only do this the first time the item is queued.
if (item->IsOnFirstCommand())
{
return {};
ContextOrchestrator::Instance().AddItemManifestToInstallingSource(*item);
}

// Running state must be set inside the queueLock so that multiple threads don't try to run the same item.
item->SetState(OrchestratorQueueItemState::Running);
return item;
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
item->SetState(OrchestratorQueueItemState::Queued);
}
}

OrchestratorQueue::OrchestratorQueue(std::string_view commandName, UINT32 allowedThreads) :
m_commandName(commandName), m_allowedThreads(allowedThreads)
{
m_threadPool.reset(CreateThreadpool(nullptr));
THROW_LAST_ERROR_IF_NULL(m_threadPool);
m_threadPoolCleanupGroup.reset(CreateThreadpoolCleanupGroup());
THROW_LAST_ERROR_IF_NULL(m_threadPoolCleanupGroup);
InitializeThreadpoolEnvironment(&m_threadPoolCallbackEnviron);
SetThreadpoolCallbackPool(&m_threadPoolCallbackEnviron, m_threadPool.get());
SetThreadpoolCallbackCleanupGroup(&m_threadPoolCallbackEnviron, m_threadPoolCleanupGroup.get(), nullptr);

THROW_LAST_ERROR_IF(!SetThreadpoolThreadMinimum(m_threadPool.get(), 1));
SetThreadpoolThreadMaximum(m_threadPool.get(), m_allowedThreads);
}

OrchestratorQueue::~OrchestratorQueue()
{
CloseThreadpoolCleanupGroupMembers(m_threadPoolCleanupGroup.get(), false, nullptr);
}

void OrchestratorQueue::EnqueueAndRunItem(std::shared_ptr<OrchestratorQueueItem> item)
{
EnqueueItem(item);

item->SetCurrentQueue(this);
auto work = CreateThreadpoolWork(OrchestratorQueueWorkCallback, item.get(), &m_threadPoolCallbackEnviron);
SubmitThreadpoolWork(work);
}

void ContextOrchestrator::RunItems()
void OrchestratorQueue::RunItem(const OrchestratorQueueItemId& itemId)
{
std::shared_ptr<OrchestratorQueueItem> item = GetNextItem();
while(item != nullptr)
try
{
std::shared_ptr<OrchestratorQueueItem> item;
bool isCancelled = false;

// Try to find the item in the queue.
{
std::lock_guard<std::mutex> lockQueue{ m_queueLock };
item = FindById(itemId);

if (!item)
{
// Item should be in the queue; this shouldn't happen.
return;
}

// Only run if the item is queued and not cancelled.
if (item->GetState() == OrchestratorQueueItemState::Queued)
{
// Mark it as running so that it cannot be cancelled by other threads.
item->SetState(OrchestratorQueueItemState::Running);
}
else if (item->GetState() == OrchestratorQueueItemState::Cancelled)
{
isCancelled = true;
}
}

if (isCancelled)
{
// Do this separate from above block as the Remove function needs to manage the lock.
RemoveItemInState(*item, OrchestratorQueueItemState::Cancelled, true);
}

// Get the item's command and execute it.
HRESULT terminationHR = S_OK;
try
{
Expand Down Expand Up @@ -133,21 +253,24 @@ namespace AppInstaller::CLI::Execution

if (FAILED(terminationHR) || item->IsComplete())
{
RemoveItemInState(*item, OrchestratorQueueItemState::Running);
RemoveItemInState(*item, OrchestratorQueueItemState::Running, true);
}
else
{
RequeueItem(*item);
// Remove item from this queue and add it to the queue for the next command.
RemoveItemInState(*item, OrchestratorQueueItemState::Running, false);
ContextOrchestrator::Instance().EnqueueAndRunItem(item);
}

item = GetNextItem();
}
catch (...)
{
}
}

void ContextOrchestrator::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state)
bool OrchestratorQueue::RemoveItemInState(const OrchestratorQueueItem& item, OrchestratorQueueItemState state, bool isGlobalRemove)
{
// OrchestratorQueueItemState::Running items should only be removed by the thread that ran the item.
// Queued items can be removed by any thread.
// Queued items can be removed by any thread.
// NotQueued items should not be removed since, if found in the queue, they are in the process of being queued by another thread.
bool foundItem = false;

Expand All @@ -159,32 +282,29 @@ namespace AppInstaller::CLI::Execution
if (itr != m_queueItems.end() && (*itr)->GetState() == state)
{
foundItem = true;
m_queueItems.erase(itr);

// The item must only be removed from the queue by the thread that runs
// it, because the callback uses it. If any other thread tries to remove
// it, we simply mark it as cancelled.
if (state == OrchestratorQueueItemState::Running || state == OrchestratorQueueItemState::Cancelled)
{
(*itr)->SetCurrentQueue(nullptr);
m_queueItems.erase(itr);
}
else if (state == OrchestratorQueueItemState::Queued)
{
(*itr)->SetState(OrchestratorQueueItemState::Cancelled);
}
}
}

if (foundItem)
if (foundItem && isGlobalRemove)
{
const auto& manifest = item.GetContext().Get<Execution::Data::Manifest>();
m_installingWriteableSource.RemovePackageVersion(manifest, std::filesystem::path{ manifest.Id + '.' + manifest.Version });

ContextOrchestrator::Instance().RemoveItemManifestFromInstallingSource(item);
item.GetCompletedEvent().SetEvent();
}
}

void ContextOrchestrator::CancelQueueItem(const OrchestratorQueueItem& item)
{
// Always cancel the item, even if it isn't running yet, to get the terminationHR set correctly.
item.GetContext().Cancel(false, true);

RemoveItemInState(item, OrchestratorQueueItemState::Queued);
}

std::shared_ptr<OrchestratorQueueItem> ContextOrchestrator::GetQueueItem(const OrchestratorQueueItemId& queueItemId)
{
std::lock_guard<std::mutex> lock{ m_queueLock };

return FindById(queueItemId);
return foundItem;
}

bool OrchestratorQueueItemId::IsSame(const OrchestratorQueueItemId& comparedId) const
Expand All @@ -200,5 +320,4 @@ namespace AppInstaller::CLI::Execution
item->AddCommand(std::make_unique<::AppInstaller::CLI::COMInstallCommand>(RootCommand::CommandName));
return item;
}

}
Loading

0 comments on commit 39034e1

Please sign in to comment.