Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poynting flux Adjoint #2191

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion python/adjoint/objective.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@

Grid = namedtuple("Grid", ["x", "y", "z", "w"])

#3 possible components for E x n and H x n
#signs are handled in code
EH_TRANSVERSE = [ [mp.Hy, mp.Hz, mp.Ey, mp.Ez],
[mp.Hz, mp.Hx, mp.Ez, mp.Ex],
[mp.Hx, mp.Hy, mp.Ex, mp.Ey] ]

#Holds the components for each current source
#for the cases of x,y, and z normal vectors.
#This is the same as swapping H and E in the above list
JK_TRANSVERSE = [ [mp.Ey, mp.Ez, mp.Hy, mp.Hz],
Alex-Kaylor marked this conversation as resolved.
Show resolved Hide resolved
[mp.Ez, mp.Ex, mp.Hz, mp.Hx],
[mp.Ex, mp.Ey, mp.Hx, mp.Hy] ]

class ObjectiveQuantity(abc.ABC):
"""A differentiable objective quantity.
Expand Down Expand Up @@ -316,7 +328,7 @@ def place_adjoint_source(self, dJ):
self.all_fouriersrcdata = self._monitor.swigobj.fourier_sourcedata(
self.volume.swigobj, self.component, self.sim.fields, dJ
)

for fourier_data in self.all_fouriersrcdata:
amp_arr = np.array(fourier_data.amp_arr).reshape(-1, self.num_freq)
scale = amp_arr * self._adj_src_scale(include_resolution=False)
Expand Down Expand Up @@ -483,3 +495,127 @@ def __call__(self):
self.ldos_scale = self.sim.ldos_scale
self.ldos_Jdata = self.sim.ldos_Jdata
return np.array(self._eval)

class PoyntingFlux(ObjectiveQuantity):
"""A frequency-dependent Poynting Flux adjoint source.
Attributes:
volume: The volume over which the Poynting Flux is calculated.
This function currently only works for monitors with a defined
normal vector (e.g. planes in 3d or lines in 2d). User supplied
normal vectors may be implemented in the future. It also only
works with monitors aligned to a coordinate direction.
decimation_factor: Whether to skip points in the time series every
decimation_factor timesteps. See "add_dft_fields" documentation.
The biggest warning there is to be careful to avoid aliasing if
the fields vary quickly in time.
Note on yee_grid: For the Poynting Flux to work, H and E components
must lie at the same points. Therefore, the Yee grid will always be false.
"""

def __init__(self, sim,volume, decimation_factor=0):
Alex-Kaylor marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(sim)
#_fit_volume_to_simulation increases the dimensionality of
#the volume, so we'll use the user input volume
self.volume = sim._fit_volume_to_simulation(volume)
self.decimation_factor = decimation_factor
#get_normal returns an index for the two
# dictionaries of cross products
self.normal = self.get_normal(volume)



def register_monitors(self, frequencies):
self._frequencies = np.asarray(frequencies)

#Add the dft monitors for the interesting components
self._monitor =self.sim.add_dft_fields(EH_TRANSVERSE[self.normal],
frequencies,
where = self.volume,
yee_grid = False)

return self._monitor


def place_adjoint_source(self, dJ):
dJ = np.atleast_1d(dJ)
if dJ.ndim == 2:
dJ = np.sum(dJ, axis=1)
time_src = self._create_time_profile()
scale = self._adj_src_scale()
if self._frequencies.size == 1:
amp = dJ * scale
src = time_src
else:
scale = dJ * scale
src = FilteredSource(
time_src.frequency,
self._frequencies,
scale,
self.sim.fields.dt,
)
amp = 1
source = []
#TODO: account for sign in the current sources
#(the K sources should have the negative)
for field_t,field in enumerate(JK_TRANSVERSE[self.normal]):
#dft_fields returns a scalar value for components that aren't evaluated
#in these cases, we don't need to add an adjoint source
if(self.field_component_evaluations[field_t].size != 1):
tuple_elements = np.zeros((1,self.metadata[3].size,1), dtype=np.complex128)
#TODO: Load up the source data for other normal vectors
#TODO: Remove this unnecessary for loop
for index,element in enumerate(self.field_component_evaluations[field_t]):
tuple_elements[0,index,0] = element
source.append( mp.Source(
src,
component = field,
amplitude=amp,
size=self.volume.size,
center=self.volume.center,
amp_data = tuple_elements
Alex-Kaylor marked this conversation as resolved.
Show resolved Hide resolved
))

return source

#returns 0,1, or 2 corresponding to x,y, or z normal vectors
#TODO: Handle user-specified normal vectors and cases when 2d
#is projected into a dimension other than z
def get_normal(self,volume):
#I'll add cylindrical later (since the normal vector gets a little different)
if (volume.dims == 2):
if (volume.size.x == 0):
return 0
elif(volume.size.y == 0):
return 1
else:
return 2
elif (volume.dims ==1):
if (volume.size.x == 0):
return 0
else:
return 1

else :
return "Supported volumes are 1d or 2d"

#Returns an array containing the Poynting Flux at each frequency point
#Note the first two FourierFields objectives are H fields, the second
#two are E fields.
#This assumes all the normal vectors point in the positive x,y,or z
#direction. The user may need to multiply by -1 to get the direction
#they expect if they're doing, for example, a box.
#TODO: Add an extra parameter to let the user negate the flux, similar
#to the default meep ponynting flux
def __call__(self):
self.field_component_evaluations = []
#Loop over the relevant field components for this case of normal vector
for field in EH_TRANSVERSE[self.normal]:
field_here = self.sim.get_dft_array(self.monitor_list,field,0)
self.field_component_evaluations.append(field_here)
#Get weights for integration from cubature rule
self.metadata = self.sim.get_array_metadata(dft_cell = self.monitor_list)
flux_density =np.real(-(self.field_component_evaluations[0]*np.conj(self.field_component_evaluations[3])) + (np.conj(self.field_component_evaluations[2])*self.field_component_evaluations[1]))
#Apply cubature weights
self._eval = np.sum(self.metadata[3]*flux_density)
return self._eval

41 changes: 40 additions & 1 deletion python/tests/test_adjoint_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from autograd import tensor_jacobian_product
from utils import ApproxComparisonTestCase

MonitorObject = Enum("MonitorObject", "EIGENMODE DFT LDOS")
MonitorObject = Enum("MonitorObject", "EIGENMODE DFT LDOS POYNTING")


class TestAdjointSolver(ApproxComparisonTestCase):
Expand Down Expand Up @@ -169,6 +169,17 @@ def J(mode_mon):

def J(mode_mon):
return npa.array(mode_mon)

elif mon_type.name == "POYNTING":
obj_list = [
mpa.PoyntingFlux(
sim,
mp.Volume(center=mp.Vector3(1.25), size=mp.Vector3(0, 1, 0))
)
]

def J(mode_mon):
return mode_mon

opt = mpa.OptimizationProblem(
simulation=sim,
Expand Down Expand Up @@ -358,6 +369,34 @@ def test_DFT_fields(self):

tol = 0.07 if mp.is_single_precision() else 0.006
self.assertClose(adj_dd, fnd_dd, epsilon=tol)

def test_Poynting_Flux(self):
print("*** TESTING POYNTING OBJECTIVE ***")

# test the single frequency and multi frequency cases
for frequencies in [[self.fcen], [1 / 1.58, self.fcen, 1 / 1.53]]:
# compute objective value and its gradient for unperturbed design
unperturbed_val, unperturbed_grad = self.adjoint_solver(
self.p, MonitorObject.POYNTING, frequencies
)

# compute objective value for perturbed design
perturbed_val = self.adjoint_solver(
self.p + self.dp, MonitorObject.POYNTING, frequencies, need_gradient=False
)

# compare directional derivative
if unperturbed_grad.ndim < 2:
unperturbed_grad = np.expand_dims(unperturbed_grad, axis=1)
adj_dd = (self.dp[None, :] @ unperturbed_grad).flatten()
fnd_dd = perturbed_val - unperturbed_val
print(
f"directional derivative:, {adj_dd} (adjoint solver), {fnd_dd} (finite difference)"
)

tol = 0.07 if mp.is_single_precision() else 0.006
self.assertClose(adj_dd, fnd_dd, epsilon=tol)


def test_eigenmode(self):
print("*** TESTING EIGENMODE OBJECTIVE ***")
Expand Down