Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No xact callback for spi mutability #1104

Merged
merged 1 commit into from
Apr 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions pgrx/src/spi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ Use of this source code is governed by the MIT license that can be found in the

//! Safe access to Postgres' *Server Programming Interface* (SPI).

use crate::{
pg_sys, register_xact_callback, FromDatum, IntoDatum, Json, PgMemoryContexts, PgOid,
PgXactCallbackEvent, TryFromDatumError,
};
use crate::{pg_sys, FromDatum, IntoDatum, Json, PgMemoryContexts, PgOid, TryFromDatumError};
use core::fmt::Formatter;
use pgrx_pg_sys::panic::ErrorReportable;
use std::ffi::{CStr, CString};
Expand All @@ -21,7 +18,6 @@ use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, Index};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};

pub type Result<T> = std::result::Result<T, Error>;

Expand Down Expand Up @@ -188,18 +184,13 @@ pub enum Error {

pub struct Spi;

static MUTABLE_MODE: AtomicBool = AtomicBool::new(false);
impl Spi {
#[inline]
fn is_read_only() -> bool {
MUTABLE_MODE.load(Ordering::Relaxed) == false
}

#[inline]
fn clear_mutable() {
MUTABLE_MODE.store(false, Ordering::Relaxed)
}

/// Determines if the current transaction can still be `read_only = true` for purposes of Spi
/// queries. This is detected in such a way that prior mutable commands within this transaction
/// (even those not executed via pgx' Spi) will influence whether or not we con consider the
/// transaction `read_only = true`. This is what we want as the user will expect an otherwise
/// read only statement like SELECT to see the results of prior statements.
///
/// Postgres docs say:
///
/// ```text
Expand All @@ -208,16 +199,35 @@ impl Spi {
/// would not see the results of any database updates done by the read-write queries.
///```
///
/// We extend this to mean "within a single transaction". We set the static `MUTABLE_MODE`
/// here, and register callbacks for both transaction COMMIT and ABORT to clear it, if it's
/// the first time in. This way, once Spi has entered "mutable mode", it stays that way until
/// the current transaction is finished.
/// pgx interprets this to mean:
/// ```text
/// Within a transaction, it's fine to execute Spi commands as `read_only = true` until the
/// first mutable statement (DDL or DML). From that point forward **all** statements
/// must be executed as `read_only = false`.
/// ```
fn is_xact_still_immutable() -> bool {
unsafe {
// SAFETY: `pg_sys::GetCurrentTransactionIdIfAny()` will always return a valid
// TransactionId value, even if it's `InvalidTransactionId`.
let current_xid = pg_sys::GetCurrentTransactionIdIfAny();

// no assigned TransactionId means no mutation has occurred in this transaction
current_xid == pg_sys::InvalidTransactionId
}
}

/// Let Postgres know that we intend to perform some kind of mutating operation in this transaction.
///
/// From this point forward, within the current transaction, [`Spi::is_xact_still_immutable()`] will
/// return `false`.
fn mark_mutable() {
if Spi::is_read_only() {
register_xact_callback(PgXactCallbackEvent::Commit, || Spi::clear_mutable());
register_xact_callback(PgXactCallbackEvent::Abort, || Spi::clear_mutable());
unsafe {
// SAFETY: `pg_sys::GetCurrentTransactionId()` will return a valid, possibly newly-created
// TransactionId or it'll raise an ERROR trying.

MUTABLE_MODE.store(true, Ordering::Relaxed)
// The act of marking this transaction mutable is simply asking Postgres for the current
// TransactionId in a way where it will assign one if necessary
let _ = pg_sys::GetCurrentTransactionId();
}
}
}
Expand Down Expand Up @@ -349,14 +359,18 @@ impl<'a> Query for &'a str {
argtypes.as_mut_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
}
}
// SAFETY: arguments are prepared above
None => unsafe {
pg_sys::SPI_execute(src.as_ptr(), Spi::is_read_only(), limit.unwrap_or(0))
pg_sys::SPI_execute(
src.as_ptr(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
},
};

Expand Down Expand Up @@ -386,7 +400,7 @@ impl<'a> Query for &'a str {
argtypes.as_mut_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
0,
))
};
Expand Down Expand Up @@ -893,7 +907,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> {
self.plan.as_ptr(),
datums.as_mut_ptr(),
nulls.as_mut_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
limit.unwrap_or(0),
)
};
Expand All @@ -918,7 +932,7 @@ impl<'a: 'b, 'b> Query for &'b PreparedStatement<'a> {
self.plan.as_ptr(),
datums.as_mut_ptr(),
nulls.as_ptr(),
Spi::is_read_only(),
Spi::is_xact_still_immutable(),
))
};
SpiCursor { ptr, __marker: PhantomData }
Expand Down