Skip to content

Commit

Permalink
mutation_writer: bucket_writer: add close
Browse files Browse the repository at this point in the history
bucket_writer::close waits for _consume_fut.
It is called both after consume_end_of_stream()
and after abort().

_consume_fut is expected to fail with an exception
on the abort path.  Wait for it and drop any exception
so the future won't be abandoned, as seen in scylladb#7904.

With that moved to close() time, consume_end_of_stream
doesn't need to return a future and is made void
all the way up the stack.  This is ok since
queue_reader_handle::push_end_of_stream is synchronous too.

Added a unit test that aborts the reader consumer
during `segregate_by_timestamp`, reproducing the
"Exceptional future ignored" issue without the fix.

Signed-off-by: Benny Halevy <[email protected]>
  • Loading branch information
bhalevy authored and syuu1228 committed Jan 25, 2021
1 parent 58ce5f3 commit b96a45c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 18 deletions.
7 changes: 5 additions & 2 deletions mutation_writer/feed_writers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ future<> bucket_writer::consume(mutation_fragment mf) {
return _handle.push(std::move(mf));
}

future<> bucket_writer::consume_end_of_stream() {
void bucket_writer::consume_end_of_stream() {
_handle.push_end_of_stream();
return std::move(_consume_fut);
}

// Aborts the reader handle with the given exception.
// noexcept: invoked on the error path from feed_writer() after the
// upstream consume failed; must not itself throw.
void bucket_writer::abort(std::exception_ptr ep) noexcept {
_handle.abort(std::move(ep));
}

// Hands back the pending consume future so the caller can wait on it.
// Called exactly once, after either consume_end_of_stream() or abort();
// on the abort path the returned future is expected to resolve to an
// exception, which feed_writer() discards so it is not abandoned
// (see scylladb#7904).
future<> bucket_writer::close() noexcept {
return std::move(_consume_fut);
}

} // mutation_writer
16 changes: 13 additions & 3 deletions mutation_writer/feed_writers.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public:

future<> consume(mutation_fragment mf);

future<> consume_end_of_stream();
void consume_end_of_stream();

void abort(std::exception_ptr ep) noexcept;

future<> close() noexcept;
};

template <typename Writer>
Expand All @@ -59,9 +61,17 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
if (f.failed()) {
auto ex = f.get_exception();
wr.abort(ex);
return make_exception_future<>(ex);
return wr.close().then_wrapped([ex = std::move(ex)] (future<> f) mutable {
if (f.failed()) {
// The consumer is expected to fail when aborted,
// so just ignore any exception.
(void)f.get_exception();
}
return make_exception_future<>(std::move(ex));
});
} else {
return wr.consume_end_of_stream();
wr.consume_end_of_stream();
return wr.close();
}
});
});
Expand Down
16 changes: 10 additions & 6 deletions mutation_writer/shard_based_splitting_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ class shard_based_splitting_mutation_writer {
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(pe)));
}

future<> consume_end_of_stream() {
return parallel_for_each(_shards, [] (std::optional<shard_writer>& shard) {
if (!shard) {
return make_ready_future<>();
void consume_end_of_stream() {
for (auto& shard : _shards) {
if (shard) {
shard->consume_end_of_stream();
}
return shard->consume_end_of_stream();
});
}
}
void abort(std::exception_ptr ep) {
for (auto&& shard : _shards) {
Expand All @@ -91,6 +90,11 @@ class shard_based_splitting_mutation_writer {
}
}
}
// Waits for every instantiated per-shard writer to drain its pending
// consume future.  Shards that were never instantiated are skipped.
future<> close() noexcept {
    return parallel_for_each(_shards, [] (std::optional<shard_writer>& maybe_writer) {
        if (!maybe_writer) {
            return make_ready_future<>();
        }
        return maybe_writer->close();
    });
}
};

future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) {
Expand Down
13 changes: 9 additions & 4 deletions mutation_writer/timestamp_based_splitting_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,21 @@ class timestamp_based_splitting_mutation_writer {
future<> consume(range_tombstone&& rt);
future<> consume(partition_end&& pe);

future<> consume_end_of_stream() {
return parallel_for_each(_buckets, [] (std::pair<const bucket_id, timestamp_bucket_writer>& bucket) {
return bucket.second.consume_end_of_stream();
});
// Pushes end-of-stream into every bucket writer.  Synchronous; the
// per-bucket consume futures are awaited later in close().
void consume_end_of_stream() {
    for (auto& [id, writer] : _buckets) {
        writer.consume_end_of_stream();
    }
}
// Aborts every bucket writer with the given exception.
void abort(std::exception_ptr ep) {
    for (auto& [id, writer] : _buckets) {
        writer.abort(ep);
    }
}
// Drains all per-bucket consume futures in parallel; resolves when
// every bucket writer has been closed.
future<> close() noexcept {
    return parallel_for_each(_buckets, [] (auto& entry) {
        return entry.second.close();
    });
}
};

future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) {
Expand Down
80 changes: 77 additions & 3 deletions test/boost/mutation_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ class test_bucket_writer {
mutation_opt _current_mutation;
bool _is_first_mutation = true;

size_t _throw_after;
size_t _mutation_consumed = 0;

public:
// Marker exception thrown by the test writer to simulate a consumer
// failing mid-stream; the test catches it by type to distinguish the
// injected failure from unexpected errors.
class expected_exception : public std::exception {
public:
    const char* what() const noexcept override {
        return "expected_exception";
    }
};

private:
void check_timestamp(api::timestamp_type ts) {
const auto bucket_id = _classify(ts);
Expand Down Expand Up @@ -223,40 +234,53 @@ class test_bucket_writer {
check_timestamp(rt.tomb.timestamp);
}

// Counts every consume call via _mutation_consumed and throws
// expected_exception once the configured _throw_after threshold is
// reached, simulating a consumer abort mid-stream.
void maybe_throw() {
if (_mutation_consumed++ >= _throw_after) {
throw(expected_exception());
}
}

public:
// throw_after: number of consume calls to allow before maybe_throw()
// raises expected_exception; defaults to "never throw" so existing
// callers are unaffected.
// NOTE: the scraped diff interleaved the removed and added constructor
// lines; this is the reconstructed post-change constructor.
test_bucket_writer(schema_ptr schema, classify_by_timestamp classify, std::unordered_map<int64_t, std::vector<mutation>>& buckets, size_t throw_after = std::numeric_limits<size_t>::max())
    : _schema(std::move(schema))
    , _classify(std::move(classify))
    , _buckets(buckets)
    , _throw_after(throw_after)
{ }
// Opens a new partition; first checks the injected-failure threshold,
// then asserts no partition is currently open.
void consume_new_partition(const dht::decorated_key& dk) {
maybe_throw();
BOOST_REQUIRE(!_current_mutation);
_current_mutation = mutation(_schema, dk);
}
// Applies a partition tombstone to the open partition after the
// failure-injection check and timestamp verification.
void consume(tombstone partition_tombstone) {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
verify_partition_tombstone(partition_tombstone);
_current_mutation->partition().apply(partition_tombstone);
}
// Applies a static row to the open partition; always continues
// iteration (stop_iteration::no) unless maybe_throw() fires.
stop_iteration consume(static_row&& sr) {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
verify_static_row(sr);
_current_mutation->apply(mutation_fragment(*_schema, tests::make_permit(), std::move(sr)));
return stop_iteration::no;
}
// Applies a clustering row to the open partition; always continues
// iteration unless maybe_throw() fires.
stop_iteration consume(clustering_row&& cr) {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
verify_clustering_row(cr);
_current_mutation->apply(mutation_fragment(*_schema, tests::make_permit(), std::move(cr)));
return stop_iteration::no;
}
// Applies a range tombstone to the open partition; always continues
// iteration unless maybe_throw() fires.
stop_iteration consume(range_tombstone&& rt) {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
verify_range_tombstone(rt);
_current_mutation->apply(mutation_fragment(*_schema, tests::make_permit(), std::move(rt)));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
BOOST_REQUIRE(_bucket_id);
auto& bucket = _buckets[*_bucket_id];
Expand Down Expand Up @@ -342,3 +366,53 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) {
}

}

// Reproducer for scylladb#7904: aborts the reader consumer partway
// through segregate_by_timestamp and verifies the abort path does not
// abandon the consume future (which manifested as "Exceptional future
// ignored" without the fix).
SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};

testlog.info("Random schema:\n{}", random_schema.cql());

// Timestamp generator that, for tombstone/row-marker destinations,
// usually emits api::missing_timestamp and otherwise defers to the
// default generator.
auto ts_gen = [&, underlying = tests::default_timestamp_generator()] (std::mt19937& engine,
tests::timestamp_destination ts_dest, api::timestamp_type min_timestamp) -> api::timestamp_type {
if (ts_dest == tests::timestamp_destination::partition_tombstone ||
ts_dest == tests::timestamp_destination::row_marker ||
ts_dest == tests::timestamp_destination::row_tombstone ||
ts_dest == tests::timestamp_destination::collection_tombstone) {
if (tests::random::get_int<int>(0, 10, engine)) {
return api::missing_timestamp;
}
}
return underlying(engine, ts_dest, min_timestamp);
};

auto muts = tests::generate_random_mutations(random_schema, ts_gen).get0();

// Split into two buckets by timestamp parity.
auto classify_fn = [] (api::timestamp_type ts) {
return int64_t(ts % 2);
};

std::unordered_map<int64_t, std::vector<mutation>> buckets;

// Pick a random point at which the consumer throws.
// NOTE(review): assumes muts is non-empty — muts.size() - 1 would wrap
// around otherwise; confirm generate_random_mutations guarantees this.
int throw_after = tests::random::get_int(muts.size() - 1);
testlog.info("Will raise exception after {}/{} mutations", throw_after, muts.size());
auto consumer = [&] (flat_mutation_reader bucket_reader) {
return do_with(std::move(bucket_reader), [&] (flat_mutation_reader& rd) {
return rd.consume(test_bucket_writer(random_schema.schema(), classify_fn, buckets, throw_after), db::no_timeout);
});
};

// Either the injected exception or (until readers are properly
// aborted) a broken_promise is acceptable; anything else fails.
try {
segregate_by_timestamp(flat_mutation_reader_from_mutations(tests::make_permit(), muts), classify_fn, std::move(consumer)).get();
} catch (const test_bucket_writer::expected_exception&) {
BOOST_TEST_PASSPOINT();
} catch (const seastar::broken_promise&) {
// Tolerated until we properly abort readers
BOOST_TEST_PASSPOINT();
}
}

0 comments on commit b96a45c

Please sign in to comment.