Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Serialize downloads #1031

Merged
merged 6 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/libaktualizr/package_manager/debianmanager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ TEST(PackageManagerFactory, Debian_Install_Good) {
storage->savePrimaryInstalledVersion(target_test, InstalledVersionUpdateMode::kCurrent);
std::unique_ptr<StorageTargetWHandle> fhandle = storage->allocateTargetFile(false, target);
std::stringstream("ab") >> *fhandle;
fhandle->wcommit();

EXPECT_EQ(pacman->install(target).first, data::UpdateResultCode::kOk);
EXPECT_EQ(pacman->getCurrent(), target);
Expand All @@ -54,6 +55,7 @@ TEST(PackageManagerFactory, Debian_Install_Bad) {

std::unique_ptr<StorageTargetWHandle> fhandle = storage->allocateTargetFile(false, target);
std::stringstream("ab") >> *fhandle;
fhandle->wcommit();

auto result = pacman->install(target);
EXPECT_EQ(result.first, data::UpdateResultCode::kInstallFailed);
Expand Down
11 changes: 5 additions & 6 deletions src/libaktualizr/primary/sotauptaneclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,6 @@ DownloadResult SotaUptaneClient::downloadImages(const std::vector<Uptane::Target
// Uptane step 4 - download all the images and verify them against the metadata (for OSTree - pull without
// deploying)
std::lock_guard<std::mutex> guard(download_mutex);
std::vector<std::future<std::pair<bool, Uptane::Target>>> download_futures;
DownloadResult result;
std::vector<Uptane::Target> downloaded_targets;
for (auto it = targets.cbegin(); it != targets.cend(); ++it) {
Expand All @@ -729,14 +728,14 @@ DownloadResult SotaUptaneClient::downloadImages(const std::vector<Uptane::Target
sendEvent<event::AllDownloadsComplete>(result);
return result;
}
download_futures.push_back(std::async(std::launch::async, &SotaUptaneClient::downloadImage, this, *it));
}
for (auto &f : download_futures) {
auto fut_result = f.get();
if (fut_result.first) {
downloaded_targets.push_back(fut_result.second);
for (auto it = targets.cbegin(); it != targets.cend(); ++it) {
auto res = downloadImage(*it);
if (res.first) {
downloaded_targets.push_back(res.second);
}
}

if (!targets.empty()) {
if (targets.size() == downloaded_targets.size()) {
result = DownloadResult(downloaded_targets, DownloadStatus::kSuccess, "");
Expand Down
65 changes: 29 additions & 36 deletions src/libaktualizr/storage/sqlstorage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include "logging/logging.h"
#include "sql_utils.h"
#include "utilities/utils.h"
std::mutex sql_mutex;
boost::filesystem::path SQLStorage::dbPath() const { return config_.sqldb_path.get(config_.path); }

// find metadata with version set to -1 (e.g. after migration) and assign proper version to it
Expand Down Expand Up @@ -844,7 +843,6 @@ void SQLStorage::clearMisconfiguredEcus() {

void SQLStorage::saveInstalledVersion(const std::string& ecu_serial, const Uptane::Target& target,
InstalledVersionUpdateMode update_mode) {
std::lock_guard<std::mutex> lock(sql_mutex);
SQLite3Guard db = dbConnection();

if (!db.beginTransaction()) {
Expand Down Expand Up @@ -900,7 +898,6 @@ void SQLStorage::saveInstalledVersion(const std::string& ecu_serial, const Uptan

bool SQLStorage::loadInstalledVersions(const std::string& ecu_serial, std::vector<Uptane::Target>* installed_versions,
size_t* current_version, size_t* pending_version) {
std::lock_guard<std::mutex> lock(sql_mutex);
SQLite3Guard db = dbConnection();

// empty serial: use primary
Expand Down Expand Up @@ -1053,7 +1050,6 @@ void SQLStorage::clearInstallationResult() {
}

boost::optional<std::pair<int64_t, size_t>> SQLStorage::checkTargetFile(const Uptane::Target& target) const {
std::lock_guard<std::mutex> lock(sql_mutex);
SQLite3Guard db = dbConnection();

auto statement = db.prepareStatement<std::string>(
Expand Down Expand Up @@ -1101,9 +1097,8 @@ boost::optional<std::pair<int64_t, size_t>> SQLStorage::checkTargetFile(const Up
class SQLTargetWHandle : public StorageTargetWHandle {
public:
SQLTargetWHandle(const SQLStorage& storage, Uptane::Target target)
: db_(storage.dbPath()), target_(std::move(target)), closed_(false), blob_(nullptr), row_id_(0) {
: db_(storage.dbPath()), target_(std::move(target)), closed_(false), row_id_(0) {
StorageTargetWHandle::WriteError exc("could not save file " + target_.filename() + " to sql storage");
std::lock_guard<std::mutex> lock(sql_mutex);
if (!db_.beginTransaction()) {
throw exc;
}
Expand All @@ -1127,28 +1122,27 @@ class SQLTargetWHandle : public StorageTargetWHandle {
}

row_id_ = sqlite3_last_insert_rowid(db_.get());
db_.commitTransaction();

if (!db_.commitTransaction()) {
throw exc;
}
}

~SQLTargetWHandle() override {
if (!closed_) {
LOG_WARNING << "Handle for file " << target_.filename() << " has not been committed or aborted, forcing abort";
SQLTargetWHandle::wabort();
SQLTargetWHandle::wcommit();
}
}

size_t wfeed(const uint8_t* buf, size_t size) override {
std::lock_guard<std::mutex> lock(sql_mutex);
StorageTargetWHandle::WriteError exc("could not save file " + target_.filename() + " to sql storage");

if (!db_.beginTransaction()) {
throw exc;
}

if (sqlite3_blob_open(db_.get(), "main", "target_images", "image_data", row_id_, 1, &blob_) != SQLITE_OK) {
LOG_ERROR << "Could not open blob " << db_.errmsg();
wabort();
throw exc;
if (blob_ == nullptr) {
if (sqlite3_blob_open(db_.get(), "main", "target_images", "image_data", row_id_, 1, &blob_) != SQLITE_OK) {
LOG_ERROR << "Could not open blob " << db_.errmsg();
wabort();
throw exc;
}
}

if (sqlite3_blob_write(blob_, buf, static_cast<int>(size), static_cast<int>(written_size_)) != SQLITE_OK) {
Expand All @@ -1158,28 +1152,27 @@ class SQLTargetWHandle : public StorageTargetWHandle {
}
written_size_ += size;

auto statement = db_.prepareStatement<int64_t, int64_t>("update target_images SET real_size = ? where rowid = ?;",
static_cast<int64_t>(written_size_), row_id_);

int err = statement.step();
if (err != SQLITE_DONE) {
LOG_ERROR << "Could not save size in db: " << db_.errmsg();
throw exc;
}

wcommit();
db_.commitTransaction();
return size;
}

void wcommit() override {
if (blob_ != nullptr) {
sqlite3_blob_close(blob_);
blob_ = nullptr;
auto statement = db_.prepareStatement<int64_t, int64_t>("UPDATE target_images SET real_size = ? WHERE rowid = ?;",
static_cast<int64_t>(written_size_), row_id_);

int err = statement.step();
if (err != SQLITE_DONE) {
LOG_ERROR << "Could not save size in db: " << db_.errmsg();
throw StorageTargetWHandle::WriteError("could not update size of " + target_.filename() + " in sql storage");
}
}

closed_ = true;
sqlite3_blob_close(blob_);
blob_ = nullptr;
}

void wabort() noexcept override {
closed_ = true;
if (blob_ != nullptr) {
sqlite3_blob_close(blob_);
blob_ = nullptr;
Expand All @@ -1189,15 +1182,15 @@ class SQLTargetWHandle : public StorageTargetWHandle {
db_.prepareStatement<std::string>("DELETE FROM target_images WHERE filename=?;", target_.filename());
statement.step();
}
closed_ = true;
}

friend class SQLTargetRHandle;

private:
SQLTargetWHandle(const boost::filesystem::path& db_path, Uptane::Target target, const sqlite3_int64& row_id,
const size_t& start_from = 0)
: db_(db_path), target_(std::move(target)), closed_(false), blob_(nullptr), row_id_(row_id) {
std::lock_guard<std::mutex> lock(sql_mutex);

: db_(db_path), target_(std::move(target)), closed_(false), row_id_(row_id) {
if (db_.get_rc() != SQLITE_OK) {
LOG_ERROR << "Can't open database: " << db_.errmsg();
throw StorageTargetWHandle::WriteError("could not save file " + target_.filename() + " to sql storage");
Expand All @@ -1208,8 +1201,8 @@ class SQLTargetWHandle : public StorageTargetWHandle {
SQLite3Guard db_;
Uptane::Target target_;
bool closed_;
sqlite3_blob* blob_;
sqlite3_int64 row_id_;
sqlite3_blob* blob_{nullptr};
};

std::unique_ptr<StorageTargetWHandle> SQLStorage::allocateTargetFile(bool from_director, const Uptane::Target& target) {
Expand Down
25 changes: 21 additions & 4 deletions src/libaktualizr/storage/storage_common_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ TEST(storage, partial) {

Json::Value target_json;
target_json["hashes"]["sha256"] = "hash1";
target_json["length"] = 2;
target_json["length"] = 3;
Uptane::Target target("some.deb", target_json);

// write partial target
Expand All @@ -519,7 +519,6 @@ TEST(storage, partial) {
}

// read and check partial target

{
std::unique_ptr<StorageTargetRHandle> rhandle = storage->openTargetFile(target);
uint8_t rb[2] = {0};
Expand All @@ -530,18 +529,36 @@ TEST(storage, partial) {
EXPECT_STREQ(reinterpret_cast<char *>(rb), "a");
}

// Append partial
// Append without committing, should commit in whandle destructor
{
std::unique_ptr<StorageTargetWHandle> whandle = storage->openTargetFile(target)->toWriteHandle();
const uint8_t wb[] = "b";
whandle->wfeed(wb, 1);
}

// read and check partial target
{
std::unique_ptr<StorageTargetRHandle> rhandle = storage->openTargetFile(target);
uint8_t rb[3] = {0};
EXPECT_EQ(rhandle->rsize(), 2);
EXPECT_TRUE(rhandle->isPartial());
rhandle->rread(rb, 2);
rhandle->rclose();
EXPECT_STREQ(reinterpret_cast<char *>(rb), "ab");
}

// Append partial
{
std::unique_ptr<StorageTargetWHandle> whandle = storage->openTargetFile(target)->toWriteHandle();
const uint8_t wb[] = "c";
whandle->wfeed(wb, 1);
whandle->wcommit();
}

// Check full target
{
std::unique_ptr<StorageTargetRHandle> rhandle = storage->openTargetFile(target);
EXPECT_EQ(rhandle->rsize(), 2);
EXPECT_EQ(rhandle->rsize(), 3);
EXPECT_FALSE(rhandle->isPartial());
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/libaktualizr/uptane/fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,25 @@ bool Fetcher::fetchVerifyTarget(const Target& target) {
throw Exception("image", "Could not download file, error: " + response.error_message);
}
if (pause_) {
std::lock_guard<std::mutex> lock(*pause_mutex_);
// entering pause, save the file and free the handler
ds.fhandle->wcommit();
fhandle.reset();

std::lock_guard<std::mutex> lock(*pause_mutex_); // waiting on this mutex while paused

// exiting pause, restore the file context
auto target_handle = storage->openTargetFile(target);
fhandle = target_handle->toWriteHandle();
ds.fhandle = fhandle.get();
retry = true;
}
} while (retry);

if (!target.MatchWith(Hash(ds.hash_type, ds.hasher().getHexDigest()))) {
ds.fhandle->wabort();
throw TargetHashMismatch(target.filename());
}
ds.fhandle->wcommit();
result = true;
} else {
#ifdef BUILD_OSTREE
Expand Down