Skip to content

Commit

Permalink
pgx shouldn't use register_xact_callback to help track a transact…
Browse files Browse the repository at this point in the history
…ion's mutability.

This can cause problems if for some reason the pgx extension is unloaded from a running postgres backend as Postgres will still have a pointer to the transaction hook function that has now been unloaded. (#1104)

Instead, we can use the current `TransactionId` to determine if the transaction has been mutated, which still allows us (more correctly, even), to ensure Spi sets the `read_only` flag appropriately when executing a statement.

This also gets rid of a pgx-specific static.
  • Loading branch information
eeeebbbbrrrr authored Apr 19, 2023
1 parent 9f4ee63 commit a9f248a
Showing 1 changed file with 43 additions and 29 deletions.
72 changes: 43 additions & 29 deletions pgrx/src/spi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ Use of this source code is governed by the MIT license that can be found in the

//! Safe access to Postgres' *Server Programming Interface* (SPI).

use crate::{
pg_sys, register_xact_callback, FromDatum, IntoDatum, Json, PgMemoryContexts, PgOid,
PgXactCallbackEvent, TryFromDatumError,
};
use crate::{pg_sys, FromDatum, IntoDatum, Json, PgMemoryContexts, PgOid, TryFromDatumError};
use core::fmt::Formatter;
use pgrx_pg_sys::panic::ErrorReportable;
use std::ffi::{CStr, CString};
Expand All @@ -21,7 +18,6 @@ use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, Index};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};

pub type Result<T> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -188,18 +184,13 @@ pub enum Error {

pub struct Spi;

static MUTABLE_MODE: AtomicBool = AtomicBool::new(false);
impl Spi {
#[inline]
fn is_read_only() -> bool {
MUTABLE_MODE.load(Ordering::Relaxed) == false
}

#[inline]
fn clear_mutable() {
MUTABLE_MODE.store(false, Ordering::Relaxed)
}

/// Determines if the current transaction can still be `read_only = true` for purposes of Spi
/// queries. This is detected in such a way that prior mutable commands within this transaction
/// (even those not executed via pgx' Spi) will influence whether or not we con consider the
/// transaction `read_only = true`. This is what we want as the user will expect an otherwise
/// read only statement like SELECT to see the results of prior statements.
///
/// Postgres docs say:
///
/// ```text
Expand All @@ -208,16 +199,35 @@ impl Spi {
/// would not see the results of any database updates done by the read-write queries.
///```
///
/// We extend this to mean "within a single transaction". We set the static `MUTABLE_MODE`
/// here, and register callbacks for both transaction COMMIT and ABORT to clear it, if it's
/// the first time in. This way, once Spi has entered "mutable mode", it stays that way until
/// the current transaction is finished.
/// pgx interprets this to mean:
/// ```text
/// Within a transaction, it's fine to execute Spi commands as `read_only = true` until the
/// first mutable statement (DDL or DML). From that point forward **all** statements
/// must be executed as `read_only = false`.
/// ```
fn is_xact_still_immutable() -> bool {
unsafe {
// SAFETY: `pg_sys::GetCurrentTransactionIdIfAny()` will always return a valid
// TransactionId value, even if it's `InvalidTransactionId`.
let current_xid = pg_sys::GetCurrentTransactionIdIfAny();

// no assigned TransactionId means no mutation has occurred in this transaction
current_xid == pg_sys::InvalidTransactionId
}
}

/// Let Postgres know that we intend to perform some kind of mutating operation in this transaction.
///
/// From this point forward, within the current transaction, [`Spi::is_xact_still_immutable()`] will
/// return `false`.
fn mark_mutable() {
if Spi::is_read_only() {
register_xact_callback(PgXactCallbackEvent::Commit, || Spi::clear_mutable());
register_xact_callback(PgXactCallbackEvent::Abort, || Spi::clear_mutable());
unsafe {
// SAFETY: `pg_sys::GetCurrentTransactionId()` will return a valid, possibly newly-created
// TransactionId or it'll raise an ERROR trying.

MUTABLE_MODE.store(true, Ordering::Relaxed)
// The act of marking this transaction mutable is simply asking Postgres for the current
// TransactionId in a way where it will assign one if necessary
let _ = pg_sys::GetCurrentTransactionId();
}
}
}
Expand Down Expand Up @@ -349,14 +359,18 @@ impl<'a> Query for &'a str {
argtypes.as_mut_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
}
}
// SAFETY: arguments are prepared above
None => unsafe {
pg_sys::SPI_execute(src.as_ptr(), Spi::is_read_only(), limit.unwrap_or(0))
pg_sys::SPI_execute(
src.as_ptr(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
},
};

Expand Down Expand Up @@ -386,7 +400,7 @@ impl<'a> Query for &'a str {
argtypes.as_mut_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
0,
))
};
Expand Down Expand Up @@ -893,7 +907,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> {
self.plan.as_ptr(),
datums.as_mut_ptr(),
nulls.as_mut_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
};
Expand All @@ -918,7 +932,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> {
self.plan.as_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
))
};
SpiCursor { ptr, __marker: PhantomData }
Expand Down

0 comments on commit a9f248a

Please sign in to comment.