diff --git a/libconfluo/CMakeLists.txt b/libconfluo/CMakeLists.txt index 0e275e655..a043de6b6 100644 --- a/libconfluo/CMakeLists.txt +++ b/libconfluo/CMakeLists.txt @@ -135,8 +135,6 @@ add_library(confluo STATIC src/aggregate/aggregate_manager.cc src/aggregate/aggregate_ops.cc src/compression/confluo_encoder.cc - src/conf/configuration_params.cc - src/conf/defaults.cc src/container/data_log.cc src/container/reflog.cc src/parser/aggregate_parser.cc @@ -188,7 +186,7 @@ add_library(confluo STATIC src/container/cursor/alert_cursor.cc src/container/cursor/offset_cursors.cc src/container/cursor/record_cursors.cc - src/conf/configuration_parser.cc) + ) target_link_libraries(confluo confluoutils) if (BUILD_TESTS) diff --git a/libconfluo/confluo/archival/monolog_linear_archiver.h b/libconfluo/confluo/archival/monolog_linear_archiver.h index a62eaa475..80a488dc5 100644 --- a/libconfluo/confluo/archival/monolog_linear_archiver.h +++ b/libconfluo/confluo/archival/monolog_linear_archiver.h @@ -36,7 +36,7 @@ class monolog_linear_archiver : public archiver { * @param log monolog to archive */ monolog_linear_archiver(const std::string &path, monolog *log) - : writer_(path, "monolog_linear", archival_configuration_params::MAX_FILE_SIZE), + : writer_(path, "monolog_linear", archival_configuration_params::MAX_FILE_SIZE()), archival_tail_(0), log_(log) { writer_.close(); @@ -80,15 +80,15 @@ class monolog_linear_archiver : public archiver { void archive_bucket(T *bucket) { auto metadata = ptr_metadata::get(bucket); auto encoded_bucket = confluo_encoder::encode(bucket, metadata->data_size_, - archival_configuration_params::DATA_LOG_ENCODING_TYPE); + archival_configuration_params::DATA_LOG_ENCODING_TYPE()); size_t enc_size = encoded_bucket.size(); auto off = writer_.append(metadata, 1, encoded_bucket.get(), enc_size); auto action = monolog_linear_archival_action(archival_tail_ + BUCKET_SIZE); writer_.commit(action); - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE); - void *archived_bucket = ALLOCATOR.mmap(off.path(), off.offset(), enc_size, aux); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE()); + void *archived_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), enc_size, aux); log_->data()[archival_tail_ / BUCKET_SIZE].swap_ptr(encoded_ptr(archived_bucket)); } @@ -116,8 +116,8 @@ class monolog_linear_load_utils { incremental_file_offset off = reader.tell(); size_t size = reader.read().data_size_; - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE); - void *encoded_bucket = ALLOCATOR.mmap(off.path(), off.offset(), size, aux); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE()); + void *encoded_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), size, aux); buckets[load_offset / BUCKET_SIZE].init_ptr(encoded_ptr(encoded_bucket)); log.reserve(BUCKET_SIZE); diff --git a/libconfluo/confluo/atomic_multilog.h b/libconfluo/confluo/atomic_multilog.h index 7b0ee2b7a..1edfac7bc 100644 --- a/libconfluo/confluo/atomic_multilog.h +++ b/libconfluo/confluo/atomic_multilog.h @@ -157,7 +157,7 @@ class atomic_multilog { * @param bucket_size The size of the bucket * @throw ex Management exception */ - void add_index(const std::string &field_name, double bucket_size = configuration_params::INDEX_BUCKET_SIZE); + void add_index(const std::string &field_name, double bucket_size = configuration_params::INDEX_BUCKET_SIZE()); /** * Removes index from the atomic multilog @@ -215,7 +215,7 @@ class atomic_multilog { */ void install_trigger(const std::string &name, const std::string &expr, - const uint64_t periodicity_ms = configuration_params::MONITOR_PERIODICITY_MS); + const uint64_t periodicity_ms = configuration_params::MONITOR_PERIODICITY_MS()); /** * Removes trigger from the atomic multilog diff --git a/libconfluo/confluo/conf/configuration_params.h b/libconfluo/confluo/conf/configuration_params.h index ca759952a..4e72a28bc 100644 --- a/libconfluo/confluo/conf/configuration_params.h +++ b/libconfluo/confluo/conf/configuration_params.h @@ -7,21 +7,51 @@ namespace confluo { +class conf { + public: + static utils::configuration_map &instance() { + static std::string + paths = utils::config_utils::read_from_env("CONFLUO_CONF", "/etc/conf/confluo.conf:./conf/confluo.conf"); + static utils::configuration_map confluo_conf(paths); + return confluo_conf; + } +}; + /** * Confluo archival configuration parameters */ class archival_configuration_params { public: - static uint64_t PERIODICITY_MS; - static uint64_t IN_MEMORY_DATALOG_WINDOW_BYTES; - static uint64_t IN_MEMORY_FILTER_WINDOW_NS; + static uint64_t PERIODICITY_MS() { + return conf::instance().get("archival_periodicity_ms", archival_defaults::DEFAULT_PERIODICITY_MS()); + } + + static uint64_t IN_MEMORY_DATALOG_WINDOW_BYTES() { + return conf::instance().get("archival_in_memory_datalog_window_bytes", + archival_defaults::DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES()); + } + + static uint64_t IN_MEMORY_FILTER_WINDOW_NS() { + return conf::instance().get("archival_in_memory_filter_window_ns", + archival_defaults::DEFAULT_IN_MEMORY_FILTER_WINDOW_NS()); + } // Maximum archival file size in bytes. Cannot be smaller than a data log bucket. - static size_t MAX_FILE_SIZE; + static size_t MAX_FILE_SIZE() { + return conf::instance().get("max_archival_file_size", archival_defaults::DEFAULT_MAX_FILE_SIZE()); + } // Archival compression parameters; TODO parse types - static uint8_t DATA_LOG_ENCODING_TYPE; - static uint8_t REFLOG_ENCODING_TYPE; + static uint8_t DATA_LOG_ENCODING_TYPE() { + return configuration_parser::to_encoding_type( + conf::instance().get("data_log_archival_encoding", + archival_defaults::DEFAULT_DATA_LOG_ENCODING_TYPE())); + } + static uint8_t REFLOG_ENCODING_TYPE() { + return configuration_parser::to_encoding_type( + conf::instance().get("reflog_archival_encoding", + archival_defaults::DEFAULT_REFLOG_ENCODING_TYPE())); + } }; /** @@ -30,25 +60,36 @@ class archival_configuration_params { class configuration_params { public: /** Memory configuration parameters */ - static size_t MAX_MEMORY; + static size_t MAX_MEMORY() { + return conf::instance().get("max_memory", defaults::DEFAULT_MAX_MEMORY()); + } /** Thread configuration parameters */ - static int MAX_CONCURRENCY; + static int MAX_CONCURRENCY() { + return conf::instance().get("max_concurrency", defaults::HARDWARE_CONCURRENCY()); + } /** Index configuration parameters */ - static double INDEX_BUCKET_SIZE; + static double INDEX_BUCKET_SIZE() { + return conf::instance().get("index_block_size", defaults::DEFAULT_INDEX_BUCKET_SIZE()); + } /** Time resolution */ - static uint64_t TIME_RESOLUTION_NS; + static uint64_t TIME_RESOLUTION_NS() { + return conf::instance().get("time_resolution_ns", defaults::DEFAULT_TIME_RESOLUTION_NS()); + } /** Monitor configuration parameters */ - static uint64_t MONITOR_WINDOW_MS; + static uint64_t MONITOR_WINDOW_MS() { + return conf::instance().get("monitor_window_ms", defaults::DEFAULT_MONITOR_WINDOW_MS()); + } + /** Monitor periodicity in milliseconds */ - static uint64_t MONITOR_PERIODICITY_MS; + static uint64_t MONITOR_PERIODICITY_MS() { + return conf::instance().get("monitor_periodicity_ms", defaults::DEFAULT_MONITOR_PERIODICITY_MS()); + } }; -extern utils::configuration_map confluo_conf; - } #endif /* CONFLUO_CONF_CONFIGURATION_PARAMS_H_ */ diff --git a/libconfluo/confluo/conf/configuration_parser.h b/libconfluo/confluo/conf/configuration_parser.h index 95cb3a5b1..c8152627f 100644 --- a/libconfluo/confluo/conf/configuration_parser.h +++ b/libconfluo/confluo/conf/configuration_parser.h @@ -8,21 +8,26 @@ namespace confluo { class encoding_params { public: - static const std::string UNENCODED; - static const std::string LZ4; - static const std::string ELIAS_GAMMA; + static inline std::string UNENCODED() { + return "unencoded"; + } + + static inline std::string LZ4() { + return "lz4"; + } + static inline std::string ELIAS_GAMMA() { + return "elias_gamma"; + } }; class configuration_parser { - public: - static uint8_t to_encoding_type(const std::string ¶m) { - if (param == encoding_params::UNENCODED) { + if (param == encoding_params::UNENCODED()) { return storage::encoding_type::D_UNENCODED; - } else if (param == encoding_params::LZ4) { + } else if (param == encoding_params::LZ4()) { return storage::encoding_type::D_LZ4; - } else if (param == encoding_params::ELIAS_GAMMA) { + } else if (param == encoding_params::ELIAS_GAMMA()) { return storage::encoding_type::D_ELIAS_GAMMA; } else { THROW(illegal_state_exception, "Invalid encoding type!"); diff --git a/libconfluo/confluo/conf/defaults.h b/libconfluo/confluo/conf/defaults.h index 84c689952..20b4ea7c5 100644 --- a/libconfluo/confluo/conf/defaults.h +++ b/libconfluo/confluo/conf/defaults.h @@ -12,13 +12,30 @@ namespace confluo { */ class archival_defaults { public: - static const uint64_t DEFAULT_PERIODICITY_MS = static_cast(5 * 60 * 1e3); - static const size_t DEFAULT_MAX_FILE_SIZE = 1024 * 1024 * 1024; - static const std::string DEFAULT_DATA_LOG_ENCODING_TYPE; - static const std::string DEFAULT_REFLOG_ENCODING_TYPE; + static inline uint64_t DEFAULT_PERIODICITY_MS() { + return static_cast(5 * 60 * 1e3); + } + + static inline size_t DEFAULT_MAX_FILE_SIZE() { + return 1024 * 1024 * 1024; + } + + static inline std::string DEFAULT_DATA_LOG_ENCODING_TYPE() { + return encoding_params::LZ4(); + } + + static inline std::string DEFAULT_REFLOG_ENCODING_TYPE() { + return encoding_params::ELIAS_GAMMA(); + } + // TODO % of physical memory - static const uint64_t DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES = static_cast(1e10); - static const uint64_t DEFAULT_IN_MEMORY_FILTER_WINDOW_NS = static_cast(10 * 1e3); + static inline uint64_t DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES() { + return static_cast(1e10); + } + + static inline uint64_t DEFAULT_IN_MEMORY_FILTER_WINDOW_NS() { + return static_cast(10 * 1e3); + } }; /** @@ -27,19 +44,39 @@ class archival_defaults { class defaults { public: /** The thread hardware concurrency */ - static const int HARDWARE_CONCURRENCY; + static inline int HARDWARE_CONCURRENCY() { + return std::thread::hardware_concurrency(); + } + /** Default bucket size for index */ - static constexpr double DEFAULT_INDEX_BUCKET_SIZE = 1.0; + static inline double DEFAULT_INDEX_BUCKET_SIZE() { + return 1.0; + } + /** Default time resolution in nanoseconds */ - static constexpr uint64_t DEFAULT_TIME_RESOLUTION_NS = static_cast(1e6); + static inline uint64_t DEFAULT_TIME_RESOLUTION_NS() { + return static_cast(1e6); + } + /** Default maximum amount of memory */ - static constexpr size_t DEFAULT_MAX_MEMORY = static_cast(1e9); + static inline size_t DEFAULT_MAX_MEMORY() { + return static_cast(1e9); + } + /** Default memory monitor periodicity in milliseconds */ - static constexpr uint64_t DEFAULT_MEMORY_MONITOR_PERIODICITY_MS = 1; + static inline uint64_t DEFAULT_MEMORY_MONITOR_PERIODICITY_MS() { + return 1; + } + /** Default monitor window in milliseoncds */ - static constexpr uint64_t DEFAULT_MONITOR_WINDOW_MS = 10; + static inline uint64_t DEFAULT_MONITOR_WINDOW_MS() { + return 10; + } + /** Default periodicity for monitor in milliseconds */ - static constexpr uint64_t DEFAULT_MONITOR_PERIODICITY_MS = 1; + static inline uint64_t DEFAULT_MONITOR_PERIODICITY_MS() { + return 1; + } }; } diff --git a/libconfluo/confluo/container/monolog/monolog_linear_bucket.h b/libconfluo/confluo/container/monolog/monolog_linear_bucket.h index 5691a413a..65f4a9fab 100644 --- a/libconfluo/confluo/container/monolog/monolog_linear_bucket.h +++ b/libconfluo/confluo/container/monolog/monolog_linear_bucket.h @@ -87,7 +87,7 @@ class monolog_linear_bucket { * @return The storage size in bytes */ size_t storage_size() const { - if (data_.atomic_load() != nullptr) + if (data_.atomic_load().ptr() != nullptr) return (size_ + BUFFER_SIZE) * sizeof(T); return 0; } @@ -99,7 +99,8 @@ class monolog_linear_bucket { */ void flush(size_t offset, size_t len) { storage::encoded_ptr enc_ptr = data_.atomic_load(); - storage::STORAGE_FNS[mode_].flush(static_cast(enc_ptr.ptr()) + offset, len * sizeof(T)); + storage::storage_mode_functions::STORAGE_FNS()[mode_].flush(static_cast(enc_ptr.ptr()) + offset, + len * sizeof(T)); } /** @@ -251,7 +252,7 @@ class monolog_linear_bucket { block_state state = UNINIT; if (atomic::strong::cas(&state_, &state, INIT)) { size_t file_size = (size_ + BUFFER_SIZE) * sizeof(T); - void *data_ptr = storage::STORAGE_FNS[mode_].allocate_bucket(path_, file_size); + void *data_ptr = storage::storage_mode_functions::STORAGE_FNS()[mode_].allocate_bucket(path_, file_size); memset(data_ptr, '\0', sizeof(T) * file_size); storage::encoded_ptr enc_ptr(data_ptr); data_.atomic_init(enc_ptr); @@ -273,7 +274,7 @@ class monolog_linear_bucket { block_state state = UNINIT; if (atomic::strong::cas(&state_, &state, INIT)) { size_t file_size = (size_ + BUFFER_SIZE) * sizeof(T); - void *data_ptr = storage::STORAGE_FNS[mode_].allocate_bucket(path_, file_size); + void *data_ptr = storage::storage_mode_functions::STORAGE_FNS()[mode_].allocate_bucket(path_, file_size); memset(data_ptr, '\0', sizeof(T) * file_size); storage::encoded_ptr enc_ptr(data_ptr); data_.atomic_init(enc_ptr); diff --git a/libconfluo/confluo/container/reflog.h b/libconfluo/confluo/container/reflog.h index 9a5dea37e..4f07dda3b 100644 --- a/libconfluo/confluo/container/reflog.h +++ b/libconfluo/confluo/container/reflog.h @@ -19,10 +19,7 @@ class reflog_constants { * typedef for RefLog type -- a MonoLog of type uint64_t, * 18 bucket containers and bucket size of 1024. */ -typedef monolog_exp2_linear reflog; - +typedef monolog_exp2_linear reflog; typedef storage::read_only_encoded_ptr read_only_reflog_ptr; typedef storage::encoded_ptr encoded_reflog_ptr; typedef storage::decoded_ptr decoded_reflog_ptr; diff --git a/libconfluo/confluo/storage/storage.h b/libconfluo/confluo/storage/storage.h index a85a97b19..2d3058857 100644 --- a/libconfluo/confluo/storage/storage.h +++ b/libconfluo/confluo/storage/storage.h @@ -223,17 +223,20 @@ struct durable { } }; -/** Storage functionality for in memory mode */ -extern storage_functions IN_MEMORY_FNS; +class storage_mode_functions { + public: + /** Storage functionality for in memory mode */ + static storage_functions &IN_MEMORY_FNS(); -/** Storage functionality for durable relaxed mode */ -extern storage_functions DURABLE_RELAXED_FNS; + /** Storage functionality for durable relaxed mode */ + static storage_functions &DURABLE_RELAXED_FNS(); -/** Storage functionality for durable mode */ -extern storage_functions DURABLE_FNS; + /** Storage functionality for durable mode */ + static storage_functions &DURABLE_FNS(); -/** Contains the storage functions for all storage modes */ -extern storage_functions STORAGE_FNS[3]; + /** Contains the storage functions for all storage modes */ + static storage_functions *STORAGE_FNS(); +}; } } diff --git a/libconfluo/confluo/threads/thread_manager.h b/libconfluo/confluo/threads/thread_manager.h index cf901314d..7464a7d70 100644 --- a/libconfluo/confluo/threads/thread_manager.h +++ b/libconfluo/confluo/threads/thread_manager.h @@ -52,12 +52,6 @@ class thread_manager { */ static int get_max_concurrency(); - /** - * Sets the maximum number of threads to a new value - * @param max_concurrency The new maximum concurrency - */ - static void set_max_concurrency(int max_concurrency); - private: /** * Initializes info for each thread @@ -88,9 +82,10 @@ class thread_manager { static void unset(int i); /** The maximum amount of threads Confluo supports */ - static int MAX_CONCURRENCY; + static int MAX_CONCURRENCY(); + /** The thread info */ - static thread_info *THREAD_INFO; + static thread_info *THREAD_INFO(); }; } diff --git a/libconfluo/src/archival/filter_archiver.cc b/libconfluo/src/archival/filter_archiver.cc index 08cc196e7..ce0fc296c 100644 --- a/libconfluo/src/archival/filter_archiver.cc +++ b/libconfluo/src/archival/filter_archiver.cc @@ -5,8 +5,8 @@ namespace archival { filter_archiver::filter_archiver(const std::string &path, monitor::filter *filter) : filter_(filter), - refs_writer_(path, "filter_data", archival_configuration_params::MAX_FILE_SIZE), - aggs_writer_(path, "filter_aggs", archival_configuration_params::MAX_FILE_SIZE), + refs_writer_(path, "filter_data", archival_configuration_params::MAX_FILE_SIZE()), + aggs_writer_(path, "filter_aggs", archival_configuration_params::MAX_FILE_SIZE()), refs_tail_(0), ts_tail_(0) { refs_writer_.close(); @@ -21,8 +21,8 @@ void filter_archiver::archive(size_t offset) { auto &refs = *it; byte_string key = it.key(); ts_tail_ = key.template as(); - auto ts_tail_ns = ts_tail_ * configuration_params::TIME_RESOLUTION_NS; - if (time_utils::cur_ns() - ts_tail_ns < archival_configuration_params::IN_MEMORY_FILTER_WINDOW_NS) { + auto ts_tail_ns = ts_tail_ * configuration_params::TIME_RESOLUTION_NS(); + if (time_utils::cur_ns() - ts_tail_ns < archival_configuration_params::IN_MEMORY_FILTER_WINDOW_NS()) { break; } size_t data_log_archival_tail = archive_reflog(key, refs, offset); @@ -61,7 +61,7 @@ void filter_archiver::archive_bucket(byte_string key, reflog &refs, uint64_t *bu auto* metadata = ptr_metadata::get(bucket); size_t bucket_size = std::min(reflog_constants::BUCKET_SIZE, refs.size() - refs_tail_); auto encoded_bucket = confluo_encoder::encode(bucket, bucket_size * sizeof(uint64_t), - archival_configuration_params::REFLOG_ENCODING_TYPE); + archival_configuration_params::REFLOG_ENCODING_TYPE()); size_t enc_size = encoded_bucket.size(); auto archival_metadata = radix_tree_archival_metadata(key, refs_tail_, bucket_size); @@ -71,8 +71,8 @@ void filter_archiver::archive_bucket(byte_string key, reflog &refs, uint64_t *bu auto off = refs_writer_.append(metadata, 1, encoded_bucket.get(), enc_size); refs_writer_.commit(action.to_string()); - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE); - void* archived_bucket = ALLOCATOR.mmap(off.path(), off.offset(), enc_size, aux); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE()); + void *archived_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), enc_size, aux); archival_utils::swap_bucket_ptr(refs, refs_tail_, encoded_reflog_ptr(archived_bucket)); } @@ -113,7 +113,7 @@ size_t filter_load_utils::load_reflogs(const std::string &path, filter::idx_t &f size_t bucket_size = metadata.data_size_; auto *&refs = filter.get_or_create(cur_key); - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE()); void *encoded_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), bucket_size, aux); init_bucket_ptr(refs, reflog_idx, encoded_reflog_ptr(encoded_bucket)); refs->reserve(archival_metadata.bucket_size()); diff --git a/libconfluo/src/archival/index_archiver.cc b/libconfluo/src/archival/index_archiver.cc index c26129732..be36cfcf8 100644 --- a/libconfluo/src/archival/index_archiver.cc +++ b/libconfluo/src/archival/index_archiver.cc @@ -7,7 +7,7 @@ index_archiver::index_archiver(const std::string &path, index::radix_index *inde : index_(index), reflog_tails_(), column_(column), - writer_(path, "index_data", archival_configuration_params::MAX_FILE_SIZE) { + writer_(path, "index_data", archival_configuration_params::MAX_FILE_SIZE()) { writer_.close(); } @@ -48,7 +48,7 @@ size_t index_archiver::archive_bucket(byte_string key, reflog &refs, size_t idx, auto metadata_copy = *(ptr_metadata::get(bucket)); size_t bucket_size = std::min(reflog_constants::BUCKET_SIZE, refs.size() - idx); auto raw_encoded_bucket = confluo_encoder::encode(bucket, bucket_size * sizeof(uint64_t), - archival_configuration_params::REFLOG_ENCODING_TYPE); + archival_configuration_params::REFLOG_ENCODING_TYPE()); size_t enc_size = raw_encoded_bucket.size(); metadata_copy.data_size_ = static_cast(enc_size); @@ -61,7 +61,7 @@ size_t index_archiver::archive_bucket(byte_string key, reflog &refs, size_t idx, // Only swap pointer for full buckets. if (bucket_size == reflog_constants::BUCKET_SIZE) { - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE()); void *enc_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), enc_size, aux); archival_utils::swap_bucket_ptr(refs, idx, encoded_reflog_ptr(enc_bucket)); } @@ -85,7 +85,7 @@ size_t index_load_utils::load(const std::string &path, index::radix_index *index size_t bucket_size = metadata.data_size_; auto *&refs = index->get_or_create(cur_key); - ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE); + ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::REFLOG_ENCODING_TYPE()); void *encoded_bucket = ALLOCATOR.mmap(off.path(), static_cast(off.offset()), bucket_size, aux); init_bucket_ptr(refs, reflog_idx, encoded_reflog_ptr(encoded_bucket)); reader.advance(bucket_size); diff --git a/libconfluo/src/atomic_multilog.cc b/libconfluo/src/atomic_multilog.cc index a6b4b3b21..cb306e84f 100644 --- a/libconfluo/src/atomic_multilog.cc +++ b/libconfluo/src/atomic_multilog.cc @@ -24,10 +24,10 @@ atomic_multilog::atomic_multilog(const std::string &name, metadata_.write_storage_mode(s_mode); metadata_.write_archival_mode(a_mode); monitor_task_.start(std::bind(&atomic_multilog::monitor_task, this), - configuration_params::MONITOR_PERIODICITY_MS); + configuration_params::MONITOR_PERIODICITY_MS()); if (a_mode == archival_mode::ON) { archival_task_.start(std::bind(&atomic_multilog::archival_task, this), - archival_configuration_params::PERIODICITY_MS); + archival_configuration_params::PERIODICITY_MS()); } } @@ -57,10 +57,10 @@ atomic_multilog::atomic_multilog(const std::string &name, const std::string &pat rt_ = read_tail_type(path, s_mode); load(s_mode); monitor_task_.start(std::bind(&atomic_multilog::monitor_task, this), - configuration_params::MONITOR_PERIODICITY_MS); + configuration_params::MONITOR_PERIODICITY_MS()); if (a_mode == archival_mode::ON) { archival_task_.start(std::bind(&atomic_multilog::archival_task, this), - archival_configuration_params::PERIODICITY_MS); + archival_configuration_params::PERIODICITY_MS()); } } @@ -153,19 +153,19 @@ void atomic_multilog::remove_aggregate(const std::string &name) { } void atomic_multilog::install_trigger(const std::string &name, const std::string &expr, const uint64_t periodicity_ms) { - if (periodicity_ms < configuration_params::MONITOR_PERIODICITY_MS) { + if (periodicity_ms < configuration_params::MONITOR_PERIODICITY_MS()) { throw management_exception( "Trigger periodicity (" + std::to_string(periodicity_ms) + "ms) cannot be less than monitor periodicity (" - + std::to_string(configuration_params::MONITOR_PERIODICITY_MS) + + std::to_string(configuration_params::MONITOR_PERIODICITY_MS()) + "ms)"); } - if (periodicity_ms % configuration_params::MONITOR_PERIODICITY_MS != 0) { + if (periodicity_ms % configuration_params::MONITOR_PERIODICITY_MS() != 0) { throw management_exception( "Trigger periodicity (" + std::to_string(periodicity_ms) + "ms) must be a multiple of monitor periodicity (" - + std::to_string(configuration_params::MONITOR_PERIODICITY_MS) + + std::to_string(configuration_params::MONITOR_PERIODICITY_MS()) + "ms)"); } @@ -627,8 +627,8 @@ void atomic_multilog::remove_trigger_task(const std::string &name, optional ex; std::future ret = archival_pool_.submit([this] { - if (rt_.get() > archival_configuration_params::IN_MEMORY_DATALOG_WINDOW_BYTES) - archiver_.archive(rt_.get() - archival_configuration_params::IN_MEMORY_DATALOG_WINDOW_BYTES); + if (rt_.get() > archival_configuration_params::IN_MEMORY_DATALOG_WINDOW_BYTES()) + archiver_.archive(rt_.get() - archival_configuration_params::IN_MEMORY_DATALOG_WINDOW_BYTES()); }); ret.wait(); if (ex.has_value()) @@ -650,7 +650,7 @@ void atomic_multilog::monitor_task() { for (size_t tid = 0; tid < ntriggers; tid++) { trigger *t = a->get_trigger(tid); if (t->is_valid() && cur_ms % t->periodicity_ms() == 0) { - for (uint64_t ms = cur_ms - configuration_params::MONITOR_WINDOW_MS; ms <= cur_ms; ms++) { + for (uint64_t ms = cur_ms - configuration_params::MONITOR_WINDOW_MS(); ms <= cur_ms; ms++) { if (ms % t->periodicity_ms() == 0) { check_time_bucket(f, t, tid, cur_ms, version); } diff --git a/libconfluo/src/conf/configuration_params.cc b/libconfluo/src/conf/configuration_params.cc deleted file mode 100644 index 6ca619c47..000000000 --- a/libconfluo/src/conf/configuration_params.cc +++ /dev/null @@ -1,38 +0,0 @@ -#include "conf/configuration_params.h" - -namespace confluo { - -utils::configuration_map - confluo_conf(utils::config_utils::read_from_env("CONFLUO_CONF", "/etc/conf/confluo.conf:./conf/confluo.conf")); - -uint64_t archival_configuration_params::PERIODICITY_MS = confluo_conf.get( - "archival_periodicity_ms", archival_defaults::DEFAULT_PERIODICITY_MS); -uint64_t archival_configuration_params::IN_MEMORY_DATALOG_WINDOW_BYTES = confluo_conf.get( - "archival_in_memory_datalog_window_bytes", archival_defaults::DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES); -uint64_t archival_configuration_params::IN_MEMORY_FILTER_WINDOW_NS = confluo_conf.get( - "archival_in_memory_filter_window_ns", archival_defaults::DEFAULT_IN_MEMORY_FILTER_WINDOW_NS); -size_t archival_configuration_params::MAX_FILE_SIZE = confluo_conf.get( - "max_archival_file_size", archival_defaults::DEFAULT_MAX_FILE_SIZE); -uint8_t archival_configuration_params::DATA_LOG_ENCODING_TYPE = configuration_parser::to_encoding_type( - confluo_conf.get("data_log_archival_encoding", - archival_defaults::DEFAULT_DATA_LOG_ENCODING_TYPE)); -uint8_t archival_configuration_params::REFLOG_ENCODING_TYPE = configuration_parser::to_encoding_type( - confluo_conf.get("reflog_archival_encoding", - archival_defaults::DEFAULT_REFLOG_ENCODING_TYPE)); - -size_t configuration_params::MAX_MEMORY = confluo_conf.get( - "max_memory", defaults::DEFAULT_MAX_MEMORY); -int configuration_params::MAX_CONCURRENCY = confluo_conf.get( - "max_concurrency", defaults::HARDWARE_CONCURRENCY); -double configuration_params::INDEX_BUCKET_SIZE = confluo_conf.get( - "index_block_size", defaults::DEFAULT_INDEX_BUCKET_SIZE); -uint64_t configuration_params::TIME_RESOLUTION_NS = confluo_conf.get( - "time_resolution_ns", defaults::DEFAULT_TIME_RESOLUTION_NS); -uint64_t configuration_params::MEMORY_MONITOR_PERIODICITY_MS = confluo_conf.get( - "memory_monitor_periodicity_ms", defaults::DEFAULT_MEMORY_MONITOR_PERIODICITY_MS); -uint64_t configuration_params::MONITOR_WINDOW_MS = confluo_conf.get( - "monitor_window_ms", defaults::DEFAULT_MONITOR_WINDOW_MS); -uint64_t configuration_params::MONITOR_PERIODICITY_MS = confluo_conf.get( - "monitor_periodicity_ms", defaults::DEFAULT_MONITOR_PERIODICITY_MS); - -} diff --git a/libconfluo/src/conf/configuration_parser.cc b/libconfluo/src/conf/configuration_parser.cc deleted file mode 100644 index 05aaa39eb..000000000 --- a/libconfluo/src/conf/configuration_parser.cc +++ /dev/null @@ -1,9 +0,0 @@ -#include "conf/configuration_parser.h" - -namespace confluo { - -const std::string encoding_params::UNENCODED = "unencoded"; -const std::string encoding_params::LZ4 = "lz4"; -const std::string encoding_params::ELIAS_GAMMA = "elias_gamma"; - -} diff --git a/libconfluo/src/conf/defaults.cc b/libconfluo/src/conf/defaults.cc deleted file mode 100644 index 747ea620d..000000000 --- a/libconfluo/src/conf/defaults.cc +++ /dev/null @@ -1,20 +0,0 @@ -#include "conf/defaults.h" - -namespace confluo { - -const size_t archival_defaults::DEFAULT_MAX_FILE_SIZE; -const uint64_t archival_defaults::DEFAULT_PERIODICITY_MS; -const std::string archival_defaults::DEFAULT_DATA_LOG_ENCODING_TYPE = encoding_params::LZ4; -const std::string archival_defaults::DEFAULT_REFLOG_ENCODING_TYPE = encoding_params::ELIAS_GAMMA; -const uint64_t archival_defaults::DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES; -const uint64_t archival_defaults::DEFAULT_IN_MEMORY_FILTER_WINDOW_NS; - -const int defaults::HARDWARE_CONCURRENCY = std::thread::hardware_concurrency(); -constexpr double defaults::DEFAULT_INDEX_BUCKET_SIZE; -constexpr size_t defaults::DEFAULT_MAX_MEMORY; -constexpr uint64_t defaults::DEFAULT_TIME_RESOLUTION_NS; -constexpr uint64_t defaults::DEFAULT_MEMORY_MONITOR_PERIODICITY_MS; -constexpr uint64_t defaults::DEFAULT_MONITOR_WINDOW_MS; -constexpr uint64_t defaults::DEFAULT_MONITOR_PERIODICITY_MS; - -} diff --git a/libconfluo/src/confluo_store.cc b/libconfluo/src/confluo_store.cc index c74a69f71..b2352a48e 100644 --- a/libconfluo/src/confluo_store.cc +++ b/libconfluo/src/confluo_store.cc @@ -84,7 +84,7 @@ int64_t confluo_store::remove_atomic_multilog(int64_t id) { } void confluo_store::memory_management_task() { - if (ALLOCATOR.memory_utilization() >= configuration_params::MAX_MEMORY) { + if (ALLOCATOR.memory_utilization() >= configuration_params::MAX_MEMORY()) { for (size_t id = 0; id < atomic_multilogs_.size(); id++) { // TODO how aggressively to archive and should multilogs with archival OFF be archived? atomic_multilogs_.get(id)->archive(); diff --git a/libconfluo/src/filter.cc b/libconfluo/src/filter.cc index 2e2166267..a5d8673c1 100644 --- a/libconfluo/src/filter.cc +++ b/libconfluo/src/filter.cc @@ -35,7 +35,7 @@ size_t filter::num_aggregates() const { void filter::update(const record_t &r) { if (exp_.test(r) && fn_(r)) { aggregated_reflog *refs = idx_.insert( - byte_string(r.timestamp() / configuration_params::TIME_RESOLUTION_NS), + byte_string(r.timestamp() / configuration_params::TIME_RESOLUTION_NS()), r.log_offset(), aggregates_); int tid = thread_manager::get_id(); for (size_t i = 0; i < refs->num_aggregates(); i++) { diff --git a/libconfluo/src/read_tail.cc b/libconfluo/src/read_tail.cc index e248460fb..e5330f611 100644 --- a/libconfluo/src/read_tail.cc +++ b/libconfluo/src/read_tail.cc @@ -13,7 +13,7 @@ read_tail::read_tail(const std::string &data_path, const storage::storage_mode & void read_tail::init(const std::string &data_path, const storage::storage_mode &mode) { mode_ = mode; - read_tail_ = (atomic::type *) storage::STORAGE_FNS[mode_].allocate( + read_tail_ = (atomic::type *) storage::storage_mode_functions::STORAGE_FNS()[mode_].allocate( data_path + "/read_tail", sizeof(uint64_t)); atomic::store(read_tail_, UINT64_C(0)); } @@ -28,7 +28,7 @@ void read_tail::advance(uint64_t old_tail, uint32_t bytes) { expected = old_tail; std::this_thread::yield(); } - storage::STORAGE_FNS[mode_].flush(read_tail_, sizeof(uint64_t)); + storage::storage_mode_functions::STORAGE_FNS()[mode_].flush(read_tail_, sizeof(uint64_t)); } } \ No newline at end of file diff --git a/libconfluo/src/storage/storage.cc b/libconfluo/src/storage/storage.cc index 731e0c39a..90d952bb8 100644 --- a/libconfluo/src/storage/storage.cc +++ b/libconfluo/src/storage/storage.cc @@ -3,21 +3,33 @@ namespace confluo { namespace storage { -/** Storage functionality for in memory mode */ -storage_functions IN_MEMORY_FNS = - {storage_mode::IN_MEMORY, in_memory::allocate, in_memory::allocate_bucket, in_memory::free_mem, in_memory::flush}; +storage_functions &storage_mode_functions::IN_MEMORY_FNS() { + static storage_functions fns + {storage_mode::IN_MEMORY, in_memory::allocate, in_memory::allocate_bucket, in_memory::free_mem, + in_memory::flush}; + return fns; +} -/** Storage functionality for durable relaxed mode */ -storage_functions DURABLE_RELAXED_FNS = - {storage_mode::DURABLE_RELAXED, durable_relaxed::allocate, durable_relaxed::allocate_bucket, durable_relaxed::free, - durable_relaxed::flush}; +storage_functions &storage_mode_functions::DURABLE_RELAXED_FNS() { + static storage_functions fns + {storage_mode::DURABLE_RELAXED, durable_relaxed::allocate, durable_relaxed::allocate_bucket, + durable_relaxed::free, + durable_relaxed::flush}; + return fns; +} -/** Storage functionality for durable mode */ -storage_functions - DURABLE_FNS = {storage_mode::DURABLE, durable::allocate, durable::allocate_bucket, durable::free, durable::flush}; +storage_functions &storage_mode_functions::DURABLE_FNS() { + static storage_functions + fns{storage_mode::DURABLE, durable::allocate, durable::allocate_bucket, durable::free, durable::flush}; + return fns; +} -/** Contains the storage functions for all storage modes */ -storage_functions STORAGE_FNS[3] = {IN_MEMORY_FNS, DURABLE_RELAXED_FNS, DURABLE_FNS}; +storage_functions *storage_mode_functions::STORAGE_FNS() { + static storage_functions fns[3] = + {storage_mode_functions::IN_MEMORY_FNS(), storage_mode_functions::DURABLE_RELAXED_FNS(), + storage_mode_functions::DURABLE_FNS()}; + return fns; +} } } \ No newline at end of file diff --git a/libconfluo/src/storage/storage_allocator.cc b/libconfluo/src/storage/storage_allocator.cc index fdb69938e..1436b2154 100644 --- a/libconfluo/src/storage/storage_allocator.cc +++ b/libconfluo/src/storage/storage_allocator.cc @@ -17,7 +17,7 @@ void storage_allocator::register_cleanup_callback(storage_allocator::callback_fn void *storage_allocator::alloc(size_t size, ptr_aux_block aux) { int retries = 0; - while (mem_stat_.get() >= configuration_params::MAX_MEMORY) { + while (mem_stat_.get() >= configuration_params::MAX_MEMORY()) { mem_cleanup_callback_(); if (retries > MAX_CLEANUP_RETRIES) THROW(memory_exception, "Max memory reached!"); diff --git a/libconfluo/src/threads/thread_manager.cc b/libconfluo/src/threads/thread_manager.cc index 0dd942586..88a131b3f 100644 --- a/libconfluo/src/threads/thread_manager.cc +++ b/libconfluo/src/threads/thread_manager.cc @@ -2,11 +2,6 @@ namespace confluo { -/** The max concurrecy is specified by the configuration parameters */ -int thread_manager::MAX_CONCURRENCY = configuration_params::MAX_CONCURRENCY; -/** Initializes the thread information */ -thread_info *thread_manager::THREAD_INFO = thread_manager::init_thread_info(); - int thread_manager::register_thread() { // De-register if already registered deregister_thread(); @@ -27,24 +22,20 @@ int thread_manager::get_id() { } int thread_manager::get_max_concurrency() { - return MAX_CONCURRENCY; -} - -void thread_manager::set_max_concurrency(int max_concurrency) { - MAX_CONCURRENCY = max_concurrency; + return MAX_CONCURRENCY(); } thread_info *thread_manager::init_thread_info() { - thread_info *tinfo = new thread_info[MAX_CONCURRENCY]; - for (int i = 0; i < MAX_CONCURRENCY; i++) + thread_info *tinfo = new thread_info[MAX_CONCURRENCY()]; + for (int i = 0; i < thread_manager::MAX_CONCURRENCY(); i++) atomic::init(&tinfo[i].valid, false); return tinfo; } int thread_manager::find() { auto tid = std::this_thread::get_id(); - for (int i = 0; i < MAX_CONCURRENCY; i++) { - if (atomic::load(&THREAD_INFO[i].valid) && THREAD_INFO[i].tid == tid) { + for (int i = 0; i < thread_manager::MAX_CONCURRENCY(); i++) { + if (atomic::load(&THREAD_INFO()[i].valid) && THREAD_INFO()[i].tid == tid) { return i; } } @@ -55,9 +46,9 @@ int thread_manager::find() { int thread_manager::set() { auto tid = std::this_thread::get_id(); bool expected = false; - for (int i = 0; i < MAX_CONCURRENCY; i++) { - if (atomic::strong::cas(&THREAD_INFO[i].valid, &expected, true)) { - THREAD_INFO[i].tid = tid; + for (int i = 0; i < thread_manager::MAX_CONCURRENCY(); i++) { + if (atomic::strong::cas(&THREAD_INFO()[i].valid, &expected, true)) { + THREAD_INFO()[i].tid = tid; return i; } expected = false; @@ -66,7 +57,17 @@ int thread_manager::set() { } void thread_manager::unset(int i) { - atomic::store(&THREAD_INFO[i].valid, false); + atomic::store(&THREAD_INFO()[i].valid, false); +} + +int thread_manager::MAX_CONCURRENCY() { + static int concurrency = configuration_params::MAX_CONCURRENCY(); + return concurrency; +} + +thread_info *thread_manager::THREAD_INFO() { + static thread_info *info = thread_manager::init_thread_info(); + return info; } } \ No newline at end of file diff --git a/libconfluo/test/aggregated_reflog_test.h b/libconfluo/test/aggregated_reflog_test.h index 7f0644e53..6f0d5e6e1 100644 --- a/libconfluo/test/aggregated_reflog_test.h +++ b/libconfluo/test/aggregated_reflog_test.h @@ -45,15 +45,10 @@ int64_t AggregatedReflogTest::cnt[11]; TEST_F(AggregatedReflogTest, GetSetTest) { aggregate_log log; - log.push_back( - new aggregate_info("agg1", aggregate_manager::get_aggregator("sum"), 0)); - log.push_back( - new aggregate_info("agg2", aggregate_manager::get_aggregator("min"), 0)); - log.push_back( - new aggregate_info("agg3", aggregate_manager::get_aggregator("max"), 0)); - log.push_back( - new aggregate_info("agg4", aggregate_manager::get_aggregator("count"), - 0)); + log.push_back(new aggregate_info("agg1", aggregate_manager::get_aggregator("sum"), 0)); + log.push_back(new aggregate_info("agg2", aggregate_manager::get_aggregator("min"), 0)); + log.push_back(new aggregate_info("agg3", aggregate_manager::get_aggregator("max"), 0)); + log.push_back(new aggregate_info("agg4", aggregate_manager::get_aggregator("count"), 0)); aggregated_reflog ar(log); ASSERT_TRUE(numeric(limits::double_zero) == ar.get_aggregate(0, 0)); @@ -63,27 +58,27 @@ TEST_F(AggregatedReflogTest, GetSetTest) { for (int i = 1; i <= 10; i++) { numeric value(i); - ar.seq_update_aggregate(0, 0, value, i * 2); + ar.seq_update_aggregate(0, 0, value, static_cast(i * 2)); for (int j = 0; j <= i; j++) ASSERT_TRUE(numeric(sum[j]) == ar.get_aggregate(0, j * 2)); } for (int i = 1; i <= 10; i++) { numeric value(10 - i); - ar.seq_update_aggregate(0, 1, value, i * 2); + ar.seq_update_aggregate(0, 1, value, static_cast(i * 2)); for (int j = 0; j <= i; j++) ASSERT_TRUE(numeric(min[j]) == ar.get_aggregate(1, j * 2)); } for (int i = 1; i <= 10; i++) { numeric value(i); - ar.seq_update_aggregate(0, 2, value, i * 2); + ar.seq_update_aggregate(0, 2, value, static_cast(i * 2)); for (int j = 0; j <= i; j++) ASSERT_TRUE(numeric(max[j]) == ar.get_aggregate(2, j * 2)); } for (int i = 1; i <= 10; i++) { - ar.seq_update_aggregate(0, 3, numeric(INT64_C(1)), i * 2); + ar.seq_update_aggregate(0, 3, numeric(INT64_C(1)), static_cast(i * 2)); for (int j = 0; j <= i; j++) ASSERT_TRUE(numeric(cnt[j]) == ar.get_aggregate(3, j * 2)); } @@ -107,15 +102,10 @@ TEST_F(AggregatedReflogTest, GetSetTest) { TEST_F(AggregatedReflogTest, MultiThreadedGetSetTest) { aggregate_log log; - log.push_back( - new aggregate_info("agg1", aggregate_manager::get_aggregator("sum"), 0)); - log.push_back( - new aggregate_info("agg2", aggregate_manager::get_aggregator("min"), 0)); - log.push_back( - new aggregate_info("agg3", aggregate_manager::get_aggregator("max"), 0)); - log.push_back( - new aggregate_info("agg4", aggregate_manager::get_aggregator("count"), - 0)); + log.push_back(new aggregate_info("agg1", aggregate_manager::get_aggregator("sum"), 0)); + log.push_back(new aggregate_info("agg2", aggregate_manager::get_aggregator("min"), 0)); + log.push_back(new aggregate_info("agg3", aggregate_manager::get_aggregator("max"), 0)); + log.push_back(new aggregate_info("agg4", aggregate_manager::get_aggregator("count"), 0)); aggregated_reflog ar(log); ASSERT_TRUE(numeric(limits::double_zero) == ar.get_aggregate(0, 0)); @@ -124,29 +114,29 @@ TEST_F(AggregatedReflogTest, MultiThreadedGetSetTest) { ASSERT_TRUE(numeric(limits::ulong_zero) == ar.get_aggregate(3, 0)); std::vector workers; - int max_i = std::min(10, defaults::HARDWARE_CONCURRENCY); + int max_i = std::min(10, defaults::HARDWARE_CONCURRENCY()); int max_j = max_i * 2; for (int i = 1; i <= max_i; i++) { workers.push_back(std::thread([i, &ar] { - ar.seq_update_aggregate(i - 1, 0, numeric(i), i * 2); + ar.seq_update_aggregate(i - 1, 0, numeric(i), static_cast(i * 2)); })); } for (int i = 1; i <= max_i; i++) { workers.push_back(std::thread([i, &ar] { - ar.seq_update_aggregate(i - 1, 1, numeric(10 - i), i * 2); + ar.seq_update_aggregate(i - 1, 1, numeric(10 - i), static_cast(i * 2)); })); } for (int i = 1; i <= max_i; i++) { workers.push_back(std::thread([i, &ar] { - ar.seq_update_aggregate(i - 1, 2, numeric(i), i * 2); + ar.seq_update_aggregate(i - 1, 2, numeric(i), static_cast(i * 2)); })); } for (int i = 1; i <= max_i; i++) { workers.push_back(std::thread([i, &ar] { - ar.seq_update_aggregate(i - 1, 3, numeric(INT64_C(1)), i * 2); + ar.seq_update_aggregate(i - 1, 3, numeric(INT64_C(1)), static_cast(i * 2)); })); } diff --git a/libconfluo/test/archival/index_load_test.h b/libconfluo/test/archival/index_load_test.h index 9e630de7b..c78405d75 100644 --- a/libconfluo/test/archival/index_load_test.h +++ b/libconfluo/test/archival/index_load_test.h @@ -90,7 +90,7 @@ TEST_F(IndexLoadTest, IndexLogLoadTest) { index_log indexes; index_id = indexes.push_back(&index); s[s.get_field_index("a")].set_indexing(); - s[s.get_field_index("a")].set_indexed(static_cast(index_id), configuration_params::INDEX_BUCKET_SIZE); + s[s.get_field_index("a")].set_indexed(static_cast(index_id), configuration_params::INDEX_BUCKET_SIZE()); archival::index_log_archiver archiver(path, &indexes, &s); archiver.archive(static_cast(1e6)); diff --git a/libconfluo/test/atomic_multilog_test.h b/libconfluo/test/atomic_multilog_test.h index 11f31f123..32a005bba 100644 --- a/libconfluo/test/atomic_multilog_test.h +++ b/libconfluo/test/atomic_multilog_test.h @@ -427,7 +427,7 @@ TEST_F(AtomicMultilogTest, RemoveFilterTriggerTest) { mlog.install_trigger("trigger2", "agg2 >= 10"); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; mlog.append(record(now_ns, false, '0', 0, 0, 0, 0.0, 0.01, "abc")); mlog.append(record(now_ns, true, '1', 10, 2, 1, 0.1, 0.02, "defg")); @@ -514,7 +514,7 @@ TEST_F(AtomicMultilogTest, FilterAggregateTriggerTest) { sleep(1); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; mlog.append(record(now_ns, false, '0', 0, 0, 0, 0.0, 0.01, "abc")); mlog.append(record(now_ns, true, '1', 10, 2, 1, 0.1, 0.02, "defg")); @@ -847,7 +847,7 @@ TEST_F(AtomicMultilogTest, BatchFilterAggregateTriggerTest) { mlog.install_trigger("trigger8", "agg8 >= 10"); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; record_batch batch = build_batch(mlog, now_ns); diff --git a/libconfluo/test/threads/thread_manager_test.h b/libconfluo/test/threads/thread_manager_test.h index 37854cf08..bb5f616c8 100644 --- a/libconfluo/test/threads/thread_manager_test.h +++ b/libconfluo/test/threads/thread_manager_test.h @@ -23,7 +23,7 @@ TEST_F(ThreadManagerTest, MultiThreadedRegisterDeregisterTest) { threads.push_back(std::thread([] { int id = thread_manager::register_thread(); ASSERT_TRUE(-1 != id); - ASSERT_TRUE(id < defaults::HARDWARE_CONCURRENCY); + ASSERT_TRUE(id < defaults::HARDWARE_CONCURRENCY()); ASSERT_EQ(id, thread_manager::get_id()); ASSERT_EQ(id, thread_manager::deregister_thread()); })); diff --git a/librpc/CMakeLists.txt b/librpc/CMakeLists.txt index 8f9d509d7..7f0d219a0 100644 --- a/librpc/CMakeLists.txt +++ b/librpc/CMakeLists.txt @@ -33,17 +33,18 @@ add_executable(confluod src/rpc_types.cc src/rpc_alert_stream.cc src/rpc_client.cc - src/rpc_configuration_params.cc - src/rpc_defaults.cc src/rpc_record_batch_builder.cc src/rpc_record_stream.cc - src/rpc_type_conversions.cc) + src/rpc_type_conversions.cc + src/confluo_server.cc) target_link_libraries(confluod confluo ${CMAKE_THREAD_LIBS_INIT} thriftstatic ${lz4_STATIC_LIB} ${JEMALLOC_LIBRARIES}) add_dependencies(confluod thrift lz4 jemalloc) add_library(rpcclient STATIC rpc/rpc_constants.h src/rpc_constants.cc + rpc/rpc_server.h + src/rpc_server.cc rpc/rpc_service.h rpc/rpc_service.tcc src/rpc_service.cc @@ -60,9 +61,7 @@ add_library(rpcclient STATIC rpc/rpc_record_stream.h src/rpc_record_stream.cc rpc/rpc_configuration_params.h - src/rpc_configuration_params.cc - rpc/rpc_defaults.h - src/rpc_defaults.cc) + rpc/rpc_defaults.h) target_link_libraries(rpcclient thriftstatic) add_dependencies(rpcclient thrift) diff --git a/librpc/rpc/rpc_configuration_params.h b/librpc/rpc/rpc_configuration_params.h index fe64a987d..d481b4790 100644 --- a/librpc/rpc/rpc_configuration_params.h +++ b/librpc/rpc/rpc_configuration_params.h @@ -2,6 +2,8 @@ #define RPC_RPC_CONFIGURATION_PARAMS_H_ #include +#include +#include "rpc_defaults.h" namespace confluo { namespace rpc { @@ -12,7 +14,9 @@ namespace rpc { class rpc_configuration_params { public: /** Iterator for the batches */ - static size_t ITERATOR_BATCH_SIZE; + static size_t ITERATOR_BATCH_SIZE() { + return conf::instance().get("iterator_batch_size", rpc_defaults::DEFAULT_ITERATOR_BATCH_SIZE()); + } }; } diff --git a/librpc/rpc/rpc_defaults.h b/librpc/rpc/rpc_defaults.h index 7c19b8abd..186554777 100644 --- a/librpc/rpc/rpc_defaults.h +++ b/librpc/rpc/rpc_defaults.h @@ -13,7 +13,9 @@ class rpc_defaults { public: // Iterator /** Default batch size for the iterator */ - static const size_t DEFAULT_ITERATOR_BATCH_SIZE = 20; + static inline size_t DEFAULT_ITERATOR_BATCH_SIZE() { + return 20; + } }; } diff --git a/librpc/rpc/rpc_server.h b/librpc/rpc/rpc_server.h index 09b0c078c..6b6c8a083 100644 --- a/librpc/rpc/rpc_server.h +++ b/librpc/rpc/rpc_server.h @@ -64,42 +64,19 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param store The confluo store used to initialize the service * handler */ - rpc_service_handler(confluo_store *store) - : handler_id_(-1), - store_(store), - iterator_id_(0) { - } + rpc_service_handler(confluo_store *store); /** * Registers this service handler on a new thread * @throw management_expcetion If this service handler could not * be registered */ - void register_handler() { - handler_id_ = thread_manager::register_thread(); - if (handler_id_ < 0) { - rpc_management_exception ex; - ex.msg = "Could not register handler"; - throw ex; - } else { - LOG_INFO << "Registered handler thread " << std::this_thread::get_id() << " as " << handler_id_; - } - } + void register_handler(); /** * Deregisters this service handler and its associated thread */ - void deregister_handler() { - int ret = thread_manager::deregister_thread(); - if (ret < 0) { - rpc_management_exception ex; - ex.msg = "Could not deregister handler"; - throw ex; - } else { - LOG_INFO << "Deregistered handler thread " << std::this_thread::get_id() - << " as " << ret; - } - } + void deregister_handler(); /** * Creates an atomic multilog from the given name, schema, and storage @@ -113,21 +90,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @return ID associated with the atomic multilog, -1 if it could * not be created */ - int64_t create_atomic_multilog(const std::string &name, - const rpc_schema &schema, - const rpc_storage_mode mode) { - int64_t ret = -1; - try { - ret = store_->create_atomic_multilog(name, - rpc_type_conversions::convert_schema(schema), - rpc_type_conversions::convert_mode(mode)); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - return ret; - } + int64_t create_atomic_multilog(const std::string &name, const rpc_schema &schema, const rpc_storage_mode mode); /** * Gets information about the atomic multilog @@ -135,11 +98,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param _return The info about the atomic multilog that is filled up * @param name The name of the atomic multilog */ - void get_atomic_multilog_info(rpc_atomic_multilog_info &_return, const std::string &name) { - _return.id = store_->get_atomic_multilog_id(name); - auto dschema = store_->get_atomic_multilog(_return.id)->get_schema().columns(); - _return.schema = rpc_type_conversions::convert_schema(dschema); - } + void get_atomic_multilog_info(rpc_atomic_multilog_info &_return, const std::string &name); /** * Removes the atomic multilog with the matching ID @@ -148,15 +107,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @throw management_excpetion If the atomic multilog could not be * removed */ - void remove_atomic_multilog(int64_t id) { - try { - store_->remove_atomic_multilog(id); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void remove_atomic_multilog(int64_t id); /** * Adds an index to a field in the atomic multilog @@ -166,15 +117,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param bucket_size The size of the bucket * @throw managmeent_exception If the index could not be added */ - void add_index(int64_t id, const std::string &field_name, const double bucket_size) { - try { - store_->get_atomic_multilog(id)->add_index(field_name, bucket_size); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void add_index(int64_t id, const std::string &field_name, const double bucket_size); /** * Removes an index from a field in the atomic multilog @@ -183,15 +126,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param field_name The name of the field in the atomic multilog * @throw management_exception If the index could not be removed */ - void remove_index(int64_t id, const std::string &field_name) { - try { - store_->get_atomic_multilog(id)->remove_index(field_name); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void remove_index(int64_t id, const std::string &field_name); /** * Adds a filter to the atomic multilog @@ -204,19 +139,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * expression */ void add_filter(int64_t id, const std::string &filter_name, - const std::string &filter_expr) { - try { - store_->get_atomic_multilog(id)->add_filter(filter_name, filter_expr); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } catch (parse_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + const std::string &filter_expr); /** * Removes a filter from the atomic multilog @@ -225,15 +148,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param filter_name The name of the filter * @throw managment_exception If the filter could not be removed */ - void remove_filter(int64_t id, const std::string &filter_name) { - try { - store_->get_atomic_multilog(id)->remove_filter(filter_name); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void remove_filter(int64_t id, const std::string &filter_name); /** * Adds an aggregate to the atomic multilog @@ -245,23 +160,10 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @throw management_exception If the aggregate could not be added * @throw parse_exception If the filter expression could not be parsed */ - void add_aggregate(int64_t id, const std::string &aggregate_name, + void add_aggregate(int64_t id, + const std::string &aggregate_name, const std::string &filter_name, - const std::string &aggregate_expr) { - try { - store_->get_atomic_multilog(id)->add_aggregate(aggregate_name, - filter_name, - aggregate_expr); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } catch (parse_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + const std::string &aggregate_expr); /** * Removes an aggregate from the atomic multilog @@ -270,15 +172,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param aggregate_name The name of the aggregate * @throw management_exception If the aggregate could not be removed */ - void remove_aggregate(int64_t id, const std::string &aggregate_name) { - try { - store_->get_atomic_multilog(id)->remove_aggregate(aggregate_name); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void remove_aggregate(int64_t id, const std::string &aggregate_name); /** * Adds a trigger to the atomic multilog @@ -290,21 +184,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @throw parse_exception If the trigger expression could not be * parsed */ - void add_trigger(int64_t id, const std::string &trigger_name, - const std::string &trigger_expr) { - try { - store_->get_atomic_multilog(id)->install_trigger(trigger_name, - trigger_expr); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } catch (parse_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void add_trigger(int64_t id, const std::string &trigger_name, const std::string &trigger_expr); /** * Removes a trigger from the atomic multilog @@ -313,15 +193,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param trigger_name The name of the trigger * @throw management_exception If the trigger could not be removed */ - void remove_trigger(int64_t id, const std::string &trigger_name) { - try { - store_->get_atomic_multilog(id)->remove_trigger(trigger_name); - } catch (management_exception &ex) { - rpc_management_exception e; - e.msg = ex.what(); - throw e; - } - } + void remove_trigger(int64_t id, const std::string &trigger_name); /** * Appends string data to the atomic multilog @@ -331,10 +203,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * * @return The offset to where the data is */ - int64_t append(int64_t id, const std::string &data) { - void *buf = (char *) &data[0]; // XXX: Fix - return store_->get_atomic_multilog(id)->append(buf); - } + int64_t append(int64_t id, const std::string &data); /** * Appends a record batch to the atomic multilog @@ -344,10 +213,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * * @return The offset where the batch is located */ - int64_t append_batch(int64_t id, const rpc_record_batch &batch) { - record_batch rbatch = rpc_type_conversions::convert_batch(batch); - return store_->get_atomic_multilog(id)->append_batch(rbatch); - } + int64_t append_batch(int64_t id, const rpc_record_batch &batch); /** * Reads n record strings from the atomic multilog @@ -357,17 +223,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param offset The offset to read from * @param nrecords The number of records to read */ - void read(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords) { - atomic_multilog *mlog = store_->get_atomic_multilog(id); - uint64_t limit; - read_only_data_log_ptr ptr; - mlog->read(offset, limit, ptr); - data_ptr dptr = ptr.decode(); - char *data = reinterpret_cast(dptr.get()); - size_t size = std::min(static_cast(limit - offset), - static_cast(nrecords * mlog->record_size())); - _return.assign(data, size); - } + void read(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords); /** * Queries an aggregate from the atomic multilog @@ -378,13 +234,11 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param begin_ms The beginning time in milliseconds * @param end_ms The end time in milliseconds */ - void query_aggregate(std::string &_return, int64_t id, + void query_aggregate(std::string &_return, + int64_t id, const std::string &aggregate_name, - const int64_t begin_ms, - const int64_t end_ms) { - atomic_multilog *m = store_->get_atomic_multilog(id); - _return = m->get_aggregate(aggregate_name, begin_ms, end_ms).to_string(); - } + int64_t begin_ms, + int64_t end_ms); // TODO: Add tests /** @@ -395,12 +249,10 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param aggregate_expr The aggregate expression * @param filter_expr The filter expression */ - void adhoc_aggregate(std::string &_return, int64_t id, + void adhoc_aggregate(std::string &_return, + int64_t id, const std::string &aggregate_expr, - const std::string &filter_expr) { - atomic_multilog *m = store_->get_atomic_multilog(id); - _return = m->execute_aggregate(aggregate_expr, filter_expr).to_string(); - } + const std::string &filter_expr); /** * Executes an ad hoc filter @@ -409,29 +261,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param id The identifier of the atomic multilog * @param filter_expr The filter expression */ - void adhoc_filter(rpc_iterator_handle &_return, int64_t id, - const std::string &filter_expr) { - bool success = false; - rpc_iterator_id it_id = new_iterator_id(); - atomic_multilog *mlog = store_->get_atomic_multilog(id); - try { - adhoc_entry entry(it_id, mlog->execute_filter(filter_expr)); - adhoc_status ret = adhoc_.insert(std::move(entry)); - success = ret.second; - } catch (parse_exception &ex) { - rpc_invalid_operation e; - e.msg = ex.what(); - throw e; - } - - if (!success) { - rpc_invalid_operation e; - e.msg = "Duplicate rpc_iterator_id assigned"; - throw e; - } - - adhoc_more(_return, mlog->record_size(), it_id); - } + void adhoc_filter(rpc_iterator_handle &_return, int64_t id, const std::string &filter_expr); /** * Queries a predefined filter @@ -443,21 +273,11 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param end_ms The end time in milliseconds * @throw rpc_invalid_exception If there was a duplicate rpc iterator */ - void predef_filter(rpc_iterator_handle &_return, int64_t id, - const std::string &filter_name, const int64_t begin_ms, - const int64_t end_ms) { - rpc_iterator_id it_id = new_iterator_id(); - atomic_multilog *mlog = store_->get_atomic_multilog(id); - predef_entry entry(it_id, mlog->query_filter(filter_name, begin_ms, end_ms)); - predef_status ret = predef_.insert(std::move(entry)); - if (!ret.second) { - rpc_invalid_operation e; - e.msg = "Duplicate rpc_iterator_id assigned"; - throw e; - } - - predef_more(_return, mlog->record_size(), it_id); - } + void predef_filter(rpc_iterator_handle &_return, + int64_t id, + const std::string &filter_name, + const int64_t begin_ms, + const int64_t end_ms); /** * Queries a combined filter @@ -470,31 +290,12 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param end_ms The end time in milliseconds * @throw rpc_invalid_exception If there was a duplicate rpc iterator */ - void combined_filter(rpc_iterator_handle &_return, int64_t id, + void combined_filter(rpc_iterator_handle &_return, + int64_t id, const std::string &filter_name, - const std::string &filter_expr, const int64_t begin_ms, - const int64_t end_ms) { - bool success = false; - rpc_iterator_id it_id = new_iterator_id(); - atomic_multilog *mlog = store_->get_atomic_multilog(id); - try { - combined_entry entry(it_id, mlog->query_filter(filter_name, - begin_ms, end_ms, filter_expr)); - combined_status ret = combined_.insert(std::move(entry)); - success = ret.second; - } catch (parse_exception &ex) { - rpc_invalid_operation e; - e.msg = ex.what(); - throw e; - } - if (!success) { - rpc_invalid_operation e; - e.msg = "Duplicate rpc_iterator_id assigned"; - throw e; - } - - combined_more(_return, mlog->record_size(), it_id); - } + const std::string &filter_expr, + const int64_t begin_ms, + const int64_t end_ms); /** * Gets the alerts from a time range @@ -505,20 +306,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param end_ms The end time in milliseconds * @throw rpc_invalid_exception If there was a duplicate rpc iterator */ - void alerts_by_time(rpc_iterator_handle &_return, int64_t id, - const int64_t begin_ms, const int64_t end_ms) { - rpc_iterator_id it_id = new_iterator_id(); - atomic_multilog *mlog = store_->get_atomic_multilog(id); - alerts_entry entry(it_id, mlog->get_alerts(begin_ms, end_ms)); - alerts_status ret = alerts_.insert(std::move(entry)); - if (!ret.second) { - rpc_invalid_operation e; - e.msg = "Duplicate rpc_iterator_id assigned"; - throw e; - } - - alerts_more(_return, it_id); - } + void alerts_by_time(rpc_iterator_handle &_return, int64_t id, const int64_t begin_ms, const int64_t end_ms); /** * Gets the alerts from a time range and by trigger @@ -530,22 +318,11 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param end_ms The end time in milliseconds * @throw rpc_invalid_exception If there was a duplicate rpc iterator */ - void alerts_by_trigger_and_time(rpc_iterator_handle &_return, int64_t id, - const std::string &trigger_name, const int64_t begin_ms, - const int64_t end_ms) { - rpc_iterator_id it_id = new_iterator_id(); - atomic_multilog *mlog = store_->get_atomic_multilog(id); - alerts_entry entry(it_id, mlog->get_alerts(begin_ms, end_ms, - trigger_name)); - alerts_status ret = alerts_.insert(std::move(entry)); - if (!ret.second) { - rpc_invalid_operation e; - e.msg = "Duplicate rpc_iterator_id assigned"; - throw e; - } - - alerts_more(_return, it_id); - } + void alerts_by_trigger_and_time(rpc_iterator_handle &_return, + int64_t id, + const std::string &trigger_name, + const int64_t begin_ms, + const int64_t end_ms); /** * Gets more from the map @@ -554,35 +331,7 @@ class rpc_service_handler : virtual public rpc_serviceIf { * @param id The identifier * @param desc The iterator description */ - void get_more(rpc_iterator_handle &_return, int64_t id, - const rpc_iterator_descriptor &desc) { - if (desc.handler_id != handler_id_) { - rpc_invalid_operation ex; - ex.msg = "handler_id mismatch"; - throw ex; - } - - size_t record_size = store_->get_atomic_multilog(id)->record_size(); - - switch (desc.type) { - case rpc_iterator_type::RPC_ADHOC: { - adhoc_more(_return, record_size, desc.id); - break; - } - case rpc_iterator_type::RPC_PREDEF: { - predef_more(_return, record_size, desc.id); - break; - } - case rpc_iterator_type::RPC_COMBINED: { - combined_more(_return, record_size, desc.id); - break; - } - case rpc_iterator_type::RPC_ALERTS: { - alerts_more(_return, desc.id); - break; - } - } - } + void get_more(rpc_iterator_handle &_return, int64_t id, const rpc_iterator_descriptor &desc); /** * Gets the number of records from the store @@ -591,121 +340,18 @@ class rpc_service_handler : virtual public rpc_serviceIf { * * @return The number of records */ - int64_t num_records(int64_t id) { - return store_->get_atomic_multilog(id)->num_records(); - } + int64_t num_records(int64_t id); private: - rpc_iterator_id new_iterator_id() { - return iterator_id_++; - } - - void adhoc_more(rpc_iterator_handle &_return, size_t record_size, - rpc_iterator_id it_id) { - // Initialize iterator descriptor - _return.desc.data_type = rpc_data_type::RPC_RECORD; - _return.desc.handler_id = handler_id_; - _return.desc.id = it_id; - _return.desc.type = rpc_iterator_type::RPC_ADHOC; - - // Read data from iterator - try { - auto &res = adhoc_.at(it_id); - size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE; - _return.data.reserve(record_size * to_read); - size_t i = 0; - for (; res->has_more() && i < to_read; ++i, res->advance()) { - record_t rec = res->get(); - _return.data.append(reinterpret_cast(rec.data()), rec.length()); - } - _return.num_entries = i; - _return.has_more = res->has_more(); - } catch (std::out_of_range &ex) { - rpc_invalid_operation e; - e.msg = "No such iterator"; - throw e; - } - } - - void predef_more(rpc_iterator_handle &_return, size_t record_size, - rpc_iterator_id it_id) { - // Initialize iterator descriptor - _return.desc.data_type = rpc_data_type::RPC_RECORD; - _return.desc.handler_id = handler_id_; - _return.desc.id = it_id; - _return.desc.type = rpc_iterator_type::RPC_PREDEF; - - // Read data from iterator - try { - auto &res = predef_.at(it_id); - size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE; - _return.data.reserve(record_size * to_read); - size_t i = 0; - for (; res->has_more() && i < to_read; ++i, res->advance()) { - record_t rec = res->get(); - _return.data.append(reinterpret_cast(rec.data()), rec.length()); - } - _return.num_entries = i; - _return.has_more = res->has_more(); - } catch (std::out_of_range &ex) { - rpc_invalid_operation e; - e.msg = "No such iterator"; - throw e; - } - } - - void combined_more(rpc_iterator_handle &_return, size_t record_size, - rpc_iterator_id it_id) { - // Initialize iterator descriptor - _return.desc.data_type = rpc_data_type::RPC_RECORD; - _return.desc.handler_id = handler_id_; - _return.desc.id = it_id; - _return.desc.type = rpc_iterator_type::RPC_COMBINED; - - // Read data from iterator - try { - auto &res = combined_.at(it_id); - size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE; - _return.data.reserve(record_size * to_read); - size_t i = 0; - for (; res->has_more() && i < to_read; ++i, res->advance()) { - record_t rec = res->get(); - _return.data.append(reinterpret_cast(rec.data()), rec.length()); - } - _return.num_entries = i; - _return.has_more = res->has_more(); - } catch (std::out_of_range &ex) { - rpc_invalid_operation e; - e.msg = "No such iterator"; - throw e; - } - } - - void alerts_more(rpc_iterator_handle &_return, rpc_iterator_id it_id) { - // Initialize iterator descriptor - _return.desc.data_type = rpc_data_type::RPC_ALERT; - _return.desc.handler_id = handler_id_; - _return.desc.id = it_id; - _return.desc.type = rpc_iterator_type::RPC_ALERTS; - - // Read data from iterator - try { - auto &res = alerts_.at(it_id); - size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE; - size_t i = 0; - for (; res->has_more() && i < to_read; ++i, res->advance()) { - alert a = res->get(); - _return.data.append(a.to_string()); - _return.data.push_back('\n'); - } - _return.num_entries = i; - _return.has_more = res->has_more(); - } catch (std::out_of_range &ex) { - rpc_invalid_operation e; - e.msg = "No such iterator"; - throw e; - } - } + rpc_iterator_id new_iterator_id(); + + void adhoc_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id); + + void predef_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id); + + void combined_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id); + + void alerts_more(rpc_iterator_handle &_return, rpc_iterator_id it_id); rpc_handler_id handler_id_; confluo_store *store_; @@ -728,15 +374,12 @@ class rpc_clone_factory : public rpc_serviceIfFactory { * * @param store The confluo store for the rpc clone */ - rpc_clone_factory(confluo_store *store) - : store_(store) { - } + rpc_clone_factory(confluo_store *store); /** * Destructs the rpc clone factory */ - virtual ~rpc_clone_factory() { - } + virtual ~rpc_clone_factory(); /** * Gets the service handler for the rpc connection @@ -745,25 +388,14 @@ class rpc_clone_factory : public rpc_serviceIfFactory { * * @return An rpc service handler for the confluo store */ - virtual rpc_serviceIf *getHandler(const TConnectionInfo &conn_info) { - std::shared_ptr sock = std::dynamic_pointer_cast( - conn_info.transport); - LOG_INFO << "Incoming connection\n" - << "\t\t\tSocketInfo: " << sock->getSocketInfo() << "\n" - << "\t\t\tPeerHost: " << sock->getPeerHost() << "\n" - << "\t\t\tPeerAddress: " << sock->getPeerAddress() << "\n" - << "\t\t\tPeerPort: " << sock->getPeerPort(); - return new rpc_service_handler(store_); - } + virtual rpc_serviceIf *getHandler(const TConnectionInfo &conn_info); /** * Destructs the handler * * @param handler The handler to destruct */ - virtual void releaseHandler(rpc_serviceIf *handler) { - delete handler; - } + virtual void releaseHandler(rpc_serviceIf *handler); private: confluo_store *store_; @@ -783,23 +415,7 @@ class rpc_server { * * @return A pointer to the server */ - static std::shared_ptr create(confluo_store *store, - const std::string &address, - int port) { - std::shared_ptr clone_factory( - new rpc_clone_factory(store)); - std::shared_ptr proc_factory( - new rpc_serviceProcessorFactory(clone_factory)); - std::shared_ptr sock(new TServerSocket(address, port)); - std::shared_ptr transport_factory( - new TBufferedTransportFactory()); - std::shared_ptr protocol_factory( - new TBinaryProtocolFactory()); - std::shared_ptr server( - new TThreadedServer(proc_factory, sock, transport_factory, - protocol_factory)); - return server; - } + static std::shared_ptr create(confluo_store *store, const std::string &address, int port); }; } diff --git a/librpc/src/confluo_server.cc b/librpc/src/confluo_server.cc new file mode 100644 index 000000000..a31e20ed9 --- /dev/null +++ b/librpc/src/confluo_server.cc @@ -0,0 +1,57 @@ +#include +#include +#include "cmd_parse.h" +#include "rpc_server.h" + +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; +using namespace ::apache::thrift::server; + +using boost::shared_ptr; + +using namespace ::confluo; +using namespace ::confluo::rpc; +using namespace ::utils; + +int main(int argc, char **argv) { + utils::error_handling::install_signal_handler(argv[0], SIGSEGV, SIGKILL, SIGSTOP); + + cmd_options opts; + opts.add(cmd_option("port", 'p', false).set_default("9090").set_description("Port that server listens on")); + opts.add(cmd_option("address", 'a', false).set_default("127.0.0.1").set_description("Address server binds to")); + opts.add(cmd_option("data-path", 'd', false).set_default(".").set_description("Data path for Confluo")); + + cmd_parser parser(argc, argv, opts); + if (parser.get_flag("help")) { + fprintf(stderr, "%s\n", parser.help_msg().c_str()); + return 0; + } + + int port; + std::string address; + std::string data_path; + + try { + port = parser.get_int("port"); + address = parser.get("address"); + data_path = parser.get("data-path"); + } catch (std::exception &e) { + fprintf(stderr, "could not parse cmdline args: %s\n", e.what()); + fprintf(stderr, "%s\n", parser.help_msg().c_str()); + return 0; + } + + LOG_INFO << parser.parsed_values(); + + confluo_store *store = new confluo_store(data_path); + + try { + auto server = rpc_server::create(store, address, port); + server->serve(); + } catch (std::exception &e) { + LOG_ERROR << "Could not start server listening on " << address << ":" << port << ": " << e.what(); + } + + return 0; +} \ No newline at end of file diff --git a/librpc/src/rpc_configuration_params.cc b/librpc/src/rpc_configuration_params.cc deleted file mode 100644 index fd611f7ce..000000000 --- a/librpc/src/rpc_configuration_params.cc +++ /dev/null @@ -1,12 +0,0 @@ -#include "rpc_configuration_params.h" -#include "conf/configuration_params.h" -#include "rpc_defaults.h" - -namespace confluo { -namespace rpc { - -size_t rpc_configuration_params::ITERATOR_BATCH_SIZE = - confluo_conf.get("iterator_batch_size", rpc_defaults::DEFAULT_ITERATOR_BATCH_SIZE); - -} -} \ No newline at end of file diff --git a/librpc/src/rpc_defaults.cc b/librpc/src/rpc_defaults.cc deleted file mode 100644 index 874bd9c24..000000000 --- a/librpc/src/rpc_defaults.cc +++ /dev/null @@ -1,9 +0,0 @@ -#include "rpc_defaults.h" - -namespace confluo { -namespace rpc { - -const size_t rpc_defaults::DEFAULT_ITERATOR_BATCH_SIZE; - -} -} \ No newline at end of file diff --git a/librpc/src/rpc_record_batch_builder.cc b/librpc/src/rpc_record_batch_builder.cc index 47f4dee95..dff1c7f86 100644 --- a/librpc/src/rpc_record_batch_builder.cc +++ b/librpc/src/rpc_record_batch_builder.cc @@ -15,7 +15,7 @@ rpc_record_batch_builder::rpc_record_batch_builder(const schema_t &schema) } void rpc_record_batch_builder::add_record(const record_data &rec) { int64_t ts = *reinterpret_cast(rec.data()); - int64_t time_block = ts / configuration_params::TIME_RESOLUTION_NS; + int64_t time_block = ts / configuration_params::TIME_RESOLUTION_NS(); batch_sizes_[time_block] += schema_.record_size(); batch_[time_block].write(rec.data(), schema_.record_size()); nrecords_++; diff --git a/librpc/src/rpc_server.cc b/librpc/src/rpc_server.cc index 72ed9de72..8f5e8ab3d 100644 --- a/librpc/src/rpc_server.cc +++ b/librpc/src/rpc_server.cc @@ -1,5 +1,4 @@ #include "rpc_server.h" -#include "cmd_parse.h" using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; @@ -12,54 +11,448 @@ using namespace ::confluo; using namespace ::confluo::rpc; using namespace ::utils; -int main(int argc, char **argv) { - utils::error_handling::install_signal_handler(argv[0], SIGSEGV, SIGKILL, - SIGSTOP); +namespace confluo { +namespace rpc { - cmd_options opts; - opts.add( - cmd_option("port", 'p', false).set_default("9090").set_description( - "Port that server listens on")); +rpc_service_handler::rpc_service_handler(confluo_store *store) + : handler_id_(-1), + store_(store), + iterator_id_(0) { +} +void rpc_service_handler::register_handler() { + handler_id_ = thread_manager::register_thread(); + if (handler_id_ < 0) { + rpc_management_exception ex; + ex.msg = "Could not register handler"; + throw ex; + } else { + LOG_INFO << "Registered handler thread " << std::this_thread::get_id() << " as " << handler_id_; + } +} +void rpc_service_handler::deregister_handler() { + int ret = thread_manager::deregister_thread(); + if (ret < 0) { + rpc_management_exception ex; + ex.msg = "Could not deregister handler"; + throw ex; + } else { + LOG_INFO << "Deregistered handler thread " << std::this_thread::get_id() << " as " << ret; + } +} +int64_t rpc_service_handler::create_atomic_multilog(const std::string &name, + const rpc_schema &schema, + const rpc_storage_mode mode) { + int64_t ret; + try { + ret = store_->create_atomic_multilog(name, + rpc_type_conversions::convert_schema(schema), + rpc_type_conversions::convert_mode(mode)); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } + return ret; +} +void rpc_service_handler::get_atomic_multilog_info(rpc_atomic_multilog_info &_return, const std::string &name) { + _return.id = store_->get_atomic_multilog_id(name); + auto dschema = store_->get_atomic_multilog(_return.id)->get_schema().columns(); + _return.schema = rpc_type_conversions::convert_schema(dschema); +} +void rpc_service_handler::remove_atomic_multilog(int64_t id) { + try { + store_->remove_atomic_multilog(id); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::add_index(int64_t id, const std::string &field_name, const double bucket_size) { + try { + store_->get_atomic_multilog(id)->add_index(field_name, bucket_size); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::remove_index(int64_t id, const std::string &field_name) { + try { + store_->get_atomic_multilog(id)->remove_index(field_name); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::add_filter(int64_t id, const std::string &filter_name, const std::string &filter_expr) { + try { + store_->get_atomic_multilog(id)->add_filter(filter_name, filter_expr); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } catch (parse_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::remove_filter(int64_t id, const std::string &filter_name) { + try { + store_->get_atomic_multilog(id)->remove_filter(filter_name); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::add_aggregate(int64_t id, + const std::string &aggregate_name, + const std::string &filter_name, + const std::string &aggregate_expr) { + try { + store_->get_atomic_multilog(id)->add_aggregate(aggregate_name, filter_name, aggregate_expr); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } catch (parse_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::remove_aggregate(int64_t id, const std::string &aggregate_name) { + try { + store_->get_atomic_multilog(id)->remove_aggregate(aggregate_name); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::add_trigger(int64_t id, const std::string &trigger_name, const std::string &trigger_expr) { + try { + store_->get_atomic_multilog(id)->install_trigger(trigger_name, trigger_expr); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } catch (parse_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +void rpc_service_handler::remove_trigger(int64_t id, const std::string &trigger_name) { + try { + store_->get_atomic_multilog(id)->remove_trigger(trigger_name); + } catch (management_exception &ex) { + rpc_management_exception e; + e.msg = ex.what(); + throw e; + } +} +int64_t rpc_service_handler::append(int64_t id, const std::string &data) { + void *buf = (char *) &data[0]; // XXX: Fix + return static_cast(store_->get_atomic_multilog(id)->append(buf)); +} +int64_t rpc_service_handler::append_batch(int64_t id, const rpc_record_batch &batch) { + record_batch rbatch = rpc_type_conversions::convert_batch(batch); + return static_cast(store_->get_atomic_multilog(id)->append_batch(rbatch)); +} +void rpc_service_handler::read(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords) { + atomic_multilog *mlog = store_->get_atomic_multilog(id); + uint64_t limit; + read_only_data_log_ptr ptr; + mlog->read((uint64_t) offset, limit, ptr); + data_ptr dptr = ptr.decode(); + char *data = reinterpret_cast(dptr.get()); + size_t size = std::min(static_cast(limit - offset), + static_cast(nrecords * mlog->record_size())); + _return.assign(data, size); +} +void rpc_service_handler::query_aggregate(std::string &_return, + int64_t id, + const std::string &aggregate_name, + const int64_t begin_ms, + const int64_t end_ms) { + atomic_multilog *m = store_->get_atomic_multilog(id); + _return = m->get_aggregate(aggregate_name, (uint64_t) begin_ms, (uint64_t) end_ms).to_string(); +} +void rpc_service_handler::adhoc_aggregate(std::string &_return, + int64_t id, + const std::string &aggregate_expr, + const std::string &filter_expr) { + atomic_multilog *m = store_->get_atomic_multilog(id); + _return = m->execute_aggregate(aggregate_expr, filter_expr).to_string(); +} +void rpc_service_handler::adhoc_filter(rpc_iterator_handle &_return, int64_t id, const std::string &filter_expr) { + bool success; + rpc_iterator_id it_id = new_iterator_id(); + atomic_multilog *mlog = store_->get_atomic_multilog(id); + try { + adhoc_entry entry(it_id, mlog->execute_filter(filter_expr)); + adhoc_status ret = adhoc_.insert(std::move(entry)); + success = ret.second; + } catch (parse_exception &ex) { + rpc_invalid_operation e; + e.msg = ex.what(); + throw e; + } + + if (!success) { + rpc_invalid_operation e; + e.msg = "Duplicate rpc_iterator_id assigned"; + throw e; + } + + adhoc_more(_return, mlog->record_size(), it_id); +} +void rpc_service_handler::predef_filter(rpc_iterator_handle &_return, + int64_t id, + const std::string &filter_name, + const int64_t begin_ms, + const int64_t end_ms) { + rpc_iterator_id it_id = new_iterator_id(); + atomic_multilog *mlog = store_->get_atomic_multilog(id); + predef_entry entry(it_id, mlog->query_filter(filter_name, (uint64_t) begin_ms, (uint64_t) end_ms)); + predef_status ret = predef_.insert(std::move(entry)); + if (!ret.second) { + rpc_invalid_operation e; + e.msg = "Duplicate rpc_iterator_id assigned"; + throw e; + } + + predef_more(_return, mlog->record_size(), it_id); +} +void rpc_service_handler::combined_filter(rpc_iterator_handle &_return, + int64_t id, + const std::string &filter_name, + const std::string &filter_expr, + const int64_t begin_ms, + const int64_t end_ms) { + bool success; + rpc_iterator_id it_id = new_iterator_id(); + atomic_multilog *mlog = store_->get_atomic_multilog(id); + try { + combined_entry entry(it_id, mlog->query_filter(filter_name, (uint64_t) begin_ms, (uint64_t) end_ms, filter_expr)); + combined_status ret = combined_.insert(std::move(entry)); + success = ret.second; + } catch (parse_exception &ex) { + rpc_invalid_operation e; + e.msg = ex.what(); + throw e; + } + if (!success) { + rpc_invalid_operation e; + e.msg = "Duplicate rpc_iterator_id assigned"; + throw e; + } + + combined_more(_return, mlog->record_size(), it_id); +} +void rpc_service_handler::alerts_by_time(rpc_iterator_handle &_return, + int64_t id, + const int64_t begin_ms, + const int64_t end_ms) { + rpc_iterator_id it_id = new_iterator_id(); + atomic_multilog *mlog = store_->get_atomic_multilog(id); + alerts_entry entry(it_id, mlog->get_alerts((uint64_t) begin_ms, (uint64_t) end_ms)); + alerts_status ret = alerts_.insert(std::move(entry)); + if (!ret.second) { + rpc_invalid_operation e; + e.msg = "Duplicate rpc_iterator_id assigned"; + throw e; + } - opts.add( - cmd_option("address", 'a', false).set_default("127.0.0.1").set_description( - "Address server binds to")); + alerts_more(_return, it_id); +} +void rpc_service_handler::alerts_by_trigger_and_time(rpc_iterator_handle &_return, + int64_t id, + const std::string &trigger_name, + const int64_t begin_ms, + const int64_t end_ms) { + rpc_iterator_id it_id = new_iterator_id(); + atomic_multilog *mlog = store_->get_atomic_multilog(id); + alerts_entry entry(it_id, mlog->get_alerts((uint64_t) begin_ms, (uint64_t) end_ms, trigger_name)); + alerts_status ret = alerts_.insert(std::move(entry)); + if (!ret.second) { + rpc_invalid_operation e; + e.msg = "Duplicate rpc_iterator_id assigned"; + throw e; + } + + alerts_more(_return, it_id); +} +void rpc_service_handler::get_more(rpc_iterator_handle &_return, int64_t id, const rpc_iterator_descriptor &desc) { + if (desc.handler_id != handler_id_) { + rpc_invalid_operation ex; + ex.msg = "handler_id mismatch"; + throw ex; + } - opts.add( - cmd_option("data-path", 'd', false).set_default(".").set_description( - "Data path for Confluo")); + size_t record_size = store_->get_atomic_multilog(id)->record_size(); - cmd_parser parser(argc, argv, opts); - if (parser.get_flag("help")) { - fprintf(stderr, "%s\n", parser.help_msg().c_str()); - return 0; + switch (desc.type) { + case rpc_iterator_type::RPC_ADHOC: { + adhoc_more(_return, record_size, desc.id); + break; + } + case rpc_iterator_type::RPC_PREDEF: { + predef_more(_return, record_size, desc.id); + break; + } + case rpc_iterator_type::RPC_COMBINED: { + combined_more(_return, record_size, desc.id); + break; + } + case rpc_iterator_type::RPC_ALERTS: { + alerts_more(_return, desc.id); + break; + } } +} - int port; - std::string address; - std::string data_path; +int64_t rpc_service_handler::num_records(int64_t id) { + return static_cast(store_->get_atomic_multilog(id)->num_records()); +} +rpc_iterator_id rpc_service_handler::new_iterator_id() { + return iterator_id_++; +} +void rpc_service_handler::adhoc_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id) { + // Initialize iterator descriptor + _return.desc.data_type = rpc_data_type::RPC_RECORD; + _return.desc.handler_id = handler_id_; + _return.desc.id = it_id; + _return.desc.type = rpc_iterator_type::RPC_ADHOC; + + // Read data from iterator try { - port = parser.get_int("port"); - address = parser.get("address"); - data_path = parser.get("data-path"); - } catch (std::exception &e) { - fprintf(stderr, "could not parse cmdline args: %s\n", e.what()); - fprintf(stderr, "%s\n", parser.help_msg().c_str()); - return 0; + auto &res = adhoc_.at(it_id); + size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE(); + _return.data.reserve(record_size * to_read); + size_t i = 0; + for (; res->has_more() && i < to_read; ++i, res->advance()) { + record_t rec = res->get(); + _return.data.append(reinterpret_cast(rec.data()), rec.length()); + } + _return.num_entries = static_cast(i); + _return.has_more = res->has_more(); + } catch (std::out_of_range &ex) { + rpc_invalid_operation e; + e.msg = "No such iterator"; + throw e; } +} +void rpc_service_handler::predef_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id) { + // Initialize iterator descriptor + _return.desc.data_type = rpc_data_type::RPC_RECORD; + _return.desc.handler_id = handler_id_; + _return.desc.id = it_id; + _return.desc.type = rpc_iterator_type::RPC_PREDEF; - LOG_INFO << parser.parsed_values(); + // Read data from iterator + try { + auto &res = predef_.at(it_id); + size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE(); + _return.data.reserve(record_size * to_read); + size_t i = 0; + for (; res->has_more() && i < to_read; ++i, res->advance()) { + record_t rec = res->get(); + _return.data.append(reinterpret_cast(rec.data()), rec.length()); + } + _return.num_entries = static_cast(i); + _return.has_more = res->has_more(); + } catch (std::out_of_range &ex) { + rpc_invalid_operation e; + e.msg = "No such iterator"; + throw e; + } +} +void rpc_service_handler::combined_more(rpc_iterator_handle &_return, size_t record_size, rpc_iterator_id it_id) { + // Initialize iterator descriptor + _return.desc.data_type = rpc_data_type::RPC_RECORD; + _return.desc.handler_id = handler_id_; + _return.desc.id = it_id; + _return.desc.type = rpc_iterator_type::RPC_COMBINED; - confluo_store *store = new confluo_store(data_path); + // Read data from iterator + try { + auto &res = combined_.at(it_id); + size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE(); + _return.data.reserve(record_size * to_read); + size_t i = 0; + for (; res->has_more() && i < to_read; ++i, res->advance()) { + record_t rec = res->get(); + _return.data.append(reinterpret_cast(rec.data()), rec.length()); + } + _return.num_entries = static_cast(i); + _return.has_more = res->has_more(); + } catch (std::out_of_range &ex) { + rpc_invalid_operation e; + e.msg = "No such iterator"; + throw e; + } +} +void rpc_service_handler::alerts_more(rpc_iterator_handle &_return, rpc_iterator_id it_id) { + // Initialize iterator descriptor + _return.desc.data_type = rpc_data_type::RPC_ALERT; + _return.desc.handler_id = handler_id_; + _return.desc.id = it_id; + _return.desc.type = rpc_iterator_type::RPC_ALERTS; + // Read data from iterator try { - auto server = rpc_server::create(store, address, port); - server->serve(); - } catch (std::exception &e) { - LOG_ERROR << "Could not start server listening on " << address << ":" << port << ": " << e.what(); + auto &res = alerts_.at(it_id); + size_t to_read = rpc_configuration_params::ITERATOR_BATCH_SIZE(); + size_t i = 0; + for (; res->has_more() && i < to_read; ++i, res->advance()) { + alert a = res->get(); + _return.data.append(a.to_string()); + _return.data.push_back('\n'); + } + _return.num_entries = static_cast(i); + _return.has_more = res->has_more(); + } catch (std::out_of_range &ex) { + rpc_invalid_operation e; + e.msg = "No such iterator"; + throw e; } +} - return 0; +rpc_clone_factory::rpc_clone_factory(confluo_store *store) + : store_(store) { +} +rpc_clone_factory::~rpc_clone_factory() { +} +rpc_serviceIf *rpc_clone_factory::getHandler(const TConnectionInfo &conn_info) { + std::shared_ptr sock = std::dynamic_pointer_cast( + conn_info.transport); + LOG_INFO << "Incoming connection\n" + << "\t\t\tSocketInfo: " << sock->getSocketInfo() << "\n" + << "\t\t\tPeerHost: " << sock->getPeerHost() << "\n" + << "\t\t\tPeerAddress: " << sock->getPeerAddress() << "\n" + << "\t\t\tPeerPort: " << sock->getPeerPort(); + return new rpc_service_handler(store_); +} +void rpc_clone_factory::releaseHandler(rpc_serviceIf *handler) { + delete handler; } +std::shared_ptr rpc_server::create(confluo_store *store, const std::string &address, int port) { + std::shared_ptr clone_factory(new rpc_clone_factory(store)); + std::shared_ptr proc_factory(new rpc_serviceProcessorFactory(clone_factory)); + std::shared_ptr sock(new TServerSocket(address, port)); + std::shared_ptr t_factory(new TBufferedTransportFactory()); + std::shared_ptr p_factory(new TBinaryProtocolFactory()); + std::shared_ptr server(new TThreadedServer(proc_factory, sock, t_factory, p_factory)); + return server; +} +} +} + + diff --git a/librpc/test/client_connection_test.h b/librpc/test/client_connection_test.h index e5a4d2410..83b8dc931 100644 --- a/librpc/test/client_connection_test.h +++ b/librpc/test/client_connection_test.h @@ -25,7 +25,7 @@ TEST_F(ClientConnectionTest, ConcurrentConnectionsTest) { }); rpc_test_utils::wait_till_server_ready(SERVER_ADDRESS, SERVER_PORT); - std::vector clients(static_cast(configuration_params::MAX_CONCURRENCY)); + std::vector clients(static_cast(configuration_params::MAX_CONCURRENCY())); for (auto &client : clients) { client.connect(SERVER_ADDRESS, SERVER_PORT); } diff --git a/librpc/test/client_read_ops_test.h b/librpc/test/client_read_ops_test.h index 406c71262..2bc43d2a7 100644 --- a/librpc/test/client_read_ops_test.h +++ b/librpc/test/client_read_ops_test.h @@ -64,8 +64,7 @@ class ClientReadOpsTest : public testing::Test { static rec r; - static void *record(bool a, int8_t b, int16_t c, int32_t d, int64_t e, - float f, double g, const char *h) { + static void *record(bool a, int8_t b, int16_t c, int32_t d, int64_t e, float f, double g, const char *h) { int64_t ts = utils::time_utils::cur_ns(); r = {ts, a, b, c, d, e, f, g, {}}; size_t len = std::min(static_cast(16), strlen(h)); @@ -76,8 +75,7 @@ class ClientReadOpsTest : public testing::Test { return reinterpret_cast(&r); } - static void *record(int64_t ts, bool a, int8_t b, int16_t c, int32_t d, - int64_t e, float f, double g, const char *h) { + static void *record(int64_t ts, bool a, int8_t b, int16_t c, int32_t d, int64_t e, float f, double g, const char *h) { r = {ts, a, b, c, d, e, f, g, {}}; size_t len = std::min(static_cast(16), strlen(h)); memcpy(r.h, h, len); @@ -388,7 +386,7 @@ TEST_F(ClientReadOpsTest, FilterAggregateTriggerTest) { mlog->install_trigger("trigger8", "agg8 >= 10"); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; mlog->append(record(now_ns, false, '0', 0, 0, 0, 0.0, 0.01, "abc")); mlog->append(record(now_ns, true, '1', 10, 2, 1, 0.1, 0.02, "defg")); @@ -718,7 +716,7 @@ TEST_F(ClientReadOpsTest, BatchFilterAggregateTriggerTest) { mlog->install_trigger("trigger8", "agg8 >= 10"); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; record_batch batch = build_batch(*mlog, now_ns); mlog->append_batch(batch); diff --git a/librpc/test/client_write_ops_test.h b/librpc/test/client_write_ops_test.h index 8574db2d4..7792b585b 100644 --- a/librpc/test/client_write_ops_test.h +++ b/librpc/test/client_write_ops_test.h @@ -41,8 +41,7 @@ class ClientWriteOpsTest : public testing::Test { static rec r; - static void *record(bool a, int8_t b, int16_t c, int32_t d, int64_t e, - float f, double g, const char *h) { + static void *record(bool a, int8_t b, int16_t c, int32_t d, int64_t e, float f, double g, const char *h) { int64_t ts = utils::time_utils::cur_ns(); r = {ts, a, b, c, d, e, f, g, {}}; size_t len = std::min(static_cast(16), strlen(h)); @@ -53,8 +52,7 @@ class ClientWriteOpsTest : public testing::Test { return reinterpret_cast(&r); } - static void *record(int64_t ts, bool a, int8_t b, int16_t c, int32_t d, - int64_t e, float f, double g, const char *h) { + static void *record(int64_t ts, bool a, int8_t b, int16_t c, int32_t d, int64_t e, float f, double g, const char *h) { r = {ts, a, b, c, d, e, f, g, {}}; size_t len = std::min(static_cast(16), strlen(h)); memcpy(r.h, h, len); @@ -350,7 +348,7 @@ TEST_F(ClientWriteOpsTest, AddFilterAndTriggerTest) { client.install_trigger("trigger8", "agg8 >= 10"); int64_t now_ns = time_utils::cur_ns(); - int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = now_ns / configuration_params::TIME_RESOLUTION_NS(); int64_t end = beg; mlog->append(record(now_ns, false, '0', 0, 0, 0, 0.0, 0.01, "abc")); mlog->append(record(now_ns, true, '1', 10, 2, 1, 0.1, 0.02, "defg")); @@ -627,7 +625,7 @@ TEST_F(ClientWriteOpsTest, RemoveFilterTriggerTest) { client.install_trigger("trigger1", "agg1 >= 10"); client.install_trigger("trigger2", "agg2 >= 10"); - int64_t beg = r.ts / configuration_params::TIME_RESOLUTION_NS; + int64_t beg = r.ts / configuration_params::TIME_RESOLUTION_NS(); mlog->append(record(false, '0', 0, 0, 0, 0.0, 0.01, "abc")); mlog->append(record(true, '1', 10, 2, 1, 0.1, 0.02, "defg")); mlog->append(record(false, '2', 20, 4, 10, 0.2, 0.03, "hijkl")); @@ -637,7 +635,7 @@ TEST_F(ClientWriteOpsTest, RemoveFilterTriggerTest) { mlog->append(record(false, '6', 60, 12, 100000, 0.6, 0.07, "zzz")); mlog->append(record(true, '7', 70, 14, 1000000, 0.7, 0.08, "zzz")); - int64_t end = r.ts / configuration_params::TIME_RESOLUTION_NS; + int64_t end = r.ts / configuration_params::TIME_RESOLUTION_NS(); size_t i = 0; for (auto r = mlog->query_filter("filter1", beg, end); r->has_more();