Skip to content

Commit

Permalink
Add function to show compression information
Browse files Browse the repository at this point in the history
Add a function that can be used on a compressed data value to show
some metadata information, such as the compression algorithm used and
the presence of any null values.
  • Loading branch information
erimatnor authored and mkindahl committed Aug 5, 2024
1 parent dbebf58 commit 500abf5
Show file tree
Hide file tree
Showing 20 changed files with 241 additions and 124 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7126
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7126 Add functions to show compression info
5 changes: 5 additions & 0 deletions sql/pre_install/types.functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.compressed_data_recv(internal)
AS '@MODULE_PATHNAME@', 'ts_compressed_data_recv'
LANGUAGE C IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION _timescaledb_functions.compressed_data_info(_timescaledb_internal.compressed_data)
RETURNS TABLE (algorithm name, has_nulls bool)
LANGUAGE C STRICT IMMUTABLE
AS '@MODULE_PATHNAME@', 'ts_compressed_data_info';

CREATE OR REPLACE FUNCTION _timescaledb_functions.dimension_info_in(cstring)
RETURNS _timescaledb_internal.dimension_info
LANGUAGE C STRICT IMMUTABLE
Expand Down
4 changes: 4 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE FUNCTION _timescaledb_functions.compressed_data_info(_timescaledb_internal.compressed_data)
RETURNS TABLE (algorithm name, has_nulls bool)
LANGUAGE SQL STRICT IMMUTABLE
AS 'SELECT NULL,NULL' LANGUAGE SQL VOLATILE SET search_path = pg_catalog, pg_temp;
1 change: 1 addition & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP FUNCTION _timescaledb_functions.compressed_data_info(_timescaledb_internal.compressed_data);
1 change: 1 addition & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ CROSSMODULE_WRAPPER(compressed_data_send);
CROSSMODULE_WRAPPER(compressed_data_recv);
CROSSMODULE_WRAPPER(compressed_data_in);
CROSSMODULE_WRAPPER(compressed_data_out);
CROSSMODULE_WRAPPER(compressed_data_info);
CROSSMODULE_WRAPPER(deltadelta_compressor_append);
CROSSMODULE_WRAPPER(deltadelta_compressor_finish);
CROSSMODULE_WRAPPER(gorilla_compressor_append);
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ typedef struct CrossModuleFunctions
PGFunction compressed_data_recv;
PGFunction compressed_data_in;
PGFunction compressed_data_out;
PGFunction compressed_data_info;
bool (*process_compress_table)(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options);
void (*process_altertable_cmd)(Hypertable *ht, const AlterTableCmd *cmd);
Expand Down
7 changes: 7 additions & 0 deletions tsl/src/compression/algorithms/array.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ typedef struct ArrayCompressed
uint64 alignment_sentinel[FLEXIBLE_ARRAY_MEMBER];
} ArrayCompressed;

bool
array_compressed_has_nulls(const CompressedDataHeader *header)
{
const ArrayCompressed *ac = (const ArrayCompressed *) header;
return ac->has_nulls;
}

static void
pg_attribute_unused() assertions(void)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/compression/algorithms/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct ArrayDecompressionIterator ArrayDecompressionIterator;

extern const Compressor array_compressor;

extern bool array_compressed_has_nulls(const CompressedDataHeader *header);
extern Compressor *array_compressor_for_type(Oid element_type);
extern ArrayCompressor *array_compressor_alloc(Oid type_to_compress);
extern void array_compressor_append_null(ArrayCompressor *compressor);
Expand Down
8 changes: 8 additions & 0 deletions tsl/src/compression/algorithms/deltadelta.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <common/base64.h>
#include <funcapi.h>
#include <lib/stringinfo.h>
#include <stdbool.h>
#include <utils/builtins.h>
#include <utils/date.h>
#include <utils/lsyscache.h>
Expand Down Expand Up @@ -76,6 +77,13 @@ typedef struct ExtendedCompressor
DeltaDeltaCompressor *internal;
} ExtendedCompressor;

bool
deltadelta_compressed_has_nulls(const CompressedDataHeader *header)
{
const DeltaDeltaCompressed *ddc = (const DeltaDeltaCompressed *) header;
return ddc->has_nulls;
}

static void
deltadelta_compressor_append_bool(Compressor *compressor, Datum val)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/compression/algorithms/deltadelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ typedef struct DeltaDeltaCompressor DeltaDeltaCompressor;
typedef struct DeltaDeltaCompressed DeltaDeltaCompressed;
typedef struct DeltaDeltaDecompressionIterator DeltaDeltaDecompressionIterator;

extern bool deltadelta_compressed_has_nulls(const CompressedDataHeader *header);
extern Compressor *delta_delta_compressor_for_type(Oid element_type);
extern DeltaDeltaCompressor *delta_delta_compressor_alloc(void);
extern void delta_delta_compressor_append_null(DeltaDeltaCompressor *compressor);
Expand Down
7 changes: 7 additions & 0 deletions tsl/src/compression/algorithms/dictionary.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ typedef struct DictionaryCompressed
uint64 alignment_sentinel[FLEXIBLE_ARRAY_MEMBER];
} DictionaryCompressed;

bool
dictionary_compressed_has_nulls(const CompressedDataHeader *header)
{
const DictionaryCompressed *dc = (const DictionaryCompressed *) header;
return dc->has_nulls;
}

