Skip to content

Commit

Permalink
Account for uncompressed rows in 'create_compressed_chunk'
Browse files Browse the repository at this point in the history
`_timescaledb_internal.create_compressed_chunk` can be used to create a
compressed chunk with existing compressed data. It did not account for
the fact that the chunk can contain uncompressed data, in which case the
chunk status must be set to partial.

Fixes #5946
  • Loading branch information
JamesGuthrie committed Aug 28, 2023
1 parent a323547 commit 01e480d
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 35 deletions.
4 changes: 4 additions & 0 deletions .unreleased/bugfix_5951
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Implements: #5951 _timescaledb_internal.create_compressed_chunk doesn't account for existing uncompressed rows

Fixes: #5946

26 changes: 16 additions & 10 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3674,22 +3674,28 @@ chunk_set_compressed_id_in_tuple(TupleInfo *ti, void *data)
/*
 * Associate a compressed chunk with a chunk.
 *
 * Updates the chunk's catalog tuple (via chunk_scan_internal over the
 * chunk-id index) to record compressed_chunk_id, skipping dropped chunks.
 * On success, also sets CHUNK_STATUS_COMPRESSED on the in-memory chunk so
 * the cached status stays in sync with the catalog update.
 *
 * Returns true if the catalog tuple was found and updated, false otherwise.
 *
 * NOTE: the original pasted span interleaved the pre-change and post-change
 * versions of this function (a stray `return chunk_scan_internal(...)` was
 * left before the `success = ...` assignment, making the tail unreachable);
 * this body keeps only the coherent, post-change implementation.
 */
bool
ts_chunk_set_compressed_chunk(Chunk *chunk, int32 compressed_chunk_id)
{
	bool success = false;
	ScanKeyData scankey[1];

	ScanKeyInit(&scankey[0],
				Anum_chunk_idx_id,
				BTEqualStrategyNumber,
				F_INT4EQ,
				Int32GetDatum(chunk->fd.id));

	success = chunk_scan_internal(CHUNK_ID_INDEX,
								  scankey,
								  1,
								  chunk_check_ignorearg_dropped_filter,
								  chunk_set_compressed_id_in_tuple,
								  &compressed_chunk_id,
								  0,
								  ForwardScanDirection,
								  RowExclusiveLock,
								  CurrentMemoryContext) > 0;
	if (success)
	{
		/* Mirror the catalog change in the cached chunk status. */
		chunk->fd.status = ts_set_flags_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
	}
	return success;
}

/*Assume permissions are already checked */
Expand Down
27 changes: 2 additions & 25 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1364,29 +1364,6 @@ hypertable_check_associated_schema_permissions(const char *schema_name, Oid user
return schema_oid;
}

/*
 * Check whether an open relation contains at least one visible tuple.
 *
 * Starts a scan under the active snapshot and fetches a single slot; the
 * scan stops after the first tuple, so this is cheap even on large tables.
 */
static bool
relation_has_tuples(Relation rel)
{
	bool found;
	TupleTableSlot *slot =
		MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel));
	TableScanDesc scan = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);

	found = table_scan_getnextslot(scan, ForwardScanDirection, slot);

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);
	return found;
}

/*
 * Check whether the table identified by table_relid has any visible tuples.
 *
 * Opens the table with the requested lock mode, delegates the actual check
 * to relation_has_tuples(), and releases the relation before returning.
 */
static bool
table_has_tuples(Oid table_relid, LOCKMODE lockmode)
{
	bool result;
	Relation rel = table_open(table_relid, lockmode);

	result = relation_has_tuples(rel);
	table_close(rel, lockmode);
	return result;
}

