Skip to content

Commit

Permalink
Implemented writing in binary
Browse files Browse the repository at this point in the history
  • Loading branch information
hosseinmoein committed May 15, 2024
1 parent 1adfdbf commit 30c4213
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 19 deletions.
19 changes: 19 additions & 0 deletions include/DataFrame/Internals/DataFrame_functors.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,25 @@ struct print_csv_functor_ : DataVec::template visitor_base<Ts ...> {
void operator() (const T &vec);
};

// ----------------------------------------------------------------------------

// Visitor object applied to a single column when a DataFrame is written in
// the binary io_format. It carries the column name, the destination stream,
// and the requested row range; the call operator is defined out of line.
//
// The name pointer is stored as-is (the characters are not copied), so the
// string it points to must stay alive for as long as the functor is used.
//
template<typename ... Ts>
struct print_binary_functor_ : DataVec::template visitor_base<Ts ...> {

    inline print_binary_functor_ (const char *col_name,
                                  std::ostream &out,
                                  long first_row,
                                  long last_row)
        : name { col_name },
          os { out },
          start_row { first_row },
          end_row { last_row }  {   }

    const char      *name;      // Column name, not owned
    std::ostream    &os;        // Destination stream
    const long      start_row;  // Requested first row
    const long      end_row;    // Requested end row

    // Invoked once with the column's underlying vector
    //
    template<typename T>
    void operator() (const T &vec);
};

// ----------------------------------------------------------------------------

Expand Down
44 changes: 34 additions & 10 deletions include/DataFrame/Internals/DataFrame_misc.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,47 @@ DataFrame<I, H>::print_csv_functor_<Ts ...>::operator() (const T &vec) {
using VecType = typename std::remove_reference<T>::type;
using ValueType = typename VecType::value_type;

_write_csv_df_header_<std::ostream, ValueType>(os, name, vec.size()) << ':';
_write_csv_df_header_<std::ostream, ValueType>(os, name, vec.size())
<< ':';

const long vec_size = vec.size();
const long sr = std::min(start_row, vec_size);
const long er = std::min(end_row, vec_size);

if (vec_size > 0) {
for (long i = sr; i < er; ++i)
_write_csv_df_index_(os, vec[i]) << ',';
}
for (long i = sr; i < er; ++i)
_write_csv_df_index_(os, vec[i]) << ',';
os << '\n';

return;
}

// ----------------------------------------------------------------------------

// Writes one column to the output stream in binary form.
//
// The column name goes out first as a fixed 64-byte, zero-padded field.
// The column itself is then handed to the helper matching its element type:
// std::string, DateTime, or the generic writer for everything else.
//
// NOTE(review): start_row and end_row are not consulted in this body; the
//               helpers receive the whole column. Confirm that is intended.
//
template<typename I, typename H>
template<typename ... Ts>
template<typename T>
void
DataFrame<I, H>::print_binary_functor_<Ts ...>::operator() (const T &vec) {

    using VecType = typename std::remove_reference<T>::type;
    using ValueType = typename VecType::value_type;

    // The buffer is zero-initialized and at most size - 1 characters are
    // copied, so the field always ends in a NUL byte. A plain
    // strncpy(dst, src, sizeof(dst)) leaves the field unterminated when the
    // name has 64 or more characters.
    //
    char    col_name[64] = { };

    std::strncpy(col_name, name, sizeof(col_name) - 1);
    os.write(col_name, sizeof(col_name));

    if constexpr (std::is_same_v<ValueType, std::string>)
        _write_binary_string_(os, vec);
    else if constexpr (std::is_same_v<ValueType, DateTime>)
        _write_binary_datetime_(os, vec);
    else
        _write_binary_data_(os, vec);
}

// ----------------------------------------------------------------------------

