Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update bgw job table when procedure altered
Browse files Browse the repository at this point in the history
This deals with the following modifications to the name of a procedure
used by a background job:

- If a procedure that exists in the jobs table is renamed, the
  corresponding names in the table will also be changed.

- When a procedure that is used for a background worker
  job is moved to a different schema, modify the job entry as well.

- When a schema is renamed, rename the schema name in the procedures as
  well.

- When a schema is dropped and there are procedures used by jobs, it
  will either remove the entry or throw an error, depending on whether
  `CASCADE` or `RESTRICT` is in effect.

- When a procedure or function used by a job is dropped, it will error
  out or delete the job depending on whether `CASCADE` or `RESTRICT` is
  in effect.
mkindahl committed Nov 13, 2024

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 062a119 commit e106498
Showing 4 changed files with 451 additions and 52 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7409
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #7409 Update bgw job table when altering procedure
258 changes: 247 additions & 11 deletions src/process_utility.c
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
#include <parser/parse_type.h>
#include <parser/parse_utilcmd.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <tcop/utility.h>
#include <utils/acl.h>
#include <utils/builtins.h>
@@ -42,6 +43,7 @@
#include <utils/inval.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/regproc.h>
#include <utils/rel.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
@@ -472,6 +474,221 @@ process_drop_trigger_start(ProcessUtilityArgs *args, DropStmt *stmt)
ts_cache_release(hcache);
}

static DDLResult
process_drop_schema_start(DropStmt *stmt)
{
/*
* An error will be raised when we start dropping the functions used by a
* background worker, so there is no point in doing anything here.
*/
if (stmt->behavior == DROP_RESTRICT)
return DDL_CONTINUE;

/*
* Here we are relying on that if we fail to drop one of the
* procedures/functions, this transaction will be rolled back so these
* changes will not be committed.
*/
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
ListCell *cell;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool schema_isnull, job_id_isnull;
int32 job_id = DatumGetInt32(slot_getattr(ti->slot, Anum_bgw_job_id, &job_id_isnull));
Name proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &schema_isnull));
Ensure(!job_id_isnull, "corrupt job entry: job id is null");
Ensure(!schema_isnull, "corrupt job entry: schema for job %d is null", job_id);
foreach (cell, stmt->objects)
{
String *object = castNode(String, lfirst(cell));
if (namestrcmp(proc_schema, object->sval) == 0)
{
CatalogSecurityContext sec_ctx;
Assert(stmt->behavior == DROP_CASCADE);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ereport(NOTICE, errmsg("drop cascades to job %d", job_id));
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
}
}
}
return DDL_CONTINUE;
}

/*
* Start of dropping a procedure.
*
* We can abort the drop here by throwing an error.
*/
static void
process_drop_procedure_start(DropStmt *stmt)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
ListCell *cell;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool schema_isnull, name_isnull, job_id_isnull;
Name proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &schema_isnull));
Name proc_name = DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &name_isnull));
int32 job_id = DatumGetInt32(slot_getattr(ti->slot, Anum_bgw_job_id, &job_id_isnull));
Ensure(!job_id_isnull, "corrupt job entry: job id was null");
Ensure(!schema_isnull, "corrupt job entry: schema for job %d was null", job_id);
Ensure(!name_isnull, "corrupt job entry: name for job %d was null", job_id);
TS_DEBUG_LOG("looking at job %d with %s.%s",
job_id,
NameStr(*proc_schema),
NameStr(*proc_name));
foreach (cell, stmt->objects)
{
ObjectWithArgs *object = castNode(ObjectWithArgs, lfirst(cell));
RangeVar *rel = makeRangeVarFromNameList(object->objname);
if (namestrcmp(proc_schema, rel->schemaname) == 0 &&
namestrcmp(proc_name, rel->relname) == 0)
{
Assert(stmt->removeType == OBJECT_PROCEDURE || stmt->removeType == OBJECT_FUNCTION);
if (stmt->behavior == DROP_RESTRICT)
{
ereport(ERROR,
errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop %s because background job %d depends on it",
NameListToString(object->objname),
job_id),
errhint("Use delete_job() to drop the job first."));
}
else
{
CatalogSecurityContext sec_ctx;
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ereport(NOTICE, errmsg("drop cascades to job %d", job_id));
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
ts_catalog_restore_user(&sec_ctx);
}
}
}
}
}

static void
replace_attr_if_changed(AttrNumber attno, const char *newvalue, Name name_buf, Datum *values,
bool *replace)
{
if (newvalue)
{
const Name orig_value = DatumGetName(values[AttrNumberGetAttrOffset(attno)]);
if (namestrcmp(orig_value, newvalue) != 0)
{
namestrcpy(name_buf, newvalue);
values[AttrNumberGetAttrOffset(attno)] = NameGetDatum(name_buf);
replace[AttrNumberGetAttrOffset(attno)] = true;
}
}
}

/*
* Update the schema or name of a procedure in the jobs tuple.
*/
static void
ts_bgw_job_update_proc(Relation rel, HeapTuple tuple, TupleDesc tupledesc, const char *newschema,
const char *newname)
{
bool isnull[Natts_bgw_job];
Datum values[Natts_bgw_job];
bool replace[Natts_bgw_job] = { false };

/* Allocated here to make sure that they are alive at the call of
* heap_modify_tuple */
NameData proc_name_buf;
NameData proc_schema_buf;

heap_deform_tuple(tuple, tupledesc, values, isnull);

replace_attr_if_changed(Anum_bgw_job_proc_name, newname, &proc_name_buf, values, replace);
replace_attr_if_changed(Anum_bgw_job_proc_schema, newschema, &proc_schema_buf, values, replace);

HeapTuple new_tuple = heap_modify_tuple(tuple, tupledesc, values, isnull, replace);
ts_catalog_update(rel, new_tuple);
heap_freetuple(new_tuple);
}

