Skip to content

Commit

Permalink
Type hints in PdeData and PdePCE
Browse files Browse the repository at this point in the history
  • Loading branch information
NovakLBUT committed Nov 21, 2023
1 parent edbce33 commit d35d561
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 71 deletions.
8 changes: 4 additions & 4 deletions docs/code/surrogates/pce/plot_pce_euler_UQ.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ def ref_sol(physical_coordinate, q):


# define geometry of physical space
geometry_xmin = np.array([0])
geometry_xmax = np.array([1])
geometry_xmin = [0]
geometry_xmax = [1]

# %% md
#
Expand All @@ -129,7 +129,7 @@ def ref_sol(physical_coordinate, q):
# derivation orders of prescribed BCs
der_orders = [0, 2]
# normals associated to precribed BCs
bc_normals = np.array([0, 0])
bc_normals = [0, 0]
# sampling of BC points

bc_xtotal = bc_sampling(20)
Expand All @@ -147,7 +147,7 @@ def ref_sol(physical_coordinate, q):
#
# Further we construct an object containing PDE physical data and PC :math:`^2` definitions of PDE

pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, boundary_conditions=bc_res)
pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, boundary_conditions_evaluate=bc_res)

# %% md
#
Expand Down
13 changes: 7 additions & 6 deletions docs/code/surrogates/pce/plot_pce_wave.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ def bc_res(nsim, pce):
der_order = 0
deriv_0_init = np.sum(
derivative_basis(bc_init_s, pce, derivative_order=der_order, leading_variable=0) * (
(2 / 1) ** der_order) * np.array(
(2 / 1) ** der_order) * np.array(
pce.coefficients).T, axis=1)

der_order = 1
deriv_1_init = np.sum(
derivative_basis(bc_init_s, pce, derivative_order=der_order, leading_variable=1) * (
(2 / 1) ** der_order) * np.array(
(2 / 1) ** der_order) * np.array(
pce.coefficients).T, axis=1)

return deriv_0_pce + np.abs(deriv_0_init - bc_init_y[:, 0]) + deriv_1_init
Expand Down Expand Up @@ -147,8 +147,8 @@ def ref_sol(x):

# define geometry of physical space

geometry_xmin = np.array([0, 0])
geometry_xmax = np.array([1, 2])
geometry_xmin = [0, 0]
geometry_xmax = [1, 2]

# %% md
#
Expand All @@ -164,7 +164,7 @@ def ref_sol(x):
# derivation orders of prescribed BCs
der_orders = [0, 1, 0]
# normals associated to prescribed BCs
bc_normals = np.array([0, 1, 1])
bc_normals = [0, 1, 1]
# sampling of BC points
bc_x = bc_sampling(nbc)
bc_y = np.zeros(len(bc_x))
Expand All @@ -184,7 +184,8 @@ def ref_sol(x):
# Further we construct an object containing PDE physical data and PC :math:`^2` definitions of PDE


pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, virtual_points_function=virt_sampling, boundary_condition_function=bc_sampling, boundary_conditions=bc_res)
pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, boundary_conditions_evaluate=bc_res,
virtual_points_sampling=virt_sampling)