template<typename I, typename H>
template<typename ... Ts>
template<typename T>
Expand Down Expand Up @@ -696,7 +720,7 @@ operator() (T &vec) {

using VecType = typename std::remove_reference<T>::type;
using ValueType = typename VecType::value_type;
using ViewType = typename DF::template ColumnVecType<ValueType>;
using ViewType = typename DF::template ColumnVecType<ValueType>;

ViewType new_col;
const size_type vec_size = vec.size();
Expand Down Expand Up @@ -737,12 +761,12 @@ operator() (T &vec) const {
if (sel_indices[i] < vec_s) {
if constexpr (std::is_base_of<HeteroVector<align_value>, H>::value)
vec.erase(vec.begin() + (sel_indices[i] - del_count++));
else
else
vec.erase(sel_indices[i] - del_count++);
}
}
else
break;
}
}
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -772,7 +796,7 @@ random_load_data_functor_<DF, Ts ...>::operator() (const T &vec) {

const size_type vec_s = vec.size();
const size_type n_rows = rand_indices.size();
typename DF::template ColumnVecType<ValueType> new_vec;
typename DF::template ColumnVecType<ValueType> new_vec;
size_type prev_value { 0 };

new_vec.reserve(n_rows);
Expand Down
2 changes: 1 addition & 1 deletion include/DataFrame/Internals/DataFrame_private_decl.h
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ col_vector_push_back_cont_func_(V &vec,

value.reserve(2048);
while (file.get(c)) [[likely]] {
if (c == '\n') break;
if (c == '\n') [[unlikely]] break;
file.unget();
value.clear();
_get_token_from_file_(file, ',', value, '\0');
Expand Down
90 changes: 89 additions & 1 deletion include/DataFrame/Internals/DataFrame_standalone.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <DataFrame/Utils/Threads/ThreadGranularity.h>

#include <cctype>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <future>
Expand Down Expand Up @@ -881,6 +882,93 @@ inline static S &_write_csv_df_index_(S &o, unsigned char value) {

// ----------------------------------------------------------------------------

// Emits a column of strings to a binary stream.
//
// Output, in order:
//   1. a 32-byte type tag holding "string", zero-padded
//   2. the number of strings as a uint64_t, written as its raw in-memory bytes
//   3. each string's characters followed by a single '\0' separator
//
template<typename STRM, typename V>
inline static STRM &_write_binary_string_(STRM &strm, const V &str_vec) {

    // Initializing a char array from a literal zero-fills the remainder
    //
    const char      type_tag[32] = "string";
    const uint64_t  count = str_vec.size();

    strm.write(type_tag, sizeof(type_tag));
    strm.write(reinterpret_cast<const char *>(&count), sizeof(count));

    for (const auto &item : str_vec)  {
        strm.write(item.data(), item.size());
        strm.put('\0');
    }

    return (strm);
}

// ----------------------------------------------------------------------------

// Emits a column of plain values to a binary stream.
//
// Output, in order:
//   1. a 32-byte, zero-padded type tag: the name registered for ValueType in
//      _typeinfo_name_, or "N/A" when the type is not registered
//   2. the number of elements as a uint64_t, written as its raw in-memory
//      bytes
//   3. the elements themselves, each written as its raw in-memory bytes
//
// bool columns are written one element at a time through a real bool
// temporary. Containers exposing data() are written in one bulk call.
// Anything else (e.g. views) is written element by element.
//
template<typename STRM, typename V>
inline static STRM &_write_binary_data_(STRM &strm, const V &vec) {

    using VecType = typename std::remove_reference<V>::type;
    using ValueType = typename VecType::value_type;

    char        buffer[32];
    const auto  &citer = _typeinfo_name_.find(typeid(ValueType));

    // strncpy zero-pads the remainder of the tag field
    //
    if (citer != _typeinfo_name_.end()) [[likely]]
        std::strncpy(buffer, citer->second, sizeof(buffer));
    else
        std::strncpy(buffer, "N/A", sizeof(buffer));
    strm.write(buffer, sizeof(buffer));

    const uint64_t  vec_size = vec.size();

    strm.write(reinterpret_cast<const char *>(&vec_size), sizeof(vec_size));
    if constexpr (std::is_same_v<ValueType, bool>) {
        for (const auto &b : vec) {
            // Copy into a genuine bool so there is an object whose bytes
            // can be written
            //
            const bool  bval = b;

            strm.write(reinterpret_cast<const char *>(&bval), sizeof(bool));
        }
    }
    else {
        // Views don't have the data() method
        //
        constexpr bool  has_data_method =
            requires(const VecType &v) { v.data(); };

        if constexpr (has_data_method) {
            // The byte count is the element count times the element size.
            // Using sizeof(vec_size) here would always write exactly
            // 8 elements' worth of bytes, whatever the column length.
            //
            strm.write(reinterpret_cast<const char *>(vec.data()),
                       vec_size * sizeof(ValueType));
        }
        else {
            for (std::size_t i = 0; i < vec_size; ++i)
                strm.write(reinterpret_cast<const char *>(&(vec[i])),
                           sizeof(ValueType));
        }
    }
    return (strm);
}

// ----------------------------------------------------------------------------

// Emits a column of date/time values to a binary stream.
//
// Output, in order:
//   1. a 32-byte type tag holding "DateTime", zero-padded
//   2. the number of elements as a uint64_t, written as its raw in-memory
//      bytes
//   3. each element converted to double via static_cast and written as the
//      double's raw in-memory bytes
//
template<typename STRM, typename V>
inline static STRM &_write_binary_datetime_(STRM &strm, const V &dt_vec) {

    // Initializing a char array from a literal zero-fills the remainder
    //
    const char      type_tag[32] = "DateTime";
    const uint64_t  count = dt_vec.size();

    strm.write(type_tag, sizeof(type_tag));
    strm.write(reinterpret_cast<const char *>(&count), sizeof(count));

    for (const auto &entry : dt_vec)  {
        const double    as_double = static_cast<double>(entry);

        strm.write(reinterpret_cast<const char *>(&as_double),
                   sizeof(as_double));
    }

    return (strm);
}

// ----------------------------------------------------------------------------

//
// Specializing std::hash for tuples
//
Expand Down Expand Up @@ -1203,7 +1291,7 @@ struct _LikeClauseUtil_ {
//
// NOTE: This could be, in some cases, n-squared. But it is pretty fast with
// moderately sized strings. I have not tested this with huge/massive
// strings.
// strings.
//
static inline bool
_like_clause_compare_(const char *pattern,
Expand Down
32 changes: 29 additions & 3 deletions include/DataFrame/Internals/DataFrame_write.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <DataFrame/DataFrame.h>
#include <DataFrame/Utils/Endianness.h>
#include <DataFrame/Utils/Utils.h>

// ----------------------------------------------------------------------------
Expand All @@ -45,7 +46,7 @@ write(const char *file_name,
long max_recs) const {

std::ofstream stream;
const IOStreamOpti io_opti(stream, file_name);
const IOStreamOpti io_opti(stream, file_name, iof == io_format::binary);

if (stream.fail()) [[unlikely]] {
String1K err;
Expand Down Expand Up @@ -83,7 +84,8 @@ write(S &o,

if (iof != io_format::csv &&
iof != io_format::json &&
iof != io_format::csv2)
iof != io_format::csv2 &&
iof != io_format::binary)
throw NotImplemented("write(): This io_format is not implemented");

bool need_pre_comma = false;
Expand Down Expand Up @@ -193,10 +195,34 @@ write(S &o,
o << '\n';
}
}
else if (iof == io_format::binary) {
const auto endianness = get_system_endian();

o.write(reinterpret_cast<const char *>(&endianness), sizeof(endians));

print_binary_functor_<Ts ...> idx_functor (DF_INDEX_COL_NAME,
o,
start_row,
end_row);

idx_functor(indices_);

const SpinGuard guard(lock_);

for (const auto &[name, idx] : column_list_) [[likely]] {
print_binary_functor_<Ts ...> functor (name.c_str(),
o,
start_row,
end_row);

data_[idx].change(functor);
}
}

if (iof == io_format::json)
o << "\n}";
o << std::endl;
if (iof != io_format::binary)
o << std::endl;
return (true);
}

Expand Down
Loading

0 comments on commit 30c4213

Please sign in to comment.