Skip to content

Commit

Permalink
More updates to separation of source from header files
Browse files Browse the repository at this point in the history
  • Loading branch information
anuragkh committed Jun 4, 2018
1 parent 1daa358 commit 63dc722
Show file tree
Hide file tree
Showing 38 changed files with 808 additions and 749 deletions.
4 changes: 1 addition & 3 deletions libconfluo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ add_library(confluo STATIC
src/aggregate/aggregate_manager.cc
src/aggregate/aggregate_ops.cc
src/compression/confluo_encoder.cc
src/conf/configuration_params.cc
src/conf/defaults.cc
src/container/data_log.cc
src/container/reflog.cc
src/parser/aggregate_parser.cc
Expand Down Expand Up @@ -188,7 +186,7 @@ add_library(confluo STATIC
src/container/cursor/alert_cursor.cc
src/container/cursor/offset_cursors.cc
src/container/cursor/record_cursors.cc
src/conf/configuration_parser.cc)
)
target_link_libraries(confluo confluoutils)

if (BUILD_TESTS)
Expand Down
12 changes: 6 additions & 6 deletions libconfluo/confluo/archival/monolog_linear_archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class monolog_linear_archiver : public archiver {
* @param log monolog to archive
*/
monolog_linear_archiver(const std::string &path, monolog *log)
: writer_(path, "monolog_linear", archival_configuration_params::MAX_FILE_SIZE),
: writer_(path, "monolog_linear", archival_configuration_params::MAX_FILE_SIZE()),
archival_tail_(0),
log_(log) {
writer_.close();
Expand Down Expand Up @@ -80,15 +80,15 @@ class monolog_linear_archiver : public archiver {
void archive_bucket(T *bucket) {
auto metadata = ptr_metadata::get(bucket);
auto encoded_bucket = confluo_encoder::encode(bucket, metadata->data_size_,
archival_configuration_params::DATA_LOG_ENCODING_TYPE);
archival_configuration_params::DATA_LOG_ENCODING_TYPE());
size_t enc_size = encoded_bucket.size();
auto off = writer_.append<ptr_metadata, uint8_t>(metadata, 1, encoded_bucket.get(), enc_size);

auto action = monolog_linear_archival_action(archival_tail_ + BUCKET_SIZE);
writer_.commit<monolog_linear_archival_action>(action);

ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE);
void *archived_bucket = ALLOCATOR.mmap(off.path(), off.offset(), enc_size, aux);
ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE());
void *archived_bucket = ALLOCATOR.mmap(off.path(), static_cast<off_t>(off.offset()), enc_size, aux);
log_->data()[archival_tail_ / BUCKET_SIZE].swap_ptr(encoded_ptr<T>(archived_bucket));
}

Expand Down Expand Up @@ -116,8 +116,8 @@ class monolog_linear_load_utils {
incremental_file_offset off = reader.tell();
size_t size = reader.read<ptr_metadata>().data_size_;

ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE);
void *encoded_bucket = ALLOCATOR.mmap(off.path(), off.offset(), size, aux);
ptr_aux_block aux(state_type::D_ARCHIVED, archival_configuration_params::DATA_LOG_ENCODING_TYPE());
void *encoded_bucket = ALLOCATOR.mmap(off.path(), static_cast<off_t>(off.offset()), size, aux);
buckets[load_offset / BUCKET_SIZE].init_ptr(encoded_ptr<T>(encoded_bucket));

log.reserve(BUCKET_SIZE);
Expand Down
4 changes: 2 additions & 2 deletions libconfluo/confluo/atomic_multilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class atomic_multilog {
* @param bucket_size The size of the bucket
* @throw ex Management exception
*/
void add_index(const std::string &field_name, double bucket_size = configuration_params::INDEX_BUCKET_SIZE);
void add_index(const std::string &field_name, double bucket_size = configuration_params::INDEX_BUCKET_SIZE());

/**
* Removes index from the atomic multilog
Expand Down Expand Up @@ -215,7 +215,7 @@ class atomic_multilog {
*/
void install_trigger(const std::string &name,
const std::string &expr,
const uint64_t periodicity_ms = configuration_params::MONITOR_PERIODICITY_MS);
const uint64_t periodicity_ms = configuration_params::MONITOR_PERIODICITY_MS());