static bool
table_is_logged(Oid table_relid)
{
Expand Down Expand Up @@ -1641,7 +1618,7 @@ ts_hypertable_insert_blocker_trigger_add(PG_FUNCTION_ARGS)

ts_hypertable_permissions_check(relid, GetUserId());

if (table_has_tuples(relid, AccessShareLock))
if (ts_table_has_tuples(relid, AccessShareLock))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("hypertable \"%s\" has data in the root table", get_rel_name(relid)),
Expand Down Expand Up @@ -2208,7 +2185,7 @@ ts_hypertable_create_from_info(Oid table_relid, int32 hypertable_id, uint32 flag
/* Check that the table doesn't have any unsupported constraints */
hypertable_validate_constraints(table_relid, replication_factor);

table_has_data = relation_has_tuples(rel);
table_has_data = ts_relation_has_tuples(rel);

if ((flags & HYPERTABLE_CREATE_MIGRATE_DATA) == 0 && table_has_data)
ereport(ERROR,
Expand Down
24 changes: 24 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utils/fmgrprotos.h>
#include <utils/lsyscache.h>
#include <utils/relcache.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>

#include "compat/compat.h"
Expand Down Expand Up @@ -1348,3 +1349,26 @@ ts_map_attno(Oid src_rel, Oid dst_rel, AttrNumber attno)
pfree(attname);
return dst_attno;
}

/*
 * Check whether an open relation contains at least one visible tuple.
 *
 * Scans under the active snapshot and stops at the first tuple found, so
 * the cost is bounded regardless of table size.
 */
bool
ts_relation_has_tuples(Relation rel)
{
	bool found;
	TupleTableSlot *slot =
		MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel));
	TableScanDesc scan = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);

	found = table_scan_getnextslot(scan, ForwardScanDirection, slot);

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);
	return found;
}

/*
 * Check whether the table identified by table_relid has any visible tuples.
 *
 * Opens the table with the given lock mode, delegates the check to
 * ts_relation_has_tuples(), and closes the relation (releasing the lock
 * per table_close semantics) before returning the result.
 */
bool
ts_table_has_tuples(Oid table_relid, LOCKMODE lockmode)
{
	bool result;
	Relation rel = table_open(table_relid, lockmode);

	result = ts_relation_has_tuples(rel);
	table_close(rel, lockmode);
	return result;
}
3 changes: 3 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ extern TSDLLEXPORT void ts_copy_relation_acl(const Oid source_relid, const Oid t
extern TSDLLEXPORT bool ts_data_node_is_available_by_server(const ForeignServer *server);
extern TSDLLEXPORT bool ts_data_node_is_available(const char *node_name);

extern TSDLLEXPORT bool ts_relation_has_tuples(Relation rel);
extern TSDLLEXPORT bool ts_table_has_tuples(Oid table_relid, LOCKMODE lockmode);

extern TSDLLEXPORT AttrNumber ts_map_attno(Oid src_rel, Oid dst_rel, AttrNumber attno);

/*
Expand Down
9 changes: 9 additions & 0 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ tsl_create_compressed_chunk(PG_FUNCTION_ARGS)
Chunk *compress_ht_chunk;
Cache *hcache;
CompressChunkCxt cxt;
bool chunk_was_compressed;

Assert(!PG_ARGISNULL(0));
Assert(!PG_ARGISNULL(1));
Expand Down Expand Up @@ -812,7 +813,15 @@ tsl_create_compressed_chunk(PG_FUNCTION_ARGS)
numrows_pre_compression,
numrows_post_compression);

chunk_was_compressed = ts_chunk_is_compressed(cxt.srcht_chunk);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id);
if (!chunk_was_compressed && ts_table_has_tuples(cxt.srcht_chunk->table_id, AccessShareLock))
{
/* The chunk was not compressed before it had the compressed chunk
* attached to it, and it contains rows, so we set it to be partial.
*/
ts_chunk_set_partial(cxt.srcht_chunk);
}
ts_cache_release(hcache);