static void
pg_attribute_unused() assertions(void)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/compression/algorithms/dictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct DictionaryCompressor DictionaryCompressor;
typedef struct DictionaryCompressed DictionaryCompressed;
typedef struct DictionaryDecompressionIterator DictionaryDecompressionIterator;

extern bool dictionary_compressed_has_nulls(const CompressedDataHeader *header);
extern Compressor *dictionary_compressor_for_type(Oid element_type);
extern DictionaryCompressor *dictionary_compressor_alloc(Oid type_to_compress);
extern void dictionary_compressor_append_null(DictionaryCompressor *compressor);
Expand Down
7 changes: 7 additions & 0 deletions tsl/src/compression/algorithms/gorilla.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ typedef struct CompressedGorillaData
Simple8bRleSerialized *nulls; /* NULL if no nulls */
} CompressedGorillaData;

bool
gorilla_compressed_has_nulls(const CompressedDataHeader *header)
{
const GorillaCompressed *gc = (const GorillaCompressed *) header;
return gc->has_nulls;
}

static void
pg_attribute_unused() assertions(void)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/compression/algorithms/gorilla.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ typedef struct GorillaCompressor GorillaCompressor;
typedef struct GorillaCompressed GorillaCompressed;
typedef struct GorillaDecompressionIterator GorillaDecompressionIterator;

extern bool gorilla_compressed_has_nulls(const CompressedDataHeader *header);
extern Compressor *gorilla_compressor_for_type(Oid element_type);

extern GorillaCompressor *gorilla_compressor_alloc(void);
Expand Down
65 changes: 65 additions & 0 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORIT
[COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION,
};

static const char *compression_algorithm_name[] = {
[_INVALID_COMPRESSION_ALGORITHM] = "INVALID", [COMPRESSION_ALGORITHM_ARRAY] = "ARRAY",
[COMPRESSION_ALGORITHM_DICTIONARY] = "DICTIONARY", [COMPRESSION_ALGORITHM_GORILLA] = "GORILLA",
[COMPRESSION_ALGORITHM_DELTADELTA] = "DELTADELTA",
};

const char *
compression_get_algorithm_name(CompressionAlgorithm alg)
{
return compression_algorithm_name[alg];
}

static Compressor *
compressor_for_type(Oid type)
{
Expand Down Expand Up @@ -1983,6 +1995,59 @@ tsl_compressed_data_out(PG_FUNCTION_ARGS)
PG_RETURN_CSTRING(encoded);
}

/* create_hypertable record attribute numbers */
enum Anum_compressed_info
{
Anum_compressed_info_algorithm = 1,
Anum_compressed_info_has_nulls,
_Anum_compressed_info_max,
};

#define Natts_compressed_info (_Anum_compressed_info_max - 1)

extern Datum
tsl_compressed_data_info(PG_FUNCTION_ARGS)
{
const CompressedDataHeader *header = (CompressedDataHeader *) PG_GETARG_VARLENA_P(0);
TupleDesc tupdesc;
HeapTuple tuple;
bool has_nulls = false;

if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in "
"context that cannot accept type record")));

switch (header->compression_algorithm)
{
case COMPRESSION_ALGORITHM_GORILLA:
has_nulls = gorilla_compressed_has_nulls(header);
break;
case COMPRESSION_ALGORITHM_DICTIONARY:
has_nulls = dictionary_compressed_has_nulls(header);
break;
case COMPRESSION_ALGORITHM_DELTADELTA:
has_nulls = deltadelta_compressed_has_nulls(header);
break;
case COMPRESSION_ALGORITHM_ARRAY:
has_nulls = array_compressed_has_nulls(header);
break;
}

tupdesc = BlessTupleDesc(tupdesc);

Datum values[Natts_compressed_info];
bool nulls[Natts_compressed_info] = { false };

values[AttrNumberGetAttrOffset(Anum_compressed_info_algorithm)] =
CStringGetDatum(compression_get_algorithm_name(header->compression_algorithm));
values[AttrNumberGetAttrOffset(Anum_compressed_info_has_nulls)] = BoolGetDatum(has_nulls);
tuple = heap_form_tuple(tupdesc, values, nulls);

return HeapTupleGetDatum(tuple);
}

extern CompressionStorage
compression_get_toast_storage(CompressionAlgorithm algorithm)
{
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ extern Datum tsl_compressed_data_send(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_recv(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_in(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_out(PG_FUNCTION_ARGS);
extern Datum tsl_compressed_data_info(PG_FUNCTION_ARGS);

static void
pg_attribute_unused() assert_num_compression_algorithms_sane(void)
Expand All @@ -320,6 +321,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
"number of algorithms have changed, the asserts should be updated");
}

extern const char *compression_get_algorithm_name(CompressionAlgorithm alg);
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithm algo);
extern CompressionAlgorithm compression_get_default_algorithm(Oid typeoid);

Expand Down
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ CrossModuleFunctions tsl_cm_functions = {
.compressed_data_recv = tsl_compressed_data_recv,
.compressed_data_in = tsl_compressed_data_in,
.compressed_data_out = tsl_compressed_data_out,
.compressed_data_info = tsl_compressed_data_info,
.deltadelta_compressor_append = tsl_deltadelta_compressor_append,
.deltadelta_compressor_finish = tsl_deltadelta_compressor_finish,
.gorilla_compressor_append = tsl_gorilla_compressor_append,
Expand Down
Loading

0 comments on commit 500abf5

Please sign in to comment.