/**
* Removes trigger from the atomic multilog
Expand Down
69 changes: 55 additions & 14 deletions libconfluo/confluo/conf/configuration_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,51 @@

namespace confluo {

class conf {
public:
static utils::configuration_map &instance() {
static std::string
paths = utils::config_utils::read_from_env("CONFLUO_CONF", "/etc/conf/confluo.conf:./conf/confluo.conf");
static utils::configuration_map confluo_conf(paths);
return confluo_conf;
}
};

/**
* Confluo archival configuration parameters
*/
class archival_configuration_params {
public:
static uint64_t PERIODICITY_MS;
static uint64_t IN_MEMORY_DATALOG_WINDOW_BYTES;
static uint64_t IN_MEMORY_FILTER_WINDOW_NS;
static uint64_t PERIODICITY_MS() {
return conf::instance().get<uint64_t>("archival_periodicity_ms", archival_defaults::DEFAULT_PERIODICITY_MS());
}

static uint64_t IN_MEMORY_DATALOG_WINDOW_BYTES() {
return conf::instance().get<uint64_t>("archival_in_memory_datalog_window_bytes",
archival_defaults::DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES());
}

static uint64_t IN_MEMORY_FILTER_WINDOW_NS() {
return conf::instance().get<uint64_t>("archival_in_memory_filter_window_ns",
archival_defaults::DEFAULT_IN_MEMORY_FILTER_WINDOW_NS());
}

// Maximum archival file size in bytes. Cannot be smaller than a data log bucket.
static size_t MAX_FILE_SIZE;
static size_t MAX_FILE_SIZE() {
return conf::instance().get<size_t>("max_archival_file_size", archival_defaults::DEFAULT_MAX_FILE_SIZE());
}

// Archival compression parameters; TODO parse types
static uint8_t DATA_LOG_ENCODING_TYPE;
static uint8_t REFLOG_ENCODING_TYPE;
static uint8_t DATA_LOG_ENCODING_TYPE() {
return configuration_parser::to_encoding_type(
conf::instance().get<std::string>("data_log_archival_encoding",
archival_defaults::DEFAULT_DATA_LOG_ENCODING_TYPE()));
}
static uint8_t REFLOG_ENCODING_TYPE() {
return configuration_parser::to_encoding_type(
conf::instance().get<std::string>("reflog_archival_encoding",
archival_defaults::DEFAULT_REFLOG_ENCODING_TYPE()));
}
};

/**
Expand All @@ -30,25 +60,36 @@ class archival_configuration_params {
class configuration_params {
public:
/** Memory configuration parameters */
static size_t MAX_MEMORY;
static size_t MAX_MEMORY() {
return conf::instance().get<size_t>("max_memory", defaults::DEFAULT_MAX_MEMORY());
}

/** Thread configuration parameters */
static int MAX_CONCURRENCY;
static int MAX_CONCURRENCY() {
return conf::instance().get<int>("max_concurrency", defaults::HARDWARE_CONCURRENCY());
}

/** Index configuration parameters */
static double INDEX_BUCKET_SIZE;
static double INDEX_BUCKET_SIZE() {
return conf::instance().get<double>("index_block_size", defaults::DEFAULT_INDEX_BUCKET_SIZE());
}

/** Time resolution */
static uint64_t TIME_RESOLUTION_NS;
static uint64_t TIME_RESOLUTION_NS() {
return conf::instance().get<uint64_t>("time_resolution_ns", defaults::DEFAULT_TIME_RESOLUTION_NS());
}

/** Monitor configuration parameters */
static uint64_t MONITOR_WINDOW_MS;
static uint64_t MONITOR_WINDOW_MS() {
return conf::instance().get<uint64_t>("monitor_window_ms", defaults::DEFAULT_MONITOR_WINDOW_MS());
}

/** Monitor periodicity in milliseconds */
static uint64_t MONITOR_PERIODICITY_MS;
static uint64_t MONITOR_PERIODICITY_MS() {
return conf::instance().get<uint64_t>("monitor_periodicity_ms", defaults::DEFAULT_MONITOR_PERIODICITY_MS());
}
};

extern utils::configuration_map confluo_conf;

}