PG_RETURN_OID(chunk_relid);
Expand Down
69 changes: 69 additions & 0 deletions tsl/test/expected/compression_create_compressed_table.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
-- create compressed hypertable
CREATE TABLE "public"."metrics" (
"time" timestamp with time zone NOT NULL,
"device_id" "text",
"val" double precision
);
SELECT create_hypertable('public.metrics', 'time');
create_hypertable
----------------------
(1,public,metrics,t)
(1 row)

ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby = 'time', timescaledb.compress_segmentby = 'device_id');
-- insert uncompressed row into hypertable
INSERT INTO metrics (time, device_id, val)
VALUES('2023-05-01T00:00:00Z'::timestamptz, 1, 1.0);
SELECT count(*) FROM _timescaledb_internal._hyper_1_1_chunk;
count
-------
1
(1 row)

-- compress these rows, we do this to get compressed row data for the test
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)

-- create custom compressed chunk table
CREATE TABLE "_timescaledb_internal"."custom_compressed_chunk"() INHERITS ("_timescaledb_internal"."_compressed_hypertable_2");
-- copy compressed row from compressed table into custom compressed chunk table
INSERT INTO "_timescaledb_internal"."custom_compressed_chunk" SELECT * FROM "_timescaledb_internal"."compress_hyper_2_2_chunk";
-- decompress the rows, moving them back to uncompressed space
SELECT decompress_chunk('"_timescaledb_internal"."_hyper_1_1_chunk"');
decompress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)

-- attach compressed chunk to parent chunk
SELECT _timescaledb_internal.create_compressed_chunk(
'"_timescaledb_internal"."_hyper_1_1_chunk"'::TEXT::REGCLASS,
'"_timescaledb_internal"."custom_compressed_chunk"'::TEXT::REGCLASS,
8192,
8192,
16384,
8192,
8192,
16384,
1,
1
);
create_compressed_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)

-- select total rows in chunk (should be 2)
SELECT count(*) FROM "_timescaledb_internal"."_hyper_1_1_chunk";
count
-------
2
(1 row)

1 change: 1 addition & 0 deletions tsl/test/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ set(TEST_FILES
cagg_refresh.sql
cagg_watermark.sql
compressed_collation.sql
compression_create_compressed_table.sql
compression_bgw.sql
compression_conflicts.sql
compression_insert.sql
Expand Down
50 changes: 50 additions & 0 deletions tsl/test/sql/compression_create_compressed_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

\c :TEST_DBNAME :ROLE_SUPERUSER

-- create compressed hypertable
CREATE TABLE "public"."metrics" (
"time" timestamp with time zone NOT NULL,
"device_id" "text",
"val" double precision
);
SELECT create_hypertable('public.metrics', 'time');
ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby = 'time', timescaledb.compress_segmentby = 'device_id');

-- insert uncompressed row into hypertable
INSERT INTO metrics (time, device_id, val)
VALUES('2023-05-01T00:00:00Z'::timestamptz, 1, 1.0);

SELECT count(*) FROM _timescaledb_internal._hyper_1_1_chunk;

-- compress these rows, we do this to get compressed row data for the test
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');

-- create custom compressed chunk table
CREATE TABLE "_timescaledb_internal"."custom_compressed_chunk"() INHERITS ("_timescaledb_internal"."_compressed_hypertable_2");

-- copy compressed row from compressed table into custom compressed chunk table
INSERT INTO "_timescaledb_internal"."custom_compressed_chunk" SELECT * FROM "_timescaledb_internal"."compress_hyper_2_2_chunk";

-- decompress the rows, moving them back to uncompressed space
SELECT decompress_chunk('"_timescaledb_internal"."_hyper_1_1_chunk"');

-- attach compressed chunk to parent chunk
SELECT _timescaledb_internal.create_compressed_chunk(
'"_timescaledb_internal"."_hyper_1_1_chunk"'::TEXT::REGCLASS,
'"_timescaledb_internal"."custom_compressed_chunk"'::TEXT::REGCLASS,
8192,
8192,
16384,
8192,
8192,
16384,
1,
1
);

-- select total rows in chunk (should be 2)
SELECT count(*) FROM "_timescaledb_internal"."_hyper_1_1_chunk";

0 comments on commit 01e480d

Please sign in to comment.