diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 651690b7489..1586304a660 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -129,16 +129,16 @@ RaftService::~RaftService() grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, CommandServerReaderWriter * stream) { - RaftContext raft_contex(&db_context, grpc_context, stream); + RaftContext raft_context(&db_context, grpc_context, stream); try { - kvstore->report(raft_contex); + kvstore->report(raft_context); enginepb::CommandRequestBatch cmds; while (stream->Read(&cmds)) { - kvstore->onServiceCommand(std::move(cmds), raft_contex); + kvstore->onServiceCommand(std::move(cmds), raft_context); } } catch (...) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index ec84d25a31e..c2902213e57 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -220,7 +220,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings action.task = action.segment->createAppendTask(op_context, wbs, action.update); } - // TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be acutally removed. + // TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be actually removed. commitWrites(actions, wbs, dm_context, op_context); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index c06f1a097f8..f0cb2d57277 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -87,6 +87,32 @@ TEST_F(DeltaMergeStore_test, Create) } } +TEST_F(DeltaMergeStore_test, OpenWithExtraColumns) +{ + const ColumnDefine col_str_define(2, "col2", std::make_shared()); + const ColumnDefine col_i8_define(3, "i8", std::make_shared()); + { + ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns(); + table_column_defines.emplace_back(col_str_define); + table_column_defines.emplace_back(col_i8_define); + store = reload(table_column_defines); + } + + { + // check column structure + const auto & cols = store->getTableColumns(); + ASSERT_EQ(cols.size(), 5UL); + const auto & str_col = cols[3]; + ASSERT_EQ(str_col.name, col_str_define.name); + ASSERT_EQ(str_col.id, col_str_define.id); + ASSERT_TRUE(str_col.type->equals(*col_str_define.type)); + const auto & i8_col = cols[4]; + ASSERT_EQ(i8_col.name, col_i8_define.name); + ASSERT_EQ(i8_col.id, col_i8_define.id); + ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type)); + } +} + TEST_F(DeltaMergeStore_test, SimpleWriteRead) { const ColumnDefine col_str_define(2, "col2", std::make_shared()); @@ -233,6 +259,291 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead) } } +TEST_F(DeltaMergeStore_test, DeleteRead) +{ + const size_t num_rows_write = 128; + { + // Create a block with sequential Int64 handle in range [0, 128) + Block block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + store->write(*context, context->getSettingsRef(), block); + } + // Test Reading first + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + ASSERT_EQ(c->getInt(i), i); + } + } + } + } + + ASSERT_EQ(num_rows_read, num_rows_write); + } + // Delete range [0, 64) + const size_t num_deleted_rows = 64; + { + HandleRange range(0, num_deleted_rows); + store->deleteRange(*context, context->getSettingsRef(), range); + } + // Read after deletion + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + // Range after deletion is [64, 128) + ASSERT_EQ(c->getInt(i), i + Int64(num_deleted_rows)); + } + } + } + } + + ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows); + } +} + +TEST_F(DeltaMergeStore_test, WriteMultipleBlock) +{ + const size_t num_write_rows = 32; + + // Test write multi blocks without overlap + { + Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false); + Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false); + Block block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false); + store->write(*context, context->getSettingsRef(), block1); + store->write(*context, context->getSettingsRef(), block2); + store->write(*context, context->getSettingsRef(), block3); + } + + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + ASSERT_EQ(c->getInt(i), i); + } + } + } + } + + ASSERT_EQ(num_rows_read, 3 * num_write_rows); + } + + store = reload(); + + // Test write multi blocks with overlap + { + UInt64 tso1 = 1; + UInt64 tso2 = 100; + Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false, tso1); + Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false, tso1); + Block block3 = DMTestEnv::prepareSimpleWriteBlock(num_write_rows / 2, num_write_rows / 2 + num_write_rows, false, tso2); + store->write(*context, context->getSettingsRef(), block1); + store->write(*context, context->getSettingsRef(), block2); + store->write(*context, context->getSettingsRef(), block3); + } + // Read without version + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + ASSERT_EQ(c->getInt(i), i); + } + } + } + } + + ASSERT_EQ(num_rows_read, 3 * num_write_rows); + } + // Read with version + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ UInt64(1), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + ASSERT_EQ(c->getInt(i), i); + } + } + } + } + + ASSERT_EQ(num_rows_read, 2 * num_write_rows); + } +} + +// DEPRECATED: +// This test case strongly depends on implementation of `shouldSplit()` and `shouldMerge()`. +// The machanism of them may be changed one day. So uncomment the test if need. +TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock) +{ + DB::Settings settings = context->getSettings(); + // Mock dm_segment_rows for test + // if rows > 8 will split + // if left->rows < 2 && right->rows + left->rows < 4 will merge + settings.dm_segment_limit_rows = 4; + + { + store->check(*context); + } + + { + // Write 7 rows that would not trigger a split + Block block = DMTestEnv::prepareSimpleWriteBlock(0, 8, false); + store->write(*context, settings, block); + } + + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + settings, + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto & iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); i++) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), i); + } + } + } + } + ASSERT_EQ(num_rows_read, 8UL); + } + + { + // Write rows that would trigger a split + Block block = DMTestEnv::prepareSimpleWriteBlock(8, 9, false); + store->write(*context, settings, block); + } + + // Now there is 2 segments + // segment1: 0, 1, 2, 3 + // segment2: 4, 5, 6, 7, 8 + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + settings, + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + // block_num represents index of current segment + int block_num = 0; + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto & iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); i++) + { + if (iter.name == "pk" && block_num == 0) + { + EXPECT_EQ(c->getInt(i), i); + } + else if (iter.name == "pk" && block_num == 1) + { + EXPECT_EQ(c->getInt(i), i + 4); + } + } + } + block_num++; + } + ASSERT_EQ(num_rows_read, 9UL); + } +} + TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso) { const UInt64 tso1 = 4; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a0944610cee..ba327c9ef38 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1,3 +1,4 @@ +#include #include #include "dm_basic_include.h" @@ -75,9 +76,9 @@ class Segment_test : public ::testing::Test .handle_column = table_columns_.at(0), .min_version = 0, - .not_compress = settings.not_compress_columns, + .not_compress = settings.not_compress_columns, - .segment_limit_rows = db_context->getSettingsRef().dm_segment_limit_rows, + .segment_limit_rows = db_context->getSettingsRef().dm_segment_limit_rows, .delta_limit_rows = db_context->getSettingsRef().dm_segment_delta_limit_rows, .delta_limit_bytes = db_context->getSettingsRef().dm_segment_delta_limit_bytes, @@ -355,6 +356,9 @@ TEST_P(SegmentDeletion_test, DeleteDataInStable) segment->write(dmContext(), {remove}); // TODO test delete range partial overlap with segment // TODO test delete range not included by segment + + // flush segment + segment = segment->mergeDelta(dmContext()); } if (merge_delta_after_delete) @@ -474,6 +478,200 @@ TEST_P(SegmentDeletion_test, DeleteDataInStableAndDelta) } INSTANTIATE_TEST_CASE_P(WhetherReadOrMergeDeltaBeforeDeleteRange, SegmentDeletion_test, testing::Combine(testing::Bool(), testing::Bool())); +TEST_F(Segment_test, DeleteRead) +{ + const size_t num_rows_write = 64; + { + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + segment->write(dmContext(), std::move(block)); + } + + { + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Test delete range [70, 100) + HandleRange del{70, 100}; + segment->write(dmContext(), {del}); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Read after deletion + // The deleted range has no overlap with current data, so there should be no change + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write); + for (auto & iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); i++) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), i); + } + } + } + } + in->readSuffix(); + } + + { + // Test delete range [63, 70) + HandleRange del{63, 70}; + segment->write(dmContext(), {del}); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Read after deletion + // The deleted range has overlap range [63, 64) with current data, so the record with Handle 63 should be deleted + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 1); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(0), 0); + EXPECT_EQ(c->getInt(62), 62); + } + EXPECT_EQ(c->size(), 63UL); + } + } + in->readSuffix(); + } + + { + // Test delete range [1, 32) + HandleRange del{1, 32}; + segment->write(dmContext(), {del}); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Read after deletion + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 32); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(0), 0); + EXPECT_EQ(c->getInt(1), 32); + } + } + } + in->readSuffix(); + } + + { + // Test delete range [1, 32) + // delete should be idempotent + HandleRange del{1, 32}; + segment->write(dmContext(), {del}); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Read after deletion + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 32); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(0), 0); + EXPECT_EQ(c->getInt(1), 32); + } + } + } + in->readSuffix(); + } + + { + // Test delete range [0, 2) + // There is an overlap range [0, 1) + HandleRange del{0, 2}; + segment->write(dmContext(), {del}); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + { + // Read after deletion + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 33); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(0), 32); + } + } + } + in->readSuffix(); + } +} TEST_F(Segment_test, Split) { @@ -589,6 +787,212 @@ TEST_F(Segment_test, Split) } } +TEST_F(Segment_test, Restore) +{ + // compare will compares the given segments. + // If they are equal, result will be true, otherwise it will be false. + auto compare = [&](const SegmentPtr & seg1, const SegmentPtr & seg2, bool & result) { + result = false; + auto in1 = seg1->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ seg1->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + + auto in2 = seg2->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ seg2->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + in1->readPrefix(); + in2->readPrefix(); + for (;;) + { + Block block1 = in1->read(); + Block block2 = in2->read(); + if (!block1) + { + ASSERT_TRUE(!block2); + break; + } + + ASSERT_EQ(block1.rows(), block2.rows()); + + auto iter1 = block1.begin(); + auto iter2 = block2.begin(); + + for (;;) + { + if (iter1 == block1.end()) + { + ASSERT_EQ(iter2, block2.end()); + break; + } + + auto c1 = iter1->column; + auto c2 = iter2->column; + + ASSERT_EQ(c1->size(), c2->size()); + + for (Int64 i = 0; i < Int64(c1->size()); i++) + { + if (iter1->name == "pk") + { + ASSERT_EQ(iter2->name, "pk"); + ASSERT_EQ(c1->getInt(i), c2->getInt(i)); + } + } + + // Call next + iter1++; + iter2++; + } + } + in1->readSuffix(); + in2->readSuffix(); + + result = true; + }; + + const size_t num_rows_write = 64; + { + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + segment->write(dmContext(), std::move(block)); + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + SegmentPtr new_segment = segment->restoreSegment(dmContext(), segment->segmentId()); + + { + // test compare + bool result; + compare(segment, new_segment, result); + ASSERT_TRUE(result); + } + + { + // Do some update and restore again + HandleRange del(0, 32); + segment->write(dmContext(), {del}); + new_segment = segment->restoreSegment(dmContext(), segment->segmentId()); + } + + { + // test compare + bool result; + compare(new_segment, new_segment, result); + ASSERT_TRUE(result); + } +} + +TEST_F(Segment_test, MassiveSplit) +try +{ + Settings settings = dmContext().db_context.getSettings(); + settings.dm_segment_limit_rows = 11; + settings.dm_segment_delta_limit_rows = 7; + + segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + + size_t num_batches_written = 0; + const size_t num_rows_per_write = 5; + + const time_t start_time = std::time(nullptr); + + auto temp = std::vector(); + for (;;) + { + { + // Write to segment + Block block = DMTestEnv::prepareSimpleWriteBlock( // + num_batches_written * num_rows_per_write, // + num_batches_written * num_rows_per_write + num_rows_per_write, + false); + segment->write(dmContext(), std::move(block)); + num_batches_written += 1; + } + + { + // Delete some records so that the following condition can be satisfied: + // if pk % 5 < 2, then the record would be deleted + // if pk % 5 >= 2, then the record would be reserved + HandleRange del{Int64((num_batches_written - 1) * num_rows_per_write), + Int64((num_batches_written - 1) * num_rows_per_write + 2)}; + segment->write(dmContext(), {del}); + } + + { + // flush segment + segment = segment->mergeDelta(dmContext()); + } + + for (size_t i = (num_batches_written - 1) * num_rows_per_write + 2; i < num_batches_written * num_rows_per_write; i++) + { + temp.push_back(Int64(i)); + } + + { + // Read after writing + auto in = segment->getInputStream(/* dm_context= */ dmContext(), + /* columns_to_read= */ tableColumns(), + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snaps= */ {dmContext().storage_pool}, + /* read_ranges= */ {HandleRange::newAll()}, + /* filter */ {}, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + for (auto & iter : block) + { + auto c = iter.column; + for (size_t i = 0; i < c->size(); i++) + { + if (iter.name == "pk") + { + auto expect = temp.at(i + num_rows_read); + EXPECT_EQ(c->getInt(Int64(i)), expect); + } + } + } + num_rows_read += block.rows(); + } + in->readSuffix(); + ASSERT_EQ(num_batches_written * (num_rows_per_write - 2), num_rows_read); + } + + { + // Run for long enough to make sure Split is robust. + const time_t end_time = std::time(nullptr); + // if ((end_time - start_time) / 60 > 10) + if ((end_time - start_time) > 10) + { + return; + } + } + } +} +catch (const Exception & e) +{ + std::string text = e.displayText(); + + auto embedded_stack_trace_pos = text.find("Stack trace"); + std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; + if (std::string::npos == embedded_stack_trace_pos) + std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString() << std::endl; + + throw; +} + /// Mock a col from i8 -> i32 TEST_F(Segment_test, DDLAlterInt8ToInt32) {