# %% md
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ def estimate_error(self, pce: PolynomialChaosExpansion, standardized_sample: np.
err_data = (np.sum((pce.experimental_design_output - ypce) ** 2) / len(ypce))

err_pde = np.abs(
self.pde_pce.evaluate_pde(standardized_sample, pce, coefficients=pce.coefficients) - self.pde_pce.pderes_eval(
standardized_sample,
multindex=pce.multi_index_set,
coefficients=pce.coefficients))
self.pde_pce.evaluate_pde(standardized_sample, pce, coefficients=pce.coefficients) - self.pde_pce.evaluate_pde_source(
standardized_sample, multindex=pce.multi_index_set, coefficients=pce.coefficients))
err_pde = np.mean(err_pde ** 2)
err_bc = self.pde_pce.evaluate_boundary_conditions(len(standardized_sample), pce)
err_bc = np.mean(err_bc ** 2)
Expand Down Expand Up @@ -87,14 +85,14 @@ def lar(self,
logger = logging.getLogger(__name__)
pce = copy.deepcopy(self.initial_pce)

if self.pde_pce.virtual_functions is None:
if self.pde_pce.virtual_points_sampling is None:
virtual_samples = ortho_grid(n_error_points, pce.inputs_number, -1.0, 1.0)
else:
virtual_x = self.pde_pce.virtual_functions(n_error_points)
virtual_x = self.pde_pce.virtual_points_sampling(n_error_points)
virtual_samples = Polynomials.standardize_sample(virtual_x, pce.polynomial_basis.distributions)

if max_iterations is None:
max_iterations = self.pde_data.nconst + 200
max_iterations = self.pde_data.nconstraints + 200

lar_path = regresion.lars_path(self.basis_extended, self.y_extended, max_iter=max_iterations)[1]

Expand All @@ -113,7 +111,7 @@ def lar(self,
lar_error = []

if virtual_niters == True and min_basis_functions == 1:
min_basis_functions = self.pde_data.nconst + 1
min_basis_functions = self.pde_data.nconstraints + 1

if min_basis_functions > steps - 2 or no_iterations == True:
min_basis_functions = steps - 3
Expand Down Expand Up @@ -197,7 +195,7 @@ def ols(self, pce: PolynomialChaosExpansion = None,

y = pce.experimental_design_output

n_constraints = self.pde_data.nconst
n_constraints = self.pde_data.nconstraints
card_basis, nvar = multindex.shape

if nvirtual == -1:
Expand All @@ -218,17 +216,17 @@ def ols(self, pce: PolynomialChaosExpansion = None,
kkt = np.zeros((card_basis + n_constraints + nvirtual, card_basis + n_constraints + nvirtual))
right_vector = np.zeros((card_basis + n_constraints + nvirtual, 1))

if self.pde_pce.virtual_functions is None:
if self.pde_pce.virtual_points_sampling is None:
virtual_x = pce.polynomial_basis.distributions.rvs(nvirtual)
else:
virtual_x = self.pde_pce.virtual_functions(nvirtual)
virtual_x = self.pde_pce.virtual_points_sampling(nvirtual)

virtual_s = Polynomials.standardize_sample(virtual_x, pce.polynomial_basis.distributions)

self.virtual_s = virtual_s
self.virtual_x = virtual_x

b[-nvirtual:, 0] = self.pde_pce.pderes_eval(self.virtual_s)
b[-nvirtual:, 0] = self.pde_pce.evaluate_pde_source(self.virtual_s)

a_pde = self.pde_pce.evaluate_pde(self.virtual_s, pce)

Expand Down Expand Up @@ -282,10 +280,10 @@ def ols(self, pce: PolynomialChaosExpansion = None,

if not return_coefficients:
self.initial_pce.coefficients = a_opt_c
if self.pde_pce.virtual_functions is None:
if self.pde_pce.virtual_points_sampling is None:
standardized_sample = ortho_grid(n_error_points, pce.inputs_number, -1.0, 1.0)
else:
virtual_x = self.pde_pce.virtual_functions(n_error_points)
virtual_x = self.pde_pce.virtual_points_sampling(n_error_points)
standardized_sample = Polynomials.standardize_sample(virtual_x,
pce.polynomial_basis.distributions)
err = self.estimate_error(self.initial_pce, standardized_sample)
Expand Down
16 changes: 8 additions & 8 deletions src/UQpy/surrogates/polynomial_chaos/physics_informed/PdeData.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ class PdeData:
@beartype
def __init__(self, upper_bounds: list,
lower_bounds: list,
derivative_orders,
boundary_normals,
boundary_coordinates,
boundary_values):
derivative_orders: list,
boundary_normals: list,
boundary_coordinates: list,
boundary_values: list):
"""
Class containing information about PDE solved by Physics-informed PCE
Expand All @@ -27,7 +27,7 @@ def __init__(self, upper_bounds: list,
self.bc_x = boundary_coordinates
self.bc_y = boundary_values

self.nconst = 0
self.nconstraints = 0
self.dirichlet = None
self.extract_dirichlet()

Expand All @@ -50,7 +50,7 @@ def extract_dirichlet(self):
else:
nconst = nconst + len((self.bc_x[i]))

self.nconst = nconst
self.nconstraints = nconst

if len(coord) > 0:
coord = np.concatenate(coord)
Expand All @@ -60,11 +60,11 @@ def extract_dirichlet(self):
self.dirichlet = dirichletbc

@beartype
def get_boundary_samples(self, order:int):
def get_boundary_samples(self, order: int):
"""
Extract boundary conditions of defined order
:param order: order of extracted boundary conditions
:param order: derivative order of extracted boundary conditions
:return: extracted bc samples in form [coordinates, prescribed values]
"""
coord = []
Expand Down
64 changes: 29 additions & 35 deletions src/UQpy/surrogates/polynomial_chaos/physics_informed/PdePCE.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,59 @@
import numpy as np

from UQpy.surrogates.polynomial_chaos.physics_informed.PdeData import PdeData
from typing import Callable
from UQpy.surrogates.polynomial_chaos.PolynomialChaosExpansion import PolynomialChaosExpansion

class PdePCE:
def __init__(self, pde_data,
pde_functions,
pde_source=None,
boundary_conditions=None,
boundary_condition_function=None,
virtual_points_function=None,
nonlinear=False):
def __init__(self, pde_data: PdeData,
pde_basis: Callable,
pde_source: Callable = None,
boundary_conditions_evaluate: Callable = None,
boundary_conditions_sampling: Callable = None,
virtual_points_sampling: Callable = None,
nonlinear: bool = False):
"""
Class containing information about PDE needed for physics-informed PCE
:param pde_data: an object of the :code:`UQpy` :class:`.PdeData` class
:param pde_functions: pde defined in basis functions
:param pde_basis: pde defined in basis functions
:param pde_source: source term of pde
:param boundary_conditions: evaluation of boundary conditions for estimation of an error
:param boundary_condition_function: function for sampling of boundary conditions
:param virtual_points_function: function for sampling of virtual samples
:param boundary_conditions_evaluate: evaluation of boundary conditions for estimation of an error
:param boundary_conditions_sampling: function for sampling of boundary conditions
:param virtual_points_sampling: function for sampling of virtual samples
:param nonlinear: if True, prescribed pde is non-linear
"""

self.pde_data = pde_data
self.pde_function = pde_functions
self.pde_basis = pde_basis
self.pde_source = pde_source
self.boundary_condition_function = boundary_condition_function
self.boundary_conditions = boundary_conditions
self.virtual_functions = virtual_points_function
self.boundary_conditions_sampling = boundary_conditions_sampling
self.boundary_conditions_evaluate = boundary_conditions_evaluate
self.virtual_points_sampling = virtual_points_sampling
self.nonlinear = nonlinear

def evaluate_pde(self, s,
pce,
coefficients=None):
def evaluate_pde(self, standardized_sample: np.ndarray,
pce: PolynomialChaosExpansion,
coefficients: np.ndarray = None):

pde_basis = self.pde_function(s, pce)
pde_basis = self.pde_basis(standardized_sample, pce)

if coefficients is not None:
return np.sum(pde_basis * np.array(coefficients).T, axis=1)
else:
return pde_basis

def evaluate_boundary_conditions(self,
nsim,
pce):
return self.boundary_conditions(nsim, pce)
nsim: np.ndarray,
pce: PolynomialChaosExpansion):
return self.boundary_conditions_evaluate(nsim, pce)

def evaluate_pde_source(self, s, multindex=None, coefficients=None):
def evaluate_pde_source(self, standardized_sample: np.ndarray, multindex: np.ndarray = None,
coefficients: np.ndarray = None):
if self.pde_source is not None:
if self.nonlinear:
return self.pde_source(s, multindex, coefficients)
return self.pde_source(standardized_sample, multindex, coefficients)
else:
return self.pde_source(s)
return self.pde_source(standardized_sample)
else:
return 0

def evaluate_boundary_condition_function(self, s, multindex=None, coefficients=None):
if self.boundary_condition_function is not None:
if coefficients is not None:
bc_pce = self.boundary_condition_function(s, multindex, coefficients)
else:
bc_pce = self.boundary_condition_function(s)
return bc_pce
else:
return 0
8 changes: 4 additions & 4 deletions tests/unit_tests/surrogates/test_pce.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,16 @@ def ref_sol(physical_coordinate, q):
joint = JointIndependent(marginals=marg)

# define geometry of physical space
geometry_xmin = np.array([0])
geometry_xmax = np.array([1])
geometry_xmin = [0]
geometry_xmax = [1]

# number of BC samples
nbc = 2 * 10

# derivation orders of prescribed BCs
der_orders = [0, 2]
# normals associated to precribed BCs
bc_normals = np.array([0, 0])
bc_normals = [0, 0]
# sampling of BC points

bc_xtotal = bc_sampling(20)
Expand All @@ -539,7 +539,7 @@ def ref_sol(physical_coordinate, q):

pde_data = PdeData(geometry_xmax, geometry_xmin, der_orders, bc_normals, bc_x, bc_y)

pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, boundary_conditions=bc_res)
pde_pce = PdePCE(pde_data, pde_func, pde_source=pde_res, boundary_conditions_evaluate=bc_res)

dirichlet_bc = pde_data.dirichlet
x_train = dirichlet_bc[:, :-1]
Expand Down

0 comments on commit d35d561

Please sign in to comment.