Skip to content

Commit

Permalink
Merge pull request #124 from hsel-netsys/protobuf-changes
Browse files Browse the repository at this point in the history
Extend protobuf interface with idle time
  • Loading branch information
JKRhb authored Feb 11, 2025
2 parents 660d1c1 + 5533407 commit 564a960
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ set(CMAKE_CXX_STANDARD 20)

option(USE_GRPC "Enable gRPC support" ON)

find_package(Boost)
find_package(Boost CONFIG)
find_package(PkgConfig)
find_package(OpenSSL REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
project(${CMAKE_PROJECT_NAME})

find_package(yaml-cpp REQUIRED)
find_package(Boost)
find_package(Boost CONFIG)
find_package(PkgConfig)
pkg_check_modules(NDN_CXX REQUIRED libndn-cxx)
pkg_check_modules(SVS REQUIRED libndn-svs)
Expand Down
8 changes: 7 additions & 1 deletion include/iceflow/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "congestion-reporter.hpp"
#include "stats.hpp"

namespace iceflow {

Expand All @@ -50,7 +51,7 @@ class IceflowConsumer {

std::vector<u_int32_t> getPartitions();

u_int32_t getConsumptionStats();
EdgeConsumptionStats getConsumptionStats();

private:
void validatePartitionConfiguration(uint32_t numberOfPartitions,
Expand Down Expand Up @@ -82,6 +83,9 @@ class IceflowConsumer {

void reportCongestion(CongestionReason congestionReason);

uint64_t determineIdleTime(
std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);

private:
const std::weak_ptr<ndn::svs::SVSPubSub> m_svsPubSub;
const std::string m_subTopic;
Expand All @@ -101,6 +105,8 @@ class IceflowConsumer {
std::optional<std::shared_ptr<CongestionReporter>> m_congestionReporter;

const std::string &m_upstreamEdgeName;

std::chrono::time_point<std::chrono::steady_clock> m_idleSince;
};
} // namespace iceflow

Expand Down
6 changes: 1 addition & 5 deletions include/iceflow/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define ICEFLOW_NODE_EXECUTOR_HPP

#include "iceflow.hpp"
#include "stats.hpp"

#include "node-executor.grpc.pb.h"
#include "node-instance.grpc.pb.h"
Expand All @@ -28,11 +29,6 @@

namespace iceflow {

struct EdgeStats {
std::optional<uint64_t> produced;
std::optional<uint64_t> consumed;
};

class IceflowExecutor : public std::enable_shared_from_this<IceflowExecutor> {
public:
IceflowExecutor(const std::string &serverAddress,
Expand Down
4 changes: 2 additions & 2 deletions include/iceflow/iceflow.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ class IceFlow {
void repartitionProducer(const std::string &downstreamEdgeName,
uint64_t numberOfPartitions);

std::unordered_map<std::string, uint32_t> getConsumerStats();
std::unordered_map<std::string, EdgeConsumptionStats> getConsumerStats();

std::unordered_map<std::string, uint32_t> getProducerStats();
std::unordered_map<std::string, EdgeProductionStats> getProducerStats();

void reportCongestion(const std::string &edgeName,
CongestionReason congestionReason);
Expand Down
8 changes: 7 additions & 1 deletion include/iceflow/producer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#endif // USE_GRPC

#include "congestion-reporter.hpp"
#include "stats.hpp"

namespace iceflow {

Expand All @@ -57,7 +58,7 @@ class IceflowProducer {

void setTopicPartitions(uint64_t numberOfPartitions);

uint32_t getProductionStats();
EdgeProductionStats getProductionStats();

private:
uint32_t getNextPartitionNumber();
Expand All @@ -71,6 +72,9 @@ class IceflowProducer {

ndn::Name prepareDataName(uint32_t partitionNumber);

uint64_t determineIdleTime(
std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);

private:
const std::weak_ptr<ndn::svs::SVSPubSub> m_svsPubSub;
const std::string m_pubTopic;
Expand All @@ -91,6 +95,8 @@ class IceflowProducer {
std::chrono::seconds m_maxProductionTimestampAge = std::chrono::seconds(1);

const std::string &m_downstreamEdgeName;

std::chrono::time_point<std::chrono::steady_clock> m_idleSince;
};
} // namespace iceflow

Expand Down
43 changes: 43 additions & 0 deletions include/iceflow/stats.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 The IceFlow Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef ICEFLOW_STATS_HPP
#define ICEFLOW_STATS_HPP

#include <optional>

namespace iceflow {

struct EdgeProductionStats {
uint64_t unitsProduced;
uint64_t idleTime;
};

struct EdgeConsumptionStats {
uint64_t unitsConsumed;
uint64_t idleTime;
};

struct EdgeStats {
std::optional<EdgeProductionStats> productionStats;
std::optional<EdgeConsumptionStats> consumptionStats;
};

} // namespace iceflow

#endif // ICEFLOW_STATS_HPP
2 changes: 2 additions & 0 deletions proto/node-instance.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ message StatsResponse {
message ProductionStats {
string edge_name = 1;
uint32 units_produced = 2;
uint64 idle_time = 3;
}
message ConsumptionStats {
string edge_name = 1;
uint32 units_consumed = 2;
uint64 idle_time = 3;
}
repeated ProductionStats production_stats = 1;
repeated ConsumptionStats consumption_stats = 2;
Expand Down
18 changes: 15 additions & 3 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ IceflowConsumer::IceflowConsumer(
std::optional<std::shared_ptr<CongestionReporter>> congestionReporter)
: m_svsPubSub(svsPubSub), m_subTopic(syncPrefix + "/" + upstreamEdgeName),
m_upstreamEdgeName(upstreamEdgeName),
m_congestionReporter(congestionReporter){};
m_congestionReporter(congestionReporter) {
m_idleSince = std::chrono::steady_clock::now();
};

IceflowConsumer::~IceflowConsumer() { unsubscribeFromAllPartitions(); }

void IceflowConsumer::saveTimestamp() {
auto timestamp = std::chrono::steady_clock::now();

m_consumptionTimestamps.push_back(timestamp);
m_idleSince = timestamp;

cleanUpTimestamps(timestamp);
}
Expand Down Expand Up @@ -74,12 +77,21 @@ bool IceflowConsumer::repartition(std::vector<uint32_t> partitions) {
return true;
}

uint32_t IceflowConsumer::getConsumptionStats() {
uint64_t IceflowConsumer::determineIdleTime(
std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint) {
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(referenceTimepoint -
m_idleSince)
.count());
}

EdgeConsumptionStats IceflowConsumer::getConsumptionStats() {
auto referenceTimestamp = std::chrono::steady_clock::now();
auto idleTime = determineIdleTime(referenceTimestamp);

cleanUpTimestamps(referenceTimestamp);

return m_consumptionTimestamps.size();
return EdgeConsumptionStats{m_consumptionTimestamps.size(), idleTime};
}

void IceflowConsumer::subscribeCallBack(
Expand Down
14 changes: 11 additions & 3 deletions src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,27 @@ std::unordered_map<std::string, EdgeStats> IceflowExecutor::queryEdgeStats() {
for (auto productionStat : productionStats) {
auto edgeName = productionStat.edge_name();
auto unitsProduced = productionStat.units_produced();
auto idleTime = productionStat.idle_time();

auto edgeStats = EdgeStats{std::optional(unitsProduced), std::nullopt};
auto internalProductionStats =
std::optional{EdgeProductionStats{unitsProduced, idleTime}};

auto edgeStats = EdgeStats{internalProductionStats, std::nullopt};
result[edgeName] = edgeStats;
}

for (auto consumptionStat : consumptionStats) {
auto edgeName = consumptionStat.edge_name();
auto unitsConsumed = consumptionStat.units_consumed();
auto idleTime = consumptionStat.idle_time();

auto internalConsumptionStats =
std::optional{EdgeConsumptionStats{unitsConsumed, idleTime}};

if (result.contains(edgeName)) {
result[edgeName].consumed = unitsConsumed;
result[edgeName].consumptionStats = internalConsumptionStats;
} else {
auto edgeStats = EdgeStats{std::nullopt, std::optional(unitsConsumed)};
auto edgeStats = EdgeStats{std::nullopt, internalConsumptionStats};
result[edgeName] = edgeStats;
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/iceflow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ const std::string &IceFlow::getNodePrefix() { return m_nodePrefix; }

const std::string &IceFlow::getSyncPrefix() { return m_syncPrefix; }

std::unordered_map<std::string, uint32_t> IceFlow::getConsumerStats() {
auto result = std::unordered_map<std::string, uint32_t>();
std::unordered_map<std::string, EdgeConsumptionStats>
IceFlow::getConsumerStats() {
auto result = std::unordered_map<std::string, EdgeConsumptionStats>();

for (auto consumer : m_iceflowConsumers) {
result.emplace(consumer.first, consumer.second.getConsumptionStats());
Expand All @@ -212,8 +213,9 @@ std::unordered_map<std::string, uint32_t> IceFlow::getConsumerStats() {
return result;
}

std::unordered_map<std::string, uint32_t> IceFlow::getProducerStats() {
auto result = std::unordered_map<std::string, uint32_t>();
std::unordered_map<std::string, EdgeProductionStats>
IceFlow::getProducerStats() {
auto result = std::unordered_map<std::string, EdgeProductionStats>();

for (auto producer : m_iceflowProducers) {
result.emplace(producer.first, producer.second.getProductionStats());
Expand Down
13 changes: 11 additions & 2 deletions src/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,21 @@ void IceflowProducer::setTopicPartitions(uint64_t numberOfPartitions) {
}
}

uint32_t IceflowProducer::getProductionStats() {
uint64_t IceflowProducer::determineIdleTime(
std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint) {
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(referenceTimepoint -
m_idleSince)
.count());
}

EdgeProductionStats IceflowProducer::getProductionStats() {
auto referenceTimestamp = std::chrono::steady_clock::now();
auto idleTime = determineIdleTime(referenceTimestamp);

cleanUpTimestamps(referenceTimestamp);

return m_productionTimestamps.size();
return EdgeProductionStats{m_productionTimestamps.size(), idleTime};
}

uint32_t IceflowProducer::getNextPartitionNumber() {
Expand Down
6 changes: 4 additions & 2 deletions src/services/node-instance-service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ grpc::Status NodeInstanceService::QueryStats(grpc::ServerContext *context,
for (auto consumerStat : consumerStats) {
auto consumptionStats = response->add_consumption_stats();
consumptionStats->set_edge_name(consumerStat.first);
consumptionStats->set_units_consumed(consumerStat.second);
consumptionStats->set_units_consumed(consumerStat.second.unitsConsumed);
consumptionStats->set_idle_time(consumerStat.second.idleTime);
}

for (auto producerStat : producerStats) {
auto productionStats = response->add_production_stats();
productionStats->set_edge_name(producerStat.first);
productionStats->set_units_produced(producerStat.second);
productionStats->set_units_produced(producerStat.second.unitsProduced);
productionStats->set_idle_time(producerStat.second.idleTime);
}

return grpc::Status::OK;
Expand Down

0 comments on commit 564a960

Please sign in to comment.