Skip to content

Commit

Permalink
Update bgw job table when procedure altered
Browse files Browse the repository at this point in the history
This deals with the following modifications to the name of a procedure
used by a background job:

- If a procedure that exists in the jobs table is renamed, the
  corresponding names in the table will also be changed.

- When a procedure that is used for a background worker
  job is moved to a different schema, modify the job entry as well.

- When a schema is renamed, rename the schema name in the procedures as
  well.

- When a procedure or function used by a job is dropped, it will error
  out.
  • Loading branch information
mkindahl committed Nov 12, 2024
1 parent 062a119 commit ec8be6f
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 42 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7409
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #7409 Update bgw job table when altering procedure
192 changes: 183 additions & 9 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <parser/parse_type.h>
#include <parser/parse_utilcmd.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <tcop/utility.h>
#include <utils/acl.h>
#include <utils/builtins.h>
Expand All @@ -42,6 +43,7 @@
#include <utils/inval.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/regproc.h>
#include <utils/rel.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
Expand Down Expand Up @@ -472,6 +474,162 @@ process_drop_trigger_start(ProcessUtilityArgs *args, DropStmt *stmt)
ts_cache_release(hcache);
}

static void
process_drop_procedure_start(ProcessUtilityArgs *args, DropStmt *stmt)
{
ListCell *cell;

ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool schema_isnull, name_isnull;
Name proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &schema_isnull));
Name proc_name = DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &name_isnull));
if (!schema_isnull && !name_isnull)
{
foreach (cell, stmt->objects)
{
ObjectWithArgs *object = castNode(ObjectWithArgs, lfirst(cell));
RangeVar *rel = makeRangeVarFromNameList(object->objname);
if (namestrcmp(proc_schema, rel->schemaname) == 0 &&
namestrcmp(proc_name, rel->relname) == 0)
{
Assert(stmt->removeType == OBJECT_PROCEDURE ||
stmt->removeType == OBJECT_FUNCTION);
bool job_id_isnull;
const int32 job_id =
DatumGetInt32(slot_getattr(ti->slot, Anum_bgw_job_id, &job_id_isnull));
Ensure(!job_id_isnull, "job id was null");
ereport(ERROR,
errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop %s because background job %d depends on it",
NameListToString(object->objname),
job_id),
errhint("Use delete_job() to drop the job first."));
}
}
}
}
}

static void
replace_attr_if_changed(AttrNumber attno, const char *newvalue, Name name_buf, Datum *values,
bool *replace)
{
if (newvalue)
{
const Name orig_value = DatumGetName(values[AttrNumberGetAttrOffset(attno)]);
if (namestrcmp(orig_value, newvalue) != 0)
{
namestrcpy(name_buf, newvalue);
values[AttrNumberGetAttrOffset(attno)] = NameGetDatum(name_buf);
replace[AttrNumberGetAttrOffset(attno)] = true;
}
}
}

/*
* Update the schema or name of a procedure in the jobs tuple.
*/
static void
ts_bgw_job_update_proc(Relation rel, HeapTuple tuple, TupleDesc tupledesc, const char *newschema,
const char *newname)
{
bool isnull[Natts_bgw_job];
Datum values[Natts_bgw_job];
bool replace[Natts_bgw_job] = { false };

/* Allocated here to make sure that they are alive at the call of
* heap_modify_tuple */
NameData proc_name_buf;
NameData proc_schema_buf;

heap_deform_tuple(tuple, tupledesc, values, isnull);

replace_attr_if_changed(Anum_bgw_job_proc_name, newname, &proc_name_buf, values, replace);
replace_attr_if_changed(Anum_bgw_job_proc_schema, newschema, &proc_schema_buf, values, replace);

HeapTuple new_tuple = heap_modify_tuple(tuple, tupledesc, values, isnull, replace);
ts_catalog_update(rel, new_tuple);
heap_freetuple(new_tuple);
}

static void
ts_bgw_job_rename_schema_name(const char *old_schema_name, const char *new_schema_name)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
if (!curr_schema_isnull && namestrcmp(curr_proc_schema, old_schema_name) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
new_schema_name,
NameStr(*curr_proc_name));

