Skip to content

Commit

Permalink
Implemented writing to ColumnString directly via StringWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
Enmk committed Mar 30, 2023
1 parent 31a8a7b commit 89ebe61
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 100 deletions.
47 changes: 22 additions & 25 deletions src/Functions/keyvaluepair/impl/CHKeyValuePairExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Functions/keyvaluepair/impl/StateHandler.h>
#include <Functions/keyvaluepair/impl/KeyValuePairExtractor.h>

// TODO: debug stuff, remove it before merging
#include <fmt/core.h>
#include <magic_enum.hpp>

Expand All @@ -16,9 +17,6 @@ namespace DB
template <typename StateHandler>
class CHKeyValuePairExtractor : public KeyValuePairExtractor
{
using Key = typename StateHandler::KeyType;
using Value = typename StateHandler::ValueType;

using State = typename DB::extractKV::StateHandler::State;
using NextState = DB::extractKV::StateHandler::NextState;

Expand All @@ -37,8 +35,8 @@ class CHKeyValuePairExtractor : public KeyValuePairExtractor
// std::cerr << "CHKeyValuePairExtractor::extract: \"" << data << "\"" << std::endl;
auto state = State::WAITING_KEY;

Key key;
Value value;
extractKV::StringWriter key(*keys);
extractKV::StringWriter value(*values);

uint64_t row_offset = 0;
const auto & config = state_handler.extractor_configuration;
Expand All @@ -59,7 +57,7 @@ class CHKeyValuePairExtractor : public KeyValuePairExtractor
<< fancyQuote(data)
<< std::endl;

next_state = processState(data, state, key, value, keys, values, row_offset);
next_state = processState(data, state, key, value, row_offset);

std::cerr << "CHKeyValuePairExtractor::extract 2, new_state: "
<< magic_enum::enum_name(next_state.state)
Expand All @@ -83,18 +81,21 @@ class CHKeyValuePairExtractor : public KeyValuePairExtractor
break;
}

// TODO (vnemkov): consider removing, we should reach FLUSH_PAIR state from state machine.
// if break occured earlier, consume previously generated pair
if (state == State::FLUSH_PAIR || !(key.empty() && value.empty()))
flushPair(data, key, value, keys, values, row_offset);
if (state == State::FLUSH_PAIR || !(key.isEmpty() && value.isEmpty()))
flushPair(data, key, value, row_offset);

keys->validate();
values->validate();

return row_offset;
}

private:

NextState processState(std::string_view file, State state, Key & key,
Value & value, ColumnString::MutablePtr & keys,
ColumnString::MutablePtr & values, uint64_t & row_offset)
NextState processState(std::string_view file, State state, extractKV::StringWriter & key,
extractKV::StringWriter & value, uint64_t & row_offset)
{
switch (state)
{
Expand All @@ -103,13 +104,13 @@ class CHKeyValuePairExtractor : public KeyValuePairExtractor
case State::READING_KEY:
{
auto result = state_handler.readKey(file, key);
std::cerr << "CHKeyValuePairExtractor::processState key: " << fancyQuote(key) << std::endl;
std::cerr << "CHKeyValuePairExtractor::processState key: " << fancyQuote(key.uncommittedChunk()) << std::endl;
return result;
}
case State::READING_QUOTED_KEY:
{
auto result = state_handler.readQuotedKey(file, key);
std::cerr << "CHKeyValuePairExtractor::processState key: " << fancyQuote(key) << std::endl;
std::cerr << "CHKeyValuePairExtractor::processState key: " << fancyQuote(key.uncommittedChunk()) << std::endl;
return result;
}
case State::READING_KV_DELIMITER:
Expand All @@ -121,33 +122,29 @@ class CHKeyValuePairExtractor : public KeyValuePairExtractor
case State::READING_VALUE:
{
auto result = state_handler.readValue(file, value);
std::cerr << "CHKeyValuePairExtractor::processState value: " << fancyQuote(value) << std::endl;
std::cerr << "CHKeyValuePairExtractor::processState value: " << fancyQuote(value.uncommittedChunk()) << std::endl;
return result;
}
case State::READING_QUOTED_VALUE:
{
auto result = state_handler.readQuotedValue(file, value);
std::cerr << "CHKeyValuePairExtractor::processState value: " << fancyQuote(value) << std::endl;
std::cerr << "CHKeyValuePairExtractor::processState value: " << fancyQuote(value.uncommittedChunk()) << std::endl;
return result;
}
case State::FLUSH_PAIR:
return flushPair(file, key, value, keys, values, row_offset);
return flushPair(file, key, value, row_offset);

case State::END:
return {0, state};
}
}