static void
ts_bgw_job_rename_schema_name(const char *old_schema_name, const char *new_schema_name)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
if (!curr_schema_isnull && namestrcmp(curr_proc_schema, old_schema_name) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
new_schema_name,
NameStr(*curr_proc_name));

if (should_free)
heap_freetuple(tuple);
}
}
}

static DDLResult
ts_bgw_job_rename_proc(ObjectAddress address, const char *newschema, const char *newname)
{
ScanIterator iterator =
ts_scan_iterator_create(BGW_JOB, RowExclusiveLock, CurrentMemoryContext);
ts_scanner_foreach(&iterator)
{
bool should_free, curr_schema_isnull, curr_name_isnull;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
Name curr_proc_schema =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_schema, &curr_schema_isnull));
Name curr_proc_name =
DatumGetName(slot_getattr(ti->slot, Anum_bgw_job_proc_name, &curr_name_isnull));
const char *old_proc_schema = get_namespace_name(get_func_namespace(address.objectId));
const char *old_proc_name = get_func_name(address.objectId);
if (!curr_schema_isnull && !curr_name_isnull &&
namestrcmp(curr_proc_name, old_proc_name) == 0 &&
namestrcmp(curr_proc_schema, old_proc_schema) == 0)
{
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
ts_bgw_job_update_proc(ti->scanrel,
tuple,
ts_scanner_get_tupledesc(ti),
newschema,
newname);

if (should_free)
heap_freetuple(tuple);
}
}
return DDL_CONTINUE;
}

static void
process_alterprocedureschema(ProcessUtilityArgs *args)
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) args->parsetree;
Relation relation;

Assert(stmt->objectType == OBJECT_PROCEDURE || stmt->objectType == OBJECT_FUNCTION);
ObjectAddress address =
get_object_address(stmt->objectType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, stmt->newschema, NULL);
}

/* We use this for both materialized views and views. */
static void
process_alterviewschema(ProcessUtilityArgs *args)
@@ -558,6 +775,10 @@ process_alterobjectschema(ProcessUtilityArgs *args)
case OBJECT_VIEW:
process_alterviewschema(args);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_alterprocedureschema(args);
break;
default:
break;
}
@@ -1775,6 +1996,13 @@ process_drop_start(ProcessUtilityArgs *args)
case OBJECT_TRIGGER:
process_drop_trigger_start(args, stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_drop_procedure_start(stmt);
break;
case OBJECT_SCHEMA:
process_drop_schema_start(stmt);
break;
default:
break;
}
@@ -2074,12 +2302,23 @@ process_rename_schema(RenameStmt *stmt)
}
}

ts_bgw_job_rename_schema_name(stmt->subname, stmt->newname);
ts_chunks_rename_schema_name(stmt->subname, stmt->newname);
ts_dimensions_rename_schema_name(stmt->subname, stmt->newname);
ts_hypertables_rename_schema_name(stmt->subname, stmt->newname);
ts_continuous_agg_rename_schema_name(stmt->subname, stmt->newname);
}

static void
process_rename_procedure(ProcessUtilityArgs *args)
{
RenameStmt *stmt = (RenameStmt *) args->parsetree;
Relation relation;
ObjectAddress address =
get_object_address(stmt->renameType, stmt->object, &relation, AccessExclusiveLock, false);
ts_bgw_job_rename_proc(address, NULL, stmt->newname);
}

static void
rename_hypertable_constraint(Hypertable *ht, Oid chunk_relid, void *arg)
{
@@ -2145,6 +2384,8 @@ process_rename_constraint_or_trigger(ProcessUtilityArgs *args, Cache *hcache, Oi

ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

Assert(stmt->relation != NULL);

if (NULL != ht)
{
relation_not_only(stmt->relation);
@@ -2180,15 +2421,6 @@ process_rename(ProcessUtilityArgs *args)
if (!OidIsValid(relid))
return DDL_CONTINUE;
}
else
{
/*
* stmt->relation never be NULL unless we are renaming a schema or
* other objects, like foreign server
*/
if (stmt->renameType != OBJECT_SCHEMA)
return DDL_CONTINUE;
}

hcache = ts_hypertable_cache_pin();

@@ -2214,6 +2446,10 @@ process_rename(ProcessUtilityArgs *args)
case OBJECT_SCHEMA:
process_rename_schema(stmt);
break;
case OBJECT_PROCEDURE:
case OBJECT_FUNCTION:
process_rename_procedure(args);
break;
default:
break;
}
@@ -4628,7 +4864,7 @@ process_drop_table(EventTriggerDropObject *obj)
}

static void
process_drop_schema(EventTriggerDropObject *obj)
process_sql_drop_schema(EventTriggerDropObject *obj)
{
EventTriggerDropSchema *schema = (EventTriggerDropSchema *) obj;
int count;
@@ -4695,7 +4931,7 @@ process_ddl_sql_drop(EventTriggerDropObject *obj)
process_drop_table(obj);
break;
case EVENT_TRIGGER_DROP_SCHEMA:
process_drop_schema(obj);
process_sql_drop_schema(obj);
break;
case EVENT_TRIGGER_DROP_TRIGGER:
process_drop_trigger(obj);
Loading

0 comments on commit e106498

Please sign in to comment.