Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1198 from advancedtelematic/refact/OTA-2487/ipsec…
Browse files Browse the repository at this point in the history
…ondaries-initialization

Refact/ota 2487/ipsecondaries initialization
  • Loading branch information
pattivacek authored May 6, 2019
2 parents 4f4818d + a7b825d commit cf9c980
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 75 deletions.
132 changes: 118 additions & 14 deletions src/aktualizr_primary/secondary.cc
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>

#include <unordered_map>
#include <unordered_set>

#include "ipuptanesecondary.h"
#include "secondary.h"
#include "secondary_config.h"

#include "ipuptanesecondary.h"

namespace Primary {

using SecondaryFactoryRegistry =
std::unordered_map<std::string, std::function<std::shared_ptr<Uptane::SecondaryInterface>(const SecondaryConfig&)>>;
using Secondaries = std::vector<std::shared_ptr<Uptane::SecondaryInterface>>;
using SecondaryFactoryRegistry = std::unordered_map<std::string, std::function<Secondaries(const SecondaryConfig&)>>;

static Secondaries createIPSecondaries(const IPSecondariesConfig& config);

static SecondaryFactoryRegistry sec_factory_registry = {
{IPSecondaryConfig::Type,
{IPSecondariesConfig::Type,
[](const SecondaryConfig& config) {
auto ip_sec_cgf = dynamic_cast<const IPSecondaryConfig&>(config);
return Uptane::IpUptaneSecondary::create(ip_sec_cgf.ip, ip_sec_cgf.port);
auto ip_sec_cgf = dynamic_cast<const IPSecondariesConfig&>(config);
return createIPSecondaries(ip_sec_cgf);
}},
// {
// Add another secondary factory here
// }
};

static std::shared_ptr<Uptane::SecondaryInterface> createSecondary(const SecondaryConfig& config) {
static Secondaries createSecondaries(const SecondaryConfig& config) {
return (sec_factory_registry.at(config.type()))(config);
}

Expand All @@ -34,18 +42,114 @@ void initSecondaries(Aktualizr& aktualizr, const boost::filesystem::path& config

for (auto& config : secondary_configs) {
try {
LOG_INFO << "Creating Secondary of type: " << config->type();
std::shared_ptr<Uptane::SecondaryInterface> secondary = createSecondary(*config);

LOG_INFO << "Adding Secondary to Aktualizr."
<< "HW_ID: " << secondary->getHwId() << " Serial: " << secondary->getSerial();
aktualizr.AddSecondary(secondary);
LOG_INFO << "Creating " << config->type() << " secondaries...";
Secondaries secondaries = createSecondaries(*config);

for (const auto& secondary : secondaries) {
LOG_INFO << "Adding Secondary to Aktualizr."
<< "HW_ID: " << secondary->getHwId() << " Serial: " << secondary->getSerial();
aktualizr.AddSecondary(secondary);
}
} catch (const std::exception& exc) {
LOG_ERROR << "Failed to initialize a secondary: " << exc.what();
LOG_ERROR << "Continue with initialization of the remaining secondaries, if any left.";
// otherwise rethrow the exception
}
}
}

class SecondaryWaiter {
public:
SecondaryWaiter(uint16_t wait_port, size_t wait_timeout, Secondaries& secondaries)
: endpoint_{boost::asio::ip::tcp::v4(), wait_port},
timeout_{static_cast<boost::posix_time::seconds>(wait_timeout)},
timer_{io_context_},
connected_secondaries_(secondaries) {}

void addSecoondary(const std::string& ip, uint16_t port) { secondaries_to_wait_for_.insert(key(ip, port)); }

void wait() {
if (secondaries_to_wait_for_.empty()) {
return;
}

timer_.expires_from_now(timeout_);
timer_.async_wait([&](const boost::system::error_code& error_code) {
if (error_code) {
LOG_ERROR << "Wait for secondaries has failed: " << error_code;
} else {
LOG_ERROR << "Timeout while waiting for secondaries: " << error_code;
}
io_context_.stop();
});
accept();
io_context_.run();
}

private:
void accept() {
LOG_INFO << "Waiting for connection from " << secondaries_to_wait_for_.size() << " secondaries...";
acceptor_.async_accept(con_socket_,
boost::bind(&SecondaryWaiter::connectionHdlr, this, boost::asio::placeholders::error));
}

void connectionHdlr(const boost::system::error_code& error_code) {
if (!error_code) {
auto sec_ip = con_socket_.remote_endpoint().address().to_string();
auto sec_port = con_socket_.remote_endpoint().port();

LOG_INFO << "Accepted connection from a secondary: (" << sec_ip << ":" << sec_port << ")";
try {
auto sec_creation_res = Uptane::IpUptaneSecondary::create(sec_ip, sec_port, con_socket_.native_handle());
if (sec_creation_res.first) {
connected_secondaries_.push_back(sec_creation_res.second);
}
} catch (const std::exception& exc) {
LOG_ERROR << "Failed to initialize a secondary: " << exc.what();
}
con_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
con_socket_.close();

secondaries_to_wait_for_.erase(key(sec_ip, sec_port));
if (!secondaries_to_wait_for_.empty()) {
accept();
} else {
io_context_.stop();
}
} else {
LOG_ERROR << "Failed to accept connection from a secondary";
}
}

static std::string key(const std::string& ip, uint16_t port) { return (ip + std::to_string(port)); }

private:
boost::asio::io_service io_context_;
boost::asio::ip::tcp::endpoint endpoint_;
boost::asio::ip::tcp::acceptor acceptor_{io_context_, endpoint_};
boost::asio::ip::tcp::socket con_socket_{io_context_};
boost::posix_time::seconds timeout_;
boost::asio::deadline_timer timer_;

Secondaries& connected_secondaries_;
std::unordered_set<std::string> secondaries_to_wait_for_;
};

static Secondaries createIPSecondaries(const IPSecondariesConfig& config) {
Secondaries result;
SecondaryWaiter sec_waiter{config.secondaries_wait_port, config.secondaries_wait_timeout, result};

for (auto& ip_sec_cfg : config.secondaries_cfg) {
auto sec_creation_res = Uptane::IpUptaneSecondary::connectAndCreate(ip_sec_cfg.ip, ip_sec_cfg.port);
if (sec_creation_res.first) {
result.push_back(sec_creation_res.second);
} else {
sec_waiter.addSecoondary(ip_sec_cfg.ip, ip_sec_cfg.port);
}
}

sec_waiter.wait();
return result;
}

} // namespace Primary
46 changes: 33 additions & 13 deletions src/aktualizr_primary/secondary_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Primary {

const char* const IPSecondaryConfig::Type = "ip";
const char* const IPSecondariesConfig::Type = "IP";

SecondaryConfigParser::Configs SecondaryConfigParser::parse_config_file(const boost::filesystem::path& config_file) {
if (!boost::filesystem::exists(config_file)) {
Expand All @@ -30,16 +30,24 @@ SecondaryConfigParser::Configs SecondaryConfigParser::parse_config_file(const bo
config file example
{
"ip": [
{"addr": "127.0.0.1:9031"},
{"addr": "127.0.0.1:9032"}
],
"socketcan": [
{"key": "value", "key1": "value1"},
{"key": "value", "key1": "value1"}
]
"IP": {
"secondaries_wait_port": 9040,
"secondaries_wait_timeout": 20,
"secondaries": [
{"addr": "127.0.0.1:9031"}
{"addr": "127.0.0.1:9032"}
]
},
"socketcan": {
"common-key": "common-value",
"secondaries": [
{"key": "value", "key1": "value1"},
{"key": "value", "key1": "value1"}
]
}
}
*/

JsonConfigParser::JsonConfigParser(const boost::filesystem::path& config_file) {
Expand Down Expand Up @@ -81,11 +89,23 @@ static std::pair<std::string, uint16_t> getIPAndPort(const std::string& addr) {
return std::make_pair(ip, port);
}

void JsonConfigParser::createIPSecondaryConfig(Configs& configs, Json::Value& ip_sec_cfgs) {
for (const auto& ip_sec_cfg : ip_sec_cfgs) {
auto addr = getIPAndPort(ip_sec_cfg[IPSecondaryConfig::AddrField].asString());
configs.emplace_back(std::make_shared<IPSecondaryConfig>(addr.first, addr.second));
void JsonConfigParser::createIPSecondariesCfg(Configs& configs, Json::Value& json_ip_sec_cfg) {
auto resultant_cfg = std::make_shared<IPSecondariesConfig>(
static_cast<uint16_t>(json_ip_sec_cfg[IPSecondariesConfig::PortField].asUInt()),
json_ip_sec_cfg[IPSecondariesConfig::TimeoutField].asUInt());
auto secondaries = json_ip_sec_cfg[IPSecondariesConfig::SecondariesField];

LOG_INFO << "Found IP secondaries config: " << *resultant_cfg;

for (const auto& secondary : secondaries) {
auto addr = getIPAndPort(secondary[IPSecondaryConfig::AddrField].asString());
IPSecondaryConfig sec_cfg{addr.first, addr.second};

LOG_INFO << " found IP secondary config: " << sec_cfg;
resultant_cfg->secondaries_cfg.push_back(sec_cfg);
}

configs.push_back(resultant_cfg);
}

} // namespace Primary
36 changes: 30 additions & 6 deletions src/aktualizr_primary/secondary_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,43 @@ class SecondaryConfig {
const char* const type_;
};

class IPSecondaryConfig : public SecondaryConfig {
class IPSecondaryConfig {
public:
static const char* const Type;
static constexpr const char* const AddrField{"addr"};

IPSecondaryConfig(std::string addr_ip, uint16_t addr_port)
: SecondaryConfig(Type), ip(std::move(addr_ip)), port(addr_port) {}
IPSecondaryConfig(std::string addr_ip, uint16_t addr_port) : ip(std::move(addr_ip)), port(addr_port) {}

friend std::ostream& operator<<(std::ostream& os, const IPSecondaryConfig& cfg) {
os << "(addr: " << cfg.ip << ":" << cfg.port << ")";
return os;
}

public:
const std::string ip;
const uint16_t port;
};

class IPSecondariesConfig : public SecondaryConfig {
public:
static const char* const Type;
static constexpr const char* const PortField{"secondaries_wait_port"};
static constexpr const char* const TimeoutField{"secondaries_wait_timeout"};
static constexpr const char* const SecondariesField{"secondaries"};

IPSecondariesConfig(uint16_t wait_port, size_t wait_timeout)
: SecondaryConfig(Type), secondaries_wait_port{wait_port}, secondaries_wait_timeout{wait_timeout} {}

friend std::ostream& operator<<(std::ostream& os, const IPSecondariesConfig& cfg) {
os << "(wait_port: " << cfg.secondaries_wait_port << " wait_timeout: " << cfg.secondaries_wait_timeout << ")";
return os;
}

public:
const uint16_t secondaries_wait_port;
const size_t secondaries_wait_timeout;
std::vector<IPSecondaryConfig> secondaries_cfg;
};

class SecondaryConfigParser {
public:
using Configs = std::vector<std::shared_ptr<SecondaryConfig>>;
Expand All @@ -48,14 +72,14 @@ class JsonConfigParser : public SecondaryConfigParser {
Configs parse() override;

private:
static void createIPSecondaryConfig(Configs& configs, Json::Value& ip_sec_cfgs);
static void createIPSecondariesCfg(Configs& configs, Json::Value& json_ip_sec_cfg);
// add here a factory method for another type of secondary config

private:
using SecondaryConfigFactoryRegistry = std::unordered_map<std::string, std::function<void(Configs&, Json::Value&)>>;

SecondaryConfigFactoryRegistry sec_cfg_factory_registry_ = {
{IPSecondaryConfig::Type, createIPSecondaryConfig}
{IPSecondariesConfig::Type, createIPSecondariesCfg}
// add here factory method for another type of secondary config
};

Expand Down
21 changes: 20 additions & 1 deletion src/aktualizr_secondary/aktualizr_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ AktualizrSecondary::AktualizrSecondary(const AktualizrSecondaryConfig& config,
}
}

void AktualizrSecondary::run() { socket_server_.Run(); }
void AktualizrSecondary::run() {
connectToPrimary();
socket_server_.Run();
}

void AktualizrSecondary::stop() { /* TODO? */
}
Expand Down Expand Up @@ -185,3 +188,19 @@ void AktualizrSecondary::extractCredentialsArchive(const std::string& archive, s
*treehub_server = Utils::readFileFromArchive(as, "server.url", true);
}
}

void AktualizrSecondary::connectToPrimary() {
Socket socket(config_.network.primary_ip, config_.network.primary_port);

if (socket.bind(config_.network.port) != 0) {
LOG_ERROR << "Failed to bind a connection socket to the secondary's port";
return;
}

if (socket.connect() == 0) {
LOG_INFO << "Connected to Primary, sending info about this secondary...";
socket_server_.HandleOneConnection(socket.getFD());
} else {
LOG_INFO << "Failed to connect to Primary";
}
}
3 changes: 3 additions & 0 deletions src/aktualizr_secondary/aktualizr_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class AktualizrSecondary : public AktualizrSecondaryInterface, private Aktualizr
static void extractCredentialsArchive(const std::string& archive, std::string* ca, std::string* cert,
std::string* pkey, std::string* treehub_server);

private:
void connectToPrimary();

private:
SocketServer socket_server_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/aktualizr_secondary/aktualizr_secondary_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

void AktualizrSecondaryNetConfig::updateFromPropertyTree(const boost::property_tree::ptree& pt) {
CopyFromConfig(port, "port", pt);
CopyFromConfig(primary_ip, "primary_ip", pt);
CopyFromConfig(primary_port, "primary_port", pt);
}

void AktualizrSecondaryNetConfig::writeToStream(std::ostream& out_stream) const {
writeOption(out_stream, port, "port");
writeOption(out_stream, primary_ip, "primary_ip");
writeOption(out_stream, primary_port, "primary_port");
}

void AktualizrSecondaryUptaneConfig::updateFromPropertyTree(const boost::property_tree::ptree& pt) {
Expand Down
2 changes: 2 additions & 0 deletions src/aktualizr_secondary/aktualizr_secondary_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

struct AktualizrSecondaryNetConfig {
in_port_t port{9030};
std::string primary_ip;
in_port_t primary_port{9030};

void updateFromPropertyTree(const boost::property_tree::ptree& pt);
void writeToStream(std::ostream& out_stream) const;
Expand Down
10 changes: 5 additions & 5 deletions src/aktualizr_secondary/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
#include <sys/types.h>

void SocketServer::Run() {
if (listen(*socket_, SOMAXCONN) < 0) {
throw std::system_error(errno, std::system_category(), "listen");
}
LOG_INFO << "Listening on " << Utils::ipGetSockaddr(*socket_);

while (true) {
int con_fd;
sockaddr_storage peer_sa{};
Expand Down Expand Up @@ -178,10 +183,5 @@ SocketHandle SocketFromSystemdOrPort(in_port_t port) {
throw std::system_error(errno, std::system_category(), "bind");
}

if (listen(*hdl, SOMAXCONN) < 0) {
throw std::system_error(errno, std::system_category(), "listen");
}

LOG_INFO << "Listening on " << Utils::ipGetSockaddr(*hdl);
return hdl;
}
Loading

0 comments on commit cf9c980

Please sign in to comment.