NextState flushPair(const std::string_view & file, Key & key,
Value & value, ColumnString::MutablePtr & keys,
ColumnString::MutablePtr & values, uint64_t & row_offset)
NextState flushPair(const std::string_view & file, extractKV::StringWriter & key,
extractKV::StringWriter & value, uint64_t & row_offset)
{
std::cerr << "CHKeyValuePairExtractor::flushPair key: " << fancyQuote(key) << ", value: " << fancyQuote(value) << std::endl;
keys->insertData(key.data(), key.size());
values->insertData(value.data(), value.size());

key = {};
value = {};
std::cerr << "CHKeyValuePairExtractor::flushPair key: " << fancyQuote(key.uncommittedChunk()) << ", value: " << fancyQuote(value.uncommittedChunk()) << std::endl;
key.commit();
value.commit();

++row_offset;
std::cerr << "CHKeyValuePairExtractor::flushPair total pairs: " << row_offset << std::endl;
Expand Down
48 changes: 27 additions & 21 deletions src/Functions/keyvaluepair/impl/InlineEscapingStateHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

namespace
{
size_t consumeWithEscapeSequence(std::string_view file, size_t start_pos, size_t character_pos, std::string & output)
size_t consumeWithEscapeSequence(std::string_view file, size_t start_pos, size_t character_pos, DB::extractKV::StringWriter & output)
{
output.insert(output.end(), file.begin() + start_pos, file.begin() + character_pos);
output.append(file.begin() + start_pos, file.begin() + character_pos);

std::string tmp_out;
DB::ReadBufferFromMemory buf(file.begin() + character_pos, file.size() - character_pos);
DB::parseComplexEscapeSequence(output, buf);

DB::parseComplexEscapeSequence(tmp_out, buf);
output.append(tmp_out);

return buf.getPosition();
}
Expand Down Expand Up @@ -62,11 +65,11 @@ NextState InlineEscapingStateHandler::waitKey(std::string_view file) const
* If I find a key value delimiter and that is empty, I do not need to copy? hm,m hm hm
* */

NextState InlineEscapingStateHandler::readKey(std::string_view file, KeyType & key) const
NextState InlineEscapingStateHandler::readKey(std::string_view file, StringWriter & key) const
{
const auto & [key_value_delimiter, _, pair_delimiters] = extractor_configuration;

key.clear();
key.reset();

size_t pos = 0;
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_needles))
Expand All @@ -83,9 +86,10 @@ NextState InlineEscapingStateHandler::readKey(std::string_view file, KeyType & k
return {next_pos, State::WAITING_KEY};
}
}
else if (*p == key_value_delimiter)
else
if (*p == key_value_delimiter)
{
key.insert(key.end(), file.begin() + pos, file.begin() + character_position);
key.append(file.begin() + pos, file.begin() + character_position);

return {next_pos, State::WAITING_VALUE};
}
Expand All @@ -102,11 +106,11 @@ NextState InlineEscapingStateHandler::readKey(std::string_view file, KeyType & k
return {file.size(), State::END};
}

NextState InlineEscapingStateHandler::readQuotedKey(std::string_view file, KeyType & key) const
NextState InlineEscapingStateHandler::readQuotedKey(std::string_view file, StringWriter & key) const
{
const auto quoting_character = extractor_configuration.quoting_character;

key.clear();
key.reset();

size_t pos = 0;
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_quoted_needles))
Expand All @@ -126,9 +130,9 @@ NextState InlineEscapingStateHandler::readQuotedKey(std::string_view file, KeyTy
}
else if (*p == quoting_character)
{
key.insert(key.end(), file.begin() + pos, file.begin() + character_position);
key.append(file.begin() + pos, file.begin() + character_position);

if (key.empty())
if (key.isEmpty())
{
return {next_pos, State::WAITING_KEY};
}
Expand Down Expand Up @@ -172,11 +176,11 @@ NextState InlineEscapingStateHandler::waitValue(std::string_view file) const
return {pos, State::READING_VALUE};
}