#endif /* CONFLUO_CONF_CONFIGURATION_PARAMS_H_ */
21 changes: 13 additions & 8 deletions libconfluo/confluo/conf/configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,26 @@ namespace confluo {

class encoding_params {
public:
static const std::string UNENCODED;
static const std::string LZ4;
static const std::string ELIAS_GAMMA;
static inline std::string UNENCODED() {
return "unencoded";
}

static inline std::string LZ4() {
return "lz4";
}
static inline std::string ELIAS_GAMMA() {
return "elias_gamma";
}
};

class configuration_parser {

public:

static uint8_t to_encoding_type(const std::string &param) {
if (param == encoding_params::UNENCODED) {
if (param == encoding_params::UNENCODED()) {
return storage::encoding_type::D_UNENCODED;
} else if (param == encoding_params::LZ4) {
} else if (param == encoding_params::LZ4()) {
return storage::encoding_type::D_LZ4;
} else if (param == encoding_params::ELIAS_GAMMA) {
} else if (param == encoding_params::ELIAS_GAMMA()) {
return storage::encoding_type::D_ELIAS_GAMMA;
} else {
THROW(illegal_state_exception, "Invalid encoding type!");
Expand Down
63 changes: 50 additions & 13 deletions libconfluo/confluo/conf/defaults.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,30 @@ namespace confluo {
*/
class archival_defaults {
public:
static const uint64_t DEFAULT_PERIODICITY_MS = static_cast<const uint64_t>(5 * 60 * 1e3);
static const size_t DEFAULT_MAX_FILE_SIZE = 1024 * 1024 * 1024;
static const std::string DEFAULT_DATA_LOG_ENCODING_TYPE;
static const std::string DEFAULT_REFLOG_ENCODING_TYPE;
static inline uint64_t DEFAULT_PERIODICITY_MS() {
return static_cast<const uint64_t>(5 * 60 * 1e3);
}

static inline size_t DEFAULT_MAX_FILE_SIZE() {
return 1024 * 1024 * 1024;
}

static inline std::string DEFAULT_DATA_LOG_ENCODING_TYPE() {
return encoding_params::LZ4();
}

static inline std::string DEFAULT_REFLOG_ENCODING_TYPE() {
return encoding_params::ELIAS_GAMMA();
}

// TODO % of physical memory
static const uint64_t DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES = static_cast<const uint64_t>(1e10);
static const uint64_t DEFAULT_IN_MEMORY_FILTER_WINDOW_NS = static_cast<const uint64_t>(10 * 1e3);
static inline uint64_t DEFAULT_IN_MEMORY_DATALOG_WINDOW_BYTES() {
return static_cast<const uint64_t>(1e10);
}

static inline uint64_t DEFAULT_IN_MEMORY_FILTER_WINDOW_NS() {
return static_cast<const uint64_t>(10 * 1e3);
}
};

/**
Expand All @@ -27,19 +44,39 @@ class archival_defaults {
class defaults {
public:
/** The thread hardware concurrency */
static const int HARDWARE_CONCURRENCY;
static inline int HARDWARE_CONCURRENCY() {
return std::thread::hardware_concurrency();
}

/** Default bucket size for index */
static constexpr double DEFAULT_INDEX_BUCKET_SIZE = 1.0;
static inline double DEFAULT_INDEX_BUCKET_SIZE() {
return 1.0;
}

/** Default time resolution in nanoseconds */
static constexpr uint64_t DEFAULT_TIME_RESOLUTION_NS = static_cast<const uint64_t>(1e6);
static inline uint64_t DEFAULT_TIME_RESOLUTION_NS() {
return static_cast<const uint64_t>(1e6);
}

/** Default maximum amount of memory */
static constexpr size_t DEFAULT_MAX_MEMORY = static_cast<const size_t>(1e9);
static inline size_t DEFAULT_MAX_MEMORY() {
return static_cast<const size_t>(1e9);
}

/** Default memory monitor periodicity in milliseconds */
static constexpr uint64_t DEFAULT_MEMORY_MONITOR_PERIODICITY_MS = 1;
static inline uint64_t DEFAULT_MEMORY_MONITOR_PERIODICITY_MS() {
return 1;
}

/** Default monitor window in milliseoncds */
static constexpr uint64_t DEFAULT_MONITOR_WINDOW_MS = 10;
static inline uint64_t DEFAULT_MONITOR_WINDOW_MS() {
return 10;
}

/** Default periodicity for monitor in milliseconds */
static constexpr uint64_t DEFAULT_MONITOR_PERIODICITY_MS = 1;
static inline uint64_t DEFAULT_MONITOR_PERIODICITY_MS() {
return 1;
}
};

}
Expand Down
9 changes: 5 additions & 4 deletions libconfluo/confluo/container/monolog/monolog_linear_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class monolog_linear_bucket {
* @return The storage size in bytes
*/
size_t storage_size() const {
if (data_.atomic_load() != nullptr)
if (data_.atomic_load().ptr() != nullptr)
return (size_ + BUFFER_SIZE) * sizeof(T);
return 0;
}
Expand All @@ -99,7 +99,8 @@ class monolog_linear_bucket {
*/
void flush(size_t offset, size_t len) {
storage::encoded_ptr<T> enc_ptr = data_.atomic_load();
storage::STORAGE_FNS[mode_].flush(static_cast<T *>(enc_ptr.ptr()) + offset, len * sizeof(T));
storage::storage_mode_functions::STORAGE_FNS()[mode_].flush(static_cast<T *>(enc_ptr.ptr()) + offset,
len * sizeof(T));
}

/**
Expand Down Expand Up @@ -251,7 +252,7 @@ class monolog_linear_bucket {
block_state state = UNINIT;
if (atomic::strong::cas(&state_, &state, INIT)) {
size_t file_size = (size_ + BUFFER_SIZE) * sizeof(T);
void *data_ptr = storage::STORAGE_FNS[mode_].allocate_bucket(path_, file_size);
void *data_ptr = storage::storage_mode_functions::STORAGE_FNS()[mode_].allocate_bucket(path_, file_size);
memset(data_ptr, '\0', sizeof(T) * file_size);
storage::encoded_ptr<T> enc_ptr(data_ptr);
data_.atomic_init(enc_ptr);
Expand All @@ -273,7 +274,7 @@ class monolog_linear_bucket {
block_state state = UNINIT;
if (atomic::strong::cas(&state_, &state, INIT)) {
size_t file_size = (size_ + BUFFER_SIZE) * sizeof(T);
void *data_ptr = storage::STORAGE_FNS[mode_].allocate_bucket(path_, file_size);
void *data_ptr = storage::storage_mode_functions::STORAGE_FNS()[mode_].allocate_bucket(path_, file_size);
memset(data_ptr, '\0', sizeof(T) * file_size);
storage::encoded_ptr<T> enc_ptr(data_ptr);
data_.atomic_init(enc_ptr);
Expand Down
5 changes: 1 addition & 4 deletions libconfluo/confluo/container/reflog.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ class reflog_constants {
* typedef for RefLog type -- a MonoLog of type uint64_t,
* 18 bucket containers and bucket size of 1024.
*/
typedef monolog_exp2_linear<uint64_t,
reflog_constants::NCONTAINERS,
reflog_constants::BUCKET_SIZE> reflog;

typedef monolog_exp2_linear<uint64_t, reflog_constants::NCONTAINERS, reflog_constants::BUCKET_SIZE> reflog;
typedef storage::read_only_encoded_ptr<uint64_t> read_only_reflog_ptr;
typedef storage::encoded_ptr<uint64_t> encoded_reflog_ptr;
typedef storage::decoded_ptr<uint64_t> decoded_reflog_ptr;
Expand Down
19 changes: 11 additions & 8 deletions libconfluo/confluo/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,20 @@ struct durable {
}
};

/** Storage functionality for in memory mode */
extern storage_functions IN_MEMORY_FNS;
class storage_mode_functions {
public:
/** Storage functionality for in memory mode */
static storage_functions &IN_MEMORY_FNS();

/** Storage functionality for durable relaxed mode */
extern storage_functions DURABLE_RELAXED_FNS;
/** Storage functionality for durable relaxed mode */
static storage_functions &DURABLE_RELAXED_FNS();

/** Storage functionality for durable mode */
extern storage_functions DURABLE_FNS;
/** Storage functionality for durable mode */
static storage_functions &DURABLE_FNS();

/** Contains the storage functions for all storage modes */
extern storage_functions STORAGE_FNS[3];
/** Contains the storage functions for all storage modes */
static storage_functions *STORAGE_FNS();
};

}
}
Expand Down
Loading

0 comments on commit 63dc722

Please sign in to comment.