if (should_free)
heap_freetuple(tuple);

Check warning on line 583 in src/process_utility.c

View check run for this annotation

Codecov / codecov/patch

src/process_utility.c#L583

Added line #L583 was not covered by tests
}
}
}

static DDLResult
ts_bgw_job_rename_proc(ObjectAddress address, const char *newschema, const char *newname)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
const char *old_proc_schema = get_namespace_name(get_func_namespace(address.objectId));
const char *old_proc_name = get_func_name(address.objectId);
if (!curr_schema_isnull && !curr_name_isnull &&
namestrcmp(curr_proc_name, old_proc_name) == 0 &&
namestrcmp(curr_proc_schema, old_proc_schema) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
newschema,
newname);

if (should_free)
heap_freetuple(tuple);

Check warning on line 615 in src/process_utility.c

View check run for this annotation

Codecov / codecov/patch

src/process_utility.c#L615

Added line #L615 was not covered by tests
}
}
return DDL_CONTINUE;
}

static void
process_alterprocedureschema(ProcessUtilityArgs *args)
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) args->parsetree;
Relation relation;

Assert(stmt->objectType == OBJECT_PROCEDURE || stmt->objectType == OBJECT_FUNCTION);
ObjectAddress address =
get_object_address(stmt->objectType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, stmt->newschema, NULL);
}

/* We use this for both materialized views and views. */
static void
process_alterviewschema(ProcessUtilityArgs *args)
Expand Down Expand Up @@ -558,6 +716,10 @@ process_alterobjectschema(ProcessUtilityArgs *args)
case OBJECT_VIEW:
process_alterviewschema(args);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_alterprocedureschema(args);
break;
default:
break;
}
Expand Down Expand Up @@ -1775,6 +1937,10 @@ process_drop_start(ProcessUtilityArgs *args)
case OBJECT_TRIGGER:
process_drop_trigger_start(args, stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_drop_procedure_start(args, stmt);
break;
default:
break;
}
Expand Down Expand Up @@ -2074,12 +2240,23 @@ process_rename_schema(RenameStmt *stmt)
}
}

ts_bgw_job_rename_schema_name(stmt->subname, stmt->newname);
ts_chunks_rename_schema_name(stmt->subname, stmt->newname);
ts_dimensions_rename_schema_name(stmt->subname, stmt->newname);
ts_hypertables_rename_schema_name(stmt->subname, stmt->newname);
ts_continuous_agg_rename_schema_name(stmt->subname, stmt->newname);
}

static void
process_rename_procedure(ProcessUtilityArgs *args)
{
RenameStmt *stmt = (RenameStmt *) args->parsetree;
Relation relation;
ObjectAddress address =
get_object_address(stmt->renameType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, NULL, stmt->newname);
}

static void
rename_hypertable_constraint(Hypertable *ht, Oid chunk_relid, void *arg)
{
Expand Down Expand Up @@ -2145,6 +2322,8 @@ process_rename_constraint_or_trigger(ProcessUtilityArgs *args, Cache *hcache, Oi

ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

Assert(stmt->relation != NULL);

if (NULL != ht)
{
relation_not_only(stmt->relation);
Expand Down Expand Up @@ -2180,15 +2359,6 @@ process_rename(ProcessUtilityArgs *args)
if (!OidIsValid(relid))
return DDL_CONTINUE;
}
else
{
/*
* stmt->relation never be NULL unless we are renaming a schema or
* other objects, like foreign server
*/
if (stmt->renameType != OBJECT_SCHEMA)
return DDL_CONTINUE;
}

hcache = ts_hypertable_cache_pin();

Expand All @@ -2214,6 +2384,10 @@ process_rename(ProcessUtilityArgs *args)
case OBJECT_SCHEMA:
process_rename_schema(stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_rename_procedure(args);
break;
default:
break;
}
Expand Down
Loading

0 comments on commit ec8be6f

Please sign in to comment.