NextState InlineEscapingStateHandler::readValue(std::string_view file, ValueType & value) const
NextState InlineEscapingStateHandler::readValue(std::string_view file, StringWriter & value) const
{
const auto & [key_value_delimiter, _, pair_delimiters] = extractor_configuration;

value.clear();
value.reset();

size_t pos = 0;
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_needles))
Expand All @@ -192,19 +196,20 @@ NextState InlineEscapingStateHandler::readValue(std::string_view file, ValueType
{
// It is agreed that value with an invalid escape seqence in it
// is considered malformed and shoudn't be included in result.
value.clear();
value.reset();
return {next_pos, State::WAITING_KEY};
}
}
else if (*p == key_value_delimiter)
else
if (*p == key_value_delimiter)
{
// reached new key
return {next_pos, State::WAITING_KEY};
}
else if (std::find(pair_delimiters.begin(), pair_delimiters.end(), *p) != pair_delimiters.end())
{
// reached next pair
value.insert(value.end(), file.begin() + pos, file.begin() + character_position);
value.append(file.begin() + pos, file.begin() + character_position);

return {next_pos, State::FLUSH_PAIR};
}
Expand All @@ -213,17 +218,17 @@ NextState InlineEscapingStateHandler::readValue(std::string_view file, ValueType
}

// Reached end of input, consume rest of the file as value and make sure KV pair is produced.
value.insert(value.end(), file.begin() + pos, file.end());
value.append(file.begin() + pos, file.end());
return {file.size(), State::FLUSH_PAIR};
}

NextState InlineEscapingStateHandler::readQuotedValue(std::string_view file, ValueType & value) const
NextState InlineEscapingStateHandler::readQuotedValue(std::string_view file, StringWriter & value) const
{
const auto quoting_character = extractor_configuration.quoting_character;

size_t pos = 0;

value.clear();
value.reset();

while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_quoted_needles))
{
Expand All @@ -239,9 +244,10 @@ NextState InlineEscapingStateHandler::readQuotedValue(std::string_view file, Val
return {next_pos, State::WAITING_KEY};
}
}
else if (*p == quoting_character)
else
if (*p == quoting_character)
{
value.insert(value.end(), file.begin() + pos, file.begin() + character_position);
value.append(file.begin() + pos, file.begin() + character_position);

return {next_pos, State::FLUSH_PAIR};
}
Expand Down
11 changes: 4 additions & 7 deletions src/Functions/keyvaluepair/impl/InlineEscapingStateHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@ namespace extractKV
class InlineEscapingStateHandler : public StateHandler
{
public:
using KeyType = std::string;
using ValueType = std::string;

explicit InlineEscapingStateHandler(Configuration configuration_);

[[nodiscard]] NextState waitKey(std::string_view file) const;
[[nodiscard]] NextState readKey(std::string_view file, KeyType & key) const;
[[nodiscard]] NextState readQuotedKey(std::string_view file, KeyType & key) const;
[[nodiscard]] NextState readKey(std::string_view file, StringWriter & key) const;
[[nodiscard]] NextState readQuotedKey(std::string_view file, StringWriter & key) const;
[[nodiscard]] NextState readKeyValueDelimiter(std::string_view file) const;
[[nodiscard]] NextState waitValue(std::string_view file) const;
[[nodiscard]] NextState readValue(std::string_view file, ValueType & value) const;
[[nodiscard]] NextState readQuotedValue(std::string_view file, ValueType & value) const;
[[nodiscard]] NextState readValue(std::string_view file, StringWriter & value) const;
[[nodiscard]] NextState readQuotedValue(std::string_view file, StringWriter & value) const;

const Configuration extractor_configuration;

Expand Down
Loading

0 comments on commit 89ebe61

Please sign in to comment.