Skip to content

Commit

Permalink
Add compression flush callback
Browse files Browse the repository at this point in the history
Add the ability to define a callback function that gets called every
time a new segment is flushed during compression. The callback can be
used to report compression progress.

Also expose the function to create a tuple sort state for compression.
  • Loading branch information
erimatnor committed Sep 25, 2024
1 parent cb5833c commit ee5eb9b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 24 deletions.
53 changes: 31 additions & 22 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ compressor_for_type(Oid type)
return definitions[algorithm].compressor_for_type(type);
}

DecompressionIterator *(*tsl_get_decompression_iterator_init(CompressionAlgorithm algorithm,
bool reverse))(Datum, Oid)
DecompressionInitializer
tsl_get_decompression_iterator_init(CompressionAlgorithm algorithm, bool reverse)
{
if (algorithm >= _END_COMPRESSION_ALGORITHMS)
elog(ERROR, "invalid compression algorithm %d", algorithm);
Expand Down Expand Up @@ -535,18 +535,13 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
return cstat;
}

static Tuplesortstate *
compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
Tuplesortstate *
compression_create_tuplesort_state(CompressionSettings *settings, Relation rel)
{
TupleDesc tupDesc = RelationGetDescr(in_rel);
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;

TupleDesc tupdesc = RelationGetDescr(rel);
int num_segmentby = ts_array_length(settings->fd.segmentby);
int num_orderby = ts_array_length(settings->fd.orderby);
int n_keys = num_segmentby + num_orderby;

AttrNumber *sort_keys = palloc(sizeof(*sort_keys) * n_keys);
Oid *sort_operators = palloc(sizeof(*sort_operators) * n_keys);
Oid *sort_collations = palloc(sizeof(*sort_collations) * n_keys);
Expand All @@ -568,24 +563,33 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
attname = ts_array_get_element_text(settings->fd.orderby, position);
}
compress_chunk_populate_sort_info_for_column(settings,
RelationGetRelid(in_rel),
RelationGetRelid(rel),
attname,
&sort_keys[n],
&sort_operators[n],
&sort_collations[n],
&nulls_first[n]);
}

tuplesortstate = tuplesort_begin_heap(tupDesc,
n_keys,
sort_keys,
sort_operators,
sort_collations,
nulls_first,
maintenance_work_mem,
NULL,
false /*=randomAccess*/);
return tuplesort_begin_heap(tupdesc,
n_keys,
sort_keys,
sort_operators,
sort_collations,
nulls_first,
maintenance_work_mem,
NULL,
false /*=randomAccess*/);
}

static Tuplesortstate *
compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
{
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;

tuplesortstate = compression_create_tuplesort_state(settings, in_rel);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
slot = table_slot_create(in_rel, NULL);

Expand Down Expand Up @@ -1323,6 +1327,11 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
row_compressor->compressed_values[compressed_col] = 0;
row_compressor->compressed_is_null[compressed_col] = true;
}

if (NULL != row_compressor->on_flush)
row_compressor->on_flush(row_compressor,
row_compressor->rows_compressed_into_current_value);

row_compressor->rowcnt_pre_compression += row_compressor->rows_compressed_into_current_value;
row_compressor->num_compressed_rows++;
row_compressor->rows_compressed_into_current_value = 0;
Expand Down Expand Up @@ -2007,7 +2016,7 @@ enum Anum_compressed_info
extern Datum
tsl_compressed_data_info(PG_FUNCTION_ARGS)
{
const CompressedDataHeader *header = (CompressedDataHeader *) PG_GETARG_VARLENA_P(0);
const CompressedDataHeader *header = get_compressed_data_header(PG_GETARG_DATUM(0));
TupleDesc tupdesc;
HeapTuple tuple;
bool has_nulls = false;
Expand Down Expand Up @@ -2085,7 +2094,7 @@ compression_get_default_algorithm(Oid typeoid)

default:
{
/* use dictitionary if possible, otherwise use array */
/* use dictionary if possible, otherwise use array */
TypeCacheEntry *tentry =
lookup_type_cache(typeoid, TYPECACHE_EQ_OPR_FINFO | TYPECACHE_HASH_PROC_FINFO);
if (tentry->hash_proc_finfo.fn_addr == NULL || tentry->eq_opr_finfo.fn_addr == NULL)
Expand Down
11 changes: 9 additions & 2 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,14 @@ typedef enum
TOAST_STORAGE_EXTENDED
} CompressionStorage;

typedef DecompressionIterator *(*DecompressionInitializer)(Datum, Oid);
typedef ArrowArray *(*DecompressAllFunction)(Datum compressed, Oid element_type,
MemoryContext dest_mctx);

typedef struct CompressionAlgorithmDefinition
{
DecompressionIterator *(*iterator_init_forward)(Datum, Oid element_type);
DecompressionIterator *(*iterator_init_reverse)(Datum, Oid element_type);
DecompressionInitializer iterator_init_forward;
DecompressionInitializer iterator_init_reverse;
DecompressAllFunction decompress_all;
void (*compressed_data_send)(CompressedDataHeader *, StringInfo);
Datum (*compressed_data_recv)(StringInfo);
Expand Down Expand Up @@ -268,6 +269,10 @@ typedef struct RowCompressor
bool first_iteration;
/* the heap insert options */
int insert_options;

/* Callback called on every flush. The ntuples argument is the number of
* tuples flushed. Typically used for progress reporting. */
void (*on_flush)(struct RowCompressor *rowcompress, uint64 ntuples);
} RowCompressor;

/*
Expand Down Expand Up @@ -359,6 +364,8 @@ extern void compress_chunk_populate_sort_info_for_column(CompressionSettings *se
const char *attname, AttrNumber *att_nums,
Oid *sort_operator, Oid *collation,
bool *nulls_first);
extern Tuplesortstate *compression_create_tuplesort_state(CompressionSettings *settings,
Relation rel);
extern void row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor,
Relation uncompressed_table, Relation compressed_table,
int16 num_columns_in_compressed_table, bool need_bistate,
Expand Down

0 comments on commit ee5eb9b

Please sign in to comment.