misc: Add comments (#4863)
ref #4862
breezewish authored May 13, 2022
1 parent 2dc22fd commit 8bb0622
Showing 13 changed files with 142 additions and 41 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
@@ -114,7 +114,8 @@ class ColumnFile
virtual ColumnFileReaderPtr
getReader(const DMContext & context, const StorageSnapshotPtr & storage_snap, const ColumnDefinesPtr & col_defs) const = 0;

/// only ColumnInMemoryFile can be appendable
/// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have
/// been persisted on disk and their data is immutable.
virtual bool isAppendable() const { return false; }
virtual void disableAppend() {}
virtual bool append(DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/)
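To illustrate the appendable contract described in the comment above (only a ColumnFileInMemory accepts further appends; persisted files are immutable), here is a hedged sketch of how a caller might exercise the interface. The helper name tryAppendToLastFile and the std::vector container are hypothetical; only isAppendable() and append() come from the class itself.

// Hypothetical caller-side sketch of the appendable contract (not part of this commit).
bool tryAppendToLastFile(std::vector<ColumnFilePtr> & files,
                         DMContext & dm_context,
                         const Block & block,
                         size_t offset,
                         size_t limit,
                         size_t data_bytes)
{
    if (files.empty())
        return false;
    auto & last = files.back();
    // Only ColumnFileInMemory reports isAppendable() == true; persisted files are immutable.
    if (!last->isAppendable())
        return false;
    // append() may still refuse, e.g. if the schema differs or the file has been sealed.
    return last->append(dm_context, block, offset, limit, data_bytes);
}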
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
@@ -27,7 +27,6 @@ using ColumnTinyFilePtr = std::shared_ptr<ColumnFileTiny>;
/// It may be created in two ways:
/// 1. created directly when writing to storage if the data is large enough
/// 2. created when flushed `ColumnFileInMemory` to disk
/// And it may have cache data if the column file is small enough(The details are in the flush process).
class ColumnFileTiny : public ColumnFilePersisted
{
friend class ColumnFileTinyReader;
@@ -38,13 +37,15 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 rows = 0;
UInt64 bytes = 0;

// The id of data page which stores the data of this pack.
/// The id of data page which stores the data of this pack.
PageId data_page_id;

/// The members below are not serialized.
// The cache data in memory.

/// The cache data in memory.
/// Currently this field is unused.
CachePtr cache;
// Used to map column id to column instance in a Block.
/// Used to map column id to column instance in a Block.
ColIdToOffset colid_to_offset;

private:
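For context on colid_to_offset above: it lets a reader locate a column inside the stored Block by its column id. A minimal hedged sketch of such a lookup — the helper is hypothetical, and ColIdToOffset is assumed to behave like a map from column id to column position:

// Hypothetical sketch: find a column in a Block by column id through a ColId -> offset map.
const ColumnWithTypeAndName & getColumnById(const Block & block,
                                            const ColIdToOffset & colid_to_offset,
                                            ColId col_id)
{
    auto it = colid_to_offset.find(col_id);
    if (it == colid_to_offset.end())
        throw Exception("Column id not found in colid_to_offset", ErrorCodes::LOGICAL_ERROR);
    return block.getByPosition(it->second);
}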
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
@@ -234,7 +234,9 @@ class DeltaValueSpace
/// Flush the data of column files which haven't been written to disk yet, and also save the metadata of column files.
bool flush(DMContext & context);

/// Compacts fragment column files into bigger one, to save some IOPS during reading.
/// Compact fragment column files in the delta layer into bigger column files, to save some IOPS during reading.
/// It does not merge the delta into the stable layer.
/// This is also known as minor compaction.
bool compact(DMContext & context);

/// Create a constant snapshot for read.
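To make the "minor compaction" idea concrete — many small column files in the delta are folded into fewer, larger ones, without merging anything into the stable layer — here is a toy model that only tracks row counts. It is a sketch of the grouping idea, not the real implementation (the threshold name target_rows is made up):

#include <cstddef>
#include <vector>

// Toy model: greedily group consecutive small files so that each compacted file
// holds at least `target_rows` rows. Returns the row counts of the resulting files.
std::vector<size_t> minorCompactionPlan(const std::vector<size_t> & file_rows, size_t target_rows)
{
    std::vector<size_t> compacted;
    size_t acc = 0;
    for (size_t rows : file_rows)
    {
        acc += rows;
        if (acc >= target_rows)
        {
            compacted.push_back(acc);
            acc = 0;
        }
    }
    if (acc > 0)
        compacted.push_back(acc); // leftover tail stays as a smaller file
    return compacted;
}

For example, file_rows = {100, 200, 150, 5000, 80} with target_rows = 400 would be compacted into {450, 5000, 80}; a read of the delta then touches 3 files instead of 5.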
8 changes: 6 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
@@ -33,7 +33,7 @@ namespace DM
{
void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
{
// If this column file's schema is identical to last_schema, then use the last_schema instance,
// If this column file's schema is identical to last_schema, then use the last_schema instance (instead of the one in `column_file`),
// so that we don't have to serialize my_schema instance.
if (auto * m_file = column_file->tryToInMemoryFile(); m_file)
{
@@ -54,6 +54,8 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)

if (!column_files.empty())
{
// As we are now appending a new column file (which can be used for new appends),
// let's simply mark the last column file as not appendable.
auto & last_column_file = column_files.back();
if (last_column_file->isAppendable())
last_column_file->disableAppend();
@@ -212,7 +214,7 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context, size_t r
if (column_files.empty())
return nullptr;

// make the last column file not appendable
// Mark the last ColumnFile not appendable, so that `appendToCache` will not reuse it and it is safe to flush it to disk.
if (column_files.back()->isAppendable())
column_files.back()->disableAppend();

@@ -224,6 +226,8 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context, size_t r
auto & task = flush_task->addColumnFile(column_file);
if (auto * m_file = column_file->tryToInMemoryFile(); m_file)
{
// If the ColumnFile is not yet persisted on disk, it will contain block data.
// In this case, let's write the block data in the flush process as well.
task.rows_offset = cur_rows_offset;
task.deletes_offset = cur_deletes_offset;
task.block_data = m_file->readDataForFlush();
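A hedged sketch of the "seal the last column file" step that both appendColumnFileInner and buildFlushTask perform above: the last file is marked non-appendable so that later appendToCache calls start a new ColumnFileInMemory instead of mutating data that is about to be flushed. The helper name is hypothetical, and ColumnFiles is assumed to be a std::vector<ColumnFilePtr>:

// Hypothetical helper illustrating the sealing step used before appending a new file or flushing.
void sealLastColumnFile(ColumnFiles & column_files)
{
    if (column_files.empty())
        return;
    auto & last = column_files.back();
    // Once disabled, the file never accepts appends again, so its contents are stable
    // for the flush that follows.
    if (last->isAppendable())
        last->disableAppend();
}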
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
@@ -88,8 +88,14 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
/// If any of the following methods returns false, the operation failed because other threads may have
/// updated this instance in the meantime, e.g. this instance has been abandoned.
/// Caller should try again from the beginning.

/// Append a ColumnFile into this MemTableSet. The ColumnFile may be flushed later.
/// Note that some ColumnFiles may not contain block data, but only a reference to the block data stored on disk.
/// See different ColumnFile implementations for details.
void appendColumnFile(const ColumnFilePtr & column_file);

/// Append the block data into a ColumnFileInMemory (may be reused).
/// The ColumnFileInMemory will be stored in this MemTableSet and flushed later.
void appendToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);

void appendDeleteRange(const RowKeyRange & delete_range);
@@ -99,7 +105,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
/// Create a constant snapshot for read.
ColumnFileSetSnapshotPtr createSnapshot(const StorageSnapshotPtr & storage_snap);

/// Build a flush task which will try to flush all column files in MemTableSet now
/// Build a flush task which will try to flush all column files in this MemTableSet at this moment.
ColumnFileFlushTaskPtr buildFlushTask(DMContext & context, size_t rows_offset, size_t deletes_offset, size_t flush_version);

void removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush_task);
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
@@ -223,6 +223,7 @@ inline bool hasColumn(const ColumnDefines & columns, const ColId & col_id)
return false;
}

/// Checks whether two blocks have the same schema.
template <bool check_default_value = false>
inline bool isSameSchema(const Block & a, const Block & b)
{
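A hedged sketch of what "same schema" means for the isSameSchema comment above: same number of columns, matching pairwise by name, column id and data type. This is a simplification — the real template can additionally compare default values when check_default_value is set — and it assumes TiFlash's ColumnWithTypeAndName carries a column_id field:

// Simplified sketch of block schema comparison (default values not checked here).
inline bool isSameSchemaSketch(const Block & a, const Block & b)
{
    if (a.columns() != b.columns())
        return false;
    for (size_t i = 0; i < a.columns(); ++i)
    {
        const auto & ca = a.getByPosition(i);
        const auto & cb = b.getByPosition(i);
        if (ca.name != cb.name || ca.column_id != cb.column_id || !ca.type->equals(*cb.type))
            return false;
    }
    return true;
}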
26 changes: 21 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -578,7 +578,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
const auto bytes = block.bytes();

{
// Sort by handle & version in ascending order.
// Sort the block by handle & version in ascending order.
SortDescription sort;
sort.emplace_back(EXTRA_HANDLE_COLUMN_NAME, 1, 0);
sort.emplace_back(VERSION_COLUMN_NAME, 1, 0);
@@ -594,6 +594,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
const auto handle_column = block.getByName(EXTRA_HANDLE_COLUMN_NAME).column;
auto rowkey_column = RowKeyColumnContainer(handle_column, is_common_handle);

// Write the block segment by segment
while (offset != rows)
{
RowKeyValueRef start_key = rowkey_column.getRowKeyValue(offset);
@@ -604,6 +605,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
// Keep trying until succeeded.
while (true)
{
// Find the segment according to the current start_key
SegmentPtr segment;
{
std::shared_lock lock(read_write_mutex);
@@ -618,12 +620,16 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
}

FAIL_POINT_PAUSE(FailPoints::pause_when_writing_to_dt_store);

// Do force merge or stop write if necessary.
waitForWrite(dm_context, segment);
if (segment->hasAbandoned())
continue;

const auto & rowkey_range = segment->getRowKeyRange();

// The range [offset, rows - offset] may exceed the Segment's rowkey_range. Cut the range
// to fit the segment.
auto [cur_offset, cur_limit] = rowkey_range.getPosRange(handle_column, offset, rows - offset);
if (unlikely(cur_offset != offset))
throw Exception("cur_offset does not equal to offset", ErrorCodes::LOGICAL_ERROR);
@@ -632,8 +638,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
auto alloc_bytes = block.bytes(offset, limit);

bool is_small = limit < dm_context->delta_cache_limit_rows / 4 && alloc_bytes < dm_context->delta_cache_limit_bytes / 4;
// Small column files are appended to Delta Cache, then flushed later.
// While large column files are directly written to PageStorage.
// For small column files, data is appended to MemTableSet, then flushed later.
// For large column files, data is directly written to PageStorage, while the ColumnFile entry is appended to MemTableSet.
if (is_small)
{
if (segment->writeToCache(*dm_context, block, offset, limit))
@@ -651,6 +657,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
wbs.rollbackWrittenLogAndData();
wbs.clear();

// In this case we will construct a ColumnFile that does not contain block data in memory.
// The block data has been written to PageStorage in wbs.
write_column_file = ColumnFileTiny::writeColumnFile(*dm_context, block, offset, limit, wbs);
wbs.writeLogAndData();
write_range = rowkey_range;
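The routing comment above ("small column files go to the MemTableSet, large ones directly to PageStorage") hinges on the is_small test. A hedged restatement of that test as a standalone helper — the helper name is made up; the DMContext fields are the ones used in the code above:

// Sketch: a write is "small" only if it is small in BOTH rows and bytes,
// relative to a quarter of the delta cache limits.
bool isSmallWrite(size_t limit, size_t alloc_bytes, const DMContext & dm_context)
{
    return limit < dm_context.delta_cache_limit_rows / 4
        && alloc_bytes < dm_context.delta_cache_limit_bytes / 4;
}

The design intent: small writes are buffered in memory and amortized by a later flush, while large writes pay the PageStorage I/O immediately and only a ColumnFileTiny entry (a reference to the written data page) is appended to the MemTableSet.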
@@ -1200,6 +1208,7 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen
size_t delta_rows = segment->getDelta()->getRows();
size_t delta_bytes = segment->getDelta()->getBytes();

// No need to stall the write if the delta does not exceed the force-merge threshold.
if (delta_rows < forceMergeDeltaRows(dm_context) && delta_bytes < forceMergeDeltaBytes(dm_context))
return;

@@ -1216,16 +1225,23 @@

size_t sleep_ms;
if (delta_rows >= stop_write_delta_rows || delta_bytes >= stop_write_delta_bytes)
{
// For stop write (hard limit), wait until the segment is updated (e.g. delta is merged).
sleep_ms = std::numeric_limits<size_t>::max();
}
else
{
// For force merge (soft limit), wait for a reasonable amount of time.
// It is possible that the segment is still not updated after the wait.
sleep_ms = static_cast<double>(segment_bytes) / k10mb * 1000 * wait_duration_factor;
}

// checkSegmentUpdate could do foreground merge delta, so call it before sleep.
checkSegmentUpdate(dm_context, segment, ThreadType::Write);

size_t sleep_step = 50;
// The delta will be merged, only after this segment got abandoned.
// Because merge delta will replace the segment instance.
// Wait at most `sleep_ms` until the delta is merged.
// Merge delta will replace the segment instance, causing `segment->hasAbandoned() == true`.
while (!segment->hasAbandoned() && sleep_ms > 0)
{
size_t ms = std::min(sleep_ms, sleep_step);
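The comments added to waitForWrite above describe a two-tier flow-control policy. A condensed, hedged sketch of that decision — the helper is hypothetical; the thresholds correspond to forceMergeDelta* and stopWriteDelta* in the code, and the sleep loop itself is omitted:

#include <cstddef>
#include <limits>

// Sketch of the flow-control decision in waitForWrite (not the exact implementation):
//  - below the force-merge threshold: no stall at all;
//  - at or above the stop-write threshold (hard limit): wait until the segment is updated;
//  - in between (soft limit): wait a bounded time roughly proportional to the segment size.
size_t decideSleepMs(size_t delta_rows, size_t delta_bytes, size_t segment_bytes,
                     size_t force_merge_rows, size_t force_merge_bytes,
                     size_t stop_write_rows, size_t stop_write_bytes,
                     double wait_duration_factor)
{
    if (delta_rows < force_merge_rows && delta_bytes < force_merge_bytes)
        return 0; // no flow control needed
    if (delta_rows >= stop_write_rows || delta_bytes >= stop_write_bytes)
        return std::numeric_limits<size_t>::max(); // hard limit: effectively wait for segment update
    const double k10mb = 10.0 * 1024 * 1024;
    // Soft limit: roughly one second of waiting per 10 MB of segment data, scaled by a factor.
    return static_cast<size_t>(static_cast<double>(segment_bytes) / k10mb * 1000 * wait_duration_factor);
}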
28 changes: 26 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -374,10 +374,14 @@ class DeltaMergeStore : private boost::noncopyable

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);

/// Do merge delta for all segments. Only used for debug.
/// Merge delta into the stable layer for all segments.
///
/// This function is called when using `MANAGE TABLE [TABLE] MERGE DELTA` from TiFlash Client.
void mergeDeltaAll(const Context & context);

/// Compact fragment column files into bigger one.

/// Compact the delta layer, merging multiple fragmented delta files into larger ones.
/// This is a minor compaction as it does not merge the delta into the stable layer.
void compact(const Context & context, const RowKeyRange & range);

/// Iterator over all segments and apply gc jobs.
@@ -424,13 +428,33 @@ class DeltaMergeStore : private boost::noncopyable
return handle_define.id != EXTRA_HANDLE_COLUMN_ID;
}

/// Try to stall the writing. It will suspend the current thread if flow control is necessary.
/// There are roughly two flow control mechanisms:
/// - Force Merge (1 GB by default, see force_merge_delta_rows|size): Wait for a small amount of time at most.
/// - Stop Write (2 GB by default, see stop_write_delta_rows|size): Wait until delta is merged.
void waitForWrite(const DMContextPtr & context, const SegmentPtr & segment);

void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment);

/// Try to update the segment. "Update" means splitting the segment into two, merging two segments, merging the delta, etc.
/// If an update is really performed, the segment will be abandoned (with `segment->hasAbandoned() == true`).
/// See `segmentSplit`, `segmentMerge`, `segmentMergeDelta` for details.
///
/// This may be called from multiple threads, e.g. from the foreground write path or from background threads.
/// A `thread_type` should be specified, indicating the type of the thread calling this function.
/// Depending on the thread type, the "update" performed may vary.
void checkSegmentUpdate(const DMContextPtr & context, const SegmentPtr & segment, ThreadType thread_type);

/// Split the segment into two.
/// After splitting, the segment will be abandoned (with `segment->hasAbandoned() == true`) and the two new segments will be returned.
SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground);

/// Merge two segments into one.
/// After merging, both segments will be abandoned (with `segment->hasAbandoned() == true`).
void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground);

/// Merge the delta (major compaction) in the segment.
/// After delta-merging, the segment will be abandoned (with `segment->hasAbandoned() == true`) and a new segment will be returned.
SegmentPtr segmentMergeDelta(
DMContext & dm_context,
const SegmentPtr & segment,
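Several of the comments above note that split, merge, and merge-delta replace the segment instance and leave the old one with hasAbandoned() == true. A hedged sketch of the retry pattern a writer uses around this, mirroring the "Keep trying until succeeded" loop in DeltaMergeStore::write — findSegmentForKey is a hypothetical stand-in for the map lookup done under read_write_mutex:

// Sketch: callers holding a SegmentPtr must expect the segment to be replaced concurrently.
// If the instance has been abandoned, re-locate the segment and retry.
void writeToSegmentWithRetry(const DMContextPtr & dm_context, const RowKeyValueRef & start_key)
{
    while (true)
    {
        SegmentPtr segment = findSegmentForKey(start_key); // hypothetical lookup under read_write_mutex
        waitForWrite(dm_context, segment);                  // may stall the writer for flow control
        if (segment->hasAbandoned())
            continue; // segment was split / merged / delta-merged; look it up again
        // ... append the data to this segment's delta here ...
        break;
    }
}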
26 changes: 21 additions & 5 deletions dbms/src/Storages/DeltaMerge/RowKeyRange.h
@@ -63,6 +63,7 @@ struct RowKeyValueRef
RowKeyValue toRowKeyValue() const;
};

/// Stores the raw bytes of the RowKey. Normally the RowKey will be stored in a column.
struct RowKeyValue
{
RowKeyValue() = default;
@@ -112,7 +113,11 @@ struct RowKeyValue
// Format as a hex string for debugging. The value will be converted to '?' if redact-log is on
String toDebugString() const;

RowKeyValueRef toRowKeyValueRef() const { return RowKeyValueRef{is_common_handle, value->data(), value->size(), int_value}; }
inline RowKeyValueRef toRowKeyValueRef() const
{
return RowKeyValueRef{is_common_handle, value->data(), value->size(), int_value};
}

DecodedTiKVKeyPtr toRegionKey(TableID table_id) const
{
// FIXME: move this to TiKVRecordFormat.h
@@ -177,6 +182,8 @@ struct RowKeyValue

using RowKeyValues = std::vector<RowKeyValue>;

/// An optimized implementation that will try to compare IntHandle by comparing Int values directly.
/// For common handles, per-byte comparison will still be used.
inline int compare(const RowKeyValueRef & a, const RowKeyValueRef & b)
{
if (unlikely(a.is_common_handle != b.is_common_handle))
@@ -214,6 +221,9 @@ inline int compare(const RowKeyValueRef & a, const RowKeyValueRef & b)
}
}

// TODO (wenxuan): The following compare operators can be simplified using
// boost::operator, or operator<=> when we upgrade to C++20.

inline int compare(const StringRef & a, const RowKeyValueRef & b)
{
RowKeyValueRef r_a{true, a.data, a.size, 0};
@@ -353,12 +363,13 @@ size_t lowerBound(const RowKeyColumnContainer & rowkey_column, size_t first, siz
}
} // namespace

/// A range denoted as [StartRowKey, EndRowKey).
struct RowKeyRange
{
// todo use template to refine is_common_handle
// TODO: use template to refine is_common_handle
bool is_common_handle;
/// start and end in RowKeyRange are always meaningful
/// it is assumed that start value is included and end value is excluded.

// start and end in RowKeyRange are always meaningful.
RowKeyValue start;
RowKeyValue end;
size_t rowkey_column_size;
@@ -440,6 +451,7 @@ struct RowKeyRange
}
}

/// Create a RowKeyRange that covers the whole key space.
static RowKeyRange newAll(bool is_common_handle, size_t rowkey_column_size)
{
if (is_common_handle)
@@ -456,6 +468,7 @@ struct RowKeyRange
}
}

/// Create a RowKeyRange that covers no data at all.
static RowKeyRange newNone(bool is_common_handle, size_t rowkey_column_size)
{
if (is_common_handle)
@@ -594,10 +607,13 @@ struct RowKeyRange
return check(first) && check(last_include);
}

/// Check whether thisRange.Start <= key
inline bool checkStart(const RowKeyValueRef & value) const { return compare(getStart(), value) <= 0; }

/// Check whether key < thisRange.End
inline bool checkEnd(const RowKeyValueRef & value) const { return compare(value, getEnd()) < 0; }

/// Check whether the key is included in this range.
inline bool check(const RowKeyValueRef & value) const { return checkStart(value) && checkEnd(value); }

inline RowKeyValueRef getStart() const { return start.toRowKeyValueRef(); }
@@ -625,7 +641,7 @@ struct RowKeyRange
return {start_key, end_key};
}

/// return <offset, limit>
/// Clip the <offset, limit> according to this range, and return the clipped <offset, limit>.
std::pair<size_t, size_t> getPosRange(const ColumnPtr & column, const size_t offset, const size_t limit) const
{
RowKeyColumnContainer rowkey_column(column, is_common_handle);
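Putting the RowKeyRange comments above together: a range is [start, end), checkStart/checkEnd/check test membership, and getPosRange clips an <offset, limit> window of a sorted key column so it only covers keys inside the range. A toy model over plain integer handles, with made-up values (real ranges also handle common handles and infinite boundaries):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Toy model of RowKeyRange over integer handles: [start, end), plus a getPosRange-like clip.
struct IntRange
{
    int64_t start; // inclusive
    int64_t end;   // exclusive

    bool checkStart(int64_t key) const { return start <= key; }
    bool checkEnd(int64_t key) const { return key < end; }
    bool check(int64_t key) const { return checkStart(key) && checkEnd(key); }

    // Clip <offset, limit> of a sorted key column to the part inside this range.
    std::pair<size_t, size_t> getPosRange(const std::vector<int64_t> & keys, size_t offset, size_t limit) const
    {
        auto first = keys.begin() + offset;
        auto last = keys.begin() + offset + limit;
        auto lo = std::lower_bound(first, last, start);
        auto hi = std::lower_bound(lo, last, end);
        return {static_cast<size_t>(lo - keys.begin()), static_cast<size_t>(hi - lo)};
    }
};

For example, IntRange{10, 20}.getPosRange({5, 9, 10, 15, 19, 20, 25}, 0, 7) yields <2, 3>: offset 2 is the first key >= 10, and exactly the three keys 10, 15, 19 fall inside [10, 20).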