Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Equation.py 24.75 KiB
# -*- coding: utf-8 -*-
"""Module for equation class.
@author: Laura C. Kühle
"""
from typing import Tuple
from abc import ABC, abstractmethod
import numpy as np
from numpy import ndarray
from sympy import Symbol
from .Basis_Function import Basis
from .Boundary_Condition import enforce_boundary
from .Initial_Condition import InitialCondition
from .Mesh import Mesh
from .projection_utils import do_initial_projection
from .Quadrature import Quadrature
x = Symbol('x')
class Equation(ABC):
"""Abstract class for equation.
Methods
-------
initialize_approximation()
Initialize projection and time step for approximation.
update_time_step(current_time, time_step)
Update time step.
solve_exactly(mesh)
Calculate exact solution.
update_right_hand_side(projection)
Update right-hand side.
"""
def __init__(self, quadrature: Quadrature, init_cond: InitialCondition,
basis: Basis, mesh: Mesh, final_time: float,
wave_speed: float, cfl_number: float) -> None:
"""Initialize equation class.
Parameters
----------
quadrature : Quadrature object
Quadrature for evaluation.
init_cond : InitialCondition object
Initial condition for evaluation.
basis : Basis object
Basis for calculation.
mesh : Mesh object
Mesh for calculation.
final_time : float
Final time for which approximation is calculated.
wave_speed : float
Speed of wave in rightward direction.
cfl_number : float
CFL number to ensure stability.
Raises
------
ValueError
If number of ghost cells in mesh in not positive.
"""
self._quadrature = quadrature
self._init_cond = init_cond
self._basis = basis
self._mesh = mesh
self._final_time = final_time
self._wave_speed = wave_speed
self._cfl_number = cfl_number
if self._mesh.num_ghost_cells <= 0:
raise ValueError('Number of ghost cells for calculations has to '
'be positive.')
self._reset()
@property
def basis(self) -> Basis:
"""Return basis."""
return self._basis
@property
def mesh(self) -> Mesh:
"""Return basis."""
return self._mesh
@property
def quadrature(self) -> Quadrature:
"""Return basis."""
return self._quadrature
@property
def final_time(self) -> float:
"""Return final time."""
return self._final_time
@property
def wave_speed(self) -> float:
"""Return wave speed."""
return self._wave_speed
@property
def cfl_number(self) -> float:
"""Return CFL number."""
return self._cfl_number
def _reset(self) -> None:
"""Reset instance variables."""
pass
def initialize_approximation(self) -> Tuple[float, ndarray]:
"""Initialize projection and time step for approximation."""
return self._initialize_time_step(), self._initialize_projection()
def _initialize_time_step(self):
"""Initialize time step."""
return abs(self._cfl_number * self._mesh.cell_len / self._wave_speed)
@abstractmethod
@enforce_boundary()
def _initialize_projection(self) -> ndarray:
"""Initialize projection."""
pass
def create_data_dict(self):
"""Return dictionary with data necessary to construct equation."""
return {'basis': self._basis.create_data_dict(),
'mesh': self._mesh.create_data_dict(),
'final_time': self._final_time,
'wave_speed': self._wave_speed,
'cfl_number': self._cfl_number
}
@abstractmethod
def update_time_step(self, projection: ndarray, current_time: float,
time_step: float) -> Tuple[float, float]:
"""Update time step.
Parameters
----------
projection : ndarray
Current projection during approximation.
current_time : float
Current time during approximation.
time_step : float
Length of time step during approximation.
Returns
-------
cfl_number : float
Updated CFL number to ensure stability.
time_step : float
Updated time step for approximation.
"""
pass
@abstractmethod
def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
"""Calculate exact solution.
Parameters
----------
mesh : Mesh
Mesh for evaluation.
Returns
-------
grid : ndarray
Array containing evaluation grid for a function.
exact : ndarray
Array containing exact evaluation of a function.
"""
pass
@abstractmethod
@enforce_boundary()
def update_right_hand_side(self, projection: ndarray) -> ndarray:
"""Update right-hand side.
Parameter
---------
projection : ndarray
Matrix of projection for each polynomial degree.
Returns
-------
ndarray
Matrix of right-hand side.
"""
pass
class LinearAdvection(Equation):
"""Class for linear advection equation.
Attributes
----------
volume_integral_matrix : ndarray
Volume integral matrix.
flux_matrix : ndarray
Flux matrix.
Methods
-------
update_right_hand_side(projection)
Update right-hand side.
Notes
-----
.. math:: u_t + u_x = 0
"""
def _reset(self) -> None:
"""Reset instance variables."""
matrix_shape = (self._basis.polynomial_degree+1,
self._basis.polynomial_degree+1)
root_vector = np.array([np.sqrt(degree+0.5) for degree in range(
self._basis.polynomial_degree+1)])
degree_matrix = np.matmul(root_vector[:, np.newaxis],
root_vector[:, np.newaxis].T)
# Set volume integral matrix
matrix = np.ones(matrix_shape)
if self._wave_speed > 0:
matrix[np.fromfunction(
lambda i, j: (j >= i) | ((i+j) % 2 == 0), matrix_shape)] = -1.0
else:
matrix[np.fromfunction(
lambda i, j: (j > i) & ((i+j) % 2 == 1), matrix_shape)] = -1.0
self._volume_integral_matrix = matrix * degree_matrix
# Set flux matrix
matrix = np.fromfunction(lambda i, j: (-1.0)**i, matrix_shape) \
if self._wave_speed > 0 \
else np.fromfunction(lambda i, j: (-1.0)**j, matrix_shape)
self._flux_matrix = matrix * degree_matrix
@enforce_boundary()
def _initialize_projection(self) -> ndarray:
"""Initialize projection."""
return do_initial_projection(init_cond=self._init_cond,
mesh=self._mesh, basis=self._basis,
quadrature=self._quadrature)
def update_time_step(self, projection: ndarray, current_time: float,
time_step: float) -> Tuple[float, float]:
"""Update time step.
Parameters
----------
projection : ndarray
Current projection during approximation.
current_time : float
Current time during approximation.
time_step : float
Length of time step during approximation.
Returns
-------
cfl_number : float
Updated CFL number to ensure stability.
time_step : float
Updated time step for approximation.
"""
cfl_number = self._cfl_number
# Adjust for final time-step
if current_time+time_step > self.final_time:
time_step = self.final_time-current_time
cfl_number = self.wave_speed * time_step / self._mesh.cell_len
return cfl_number, time_step
def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
"""Calculate exact solution.
Parameters
----------
mesh : Mesh
Mesh for evaluation.
Returns
-------
grid : ndarray
Array containing evaluation grid for a function.
exact : ndarray
Array containing exact evaluation of a function.
"""
num_periods = np.floor(self.wave_speed * self.final_time /
mesh.interval_len)
grid = np.repeat(mesh.non_ghost_cells, self._quadrature.num_nodes) + \
mesh.cell_len / 2 * np.tile(self._quadrature.nodes, mesh.num_cells)
# Project points into correct periodic interval
points = np.array([point-self.wave_speed *
self.final_time+num_periods * mesh.interval_len
for point in grid])
left_bound, right_bound = mesh.bounds
while np.any(points < left_bound):
points[points < left_bound] += mesh.interval_len
while np.any(points) > right_bound:
points[points > right_bound] -= mesh.interval_len
exact = np.array([self._init_cond.calculate(mesh=mesh, x=point) for
point in points])
grid = np.reshape(grid, (1, grid.size))
exact = np.reshape(exact, (1, exact.size))
return grid, exact
@enforce_boundary()
def update_right_hand_side(self, projection: ndarray) -> ndarray:
"""Update right-hand side.
Parameter
---------
projection : ndarray
Matrix of projection for each polynomial degree.
Returns
-------
ndarray
Matrix of right-hand side.
"""
right_hand_side = np.zeros_like(projection)
if self._wave_speed > 0:
right_hand_side[:, self._mesh.num_ghost_cells:
-self._mesh.num_ghost_cells] = \
2 * (self._flux_matrix @
projection[:, self._mesh.num_ghost_cells-1:
-self._mesh.num_ghost_cells-1] +
self._volume_integral_matrix @
projection[:, self._mesh.num_ghost_cells:
-self._mesh.num_ghost_cells])
else:
right_hand_side[:, self._mesh.num_ghost_cells:
-self._mesh.num_ghost_cells] = \
2 * (self._flux_matrix @
projection[:, self._mesh.num_ghost_cells+1:] +
self._volume_integral_matrix @
projection[:, self._mesh.num_ghost_cells:
-self._mesh.num_ghost_cells])
return right_hand_side
class Burgers(Equation):
"""Class for Burgers' equation.
Attributes
----------
volume_integral_matrix : ndarray
Volume integral matrix.
flux_matrix : ndarray
Flux matrix.
Methods
-------
update_right_hand_side(projection)
Update right-hand side.
Notes
-----
.. math:: u_t + u*u_x = 0
"""
def _reset(self) -> None:
"""Reset instance variables."""
pass
# matrix_shape = (self._basis.polynomial_degree+1,
# self._basis.polynomial_degree+1)
# root_vector = np.array([np.sqrt(degree+0.5) for degree in range(
# self._basis.polynomial_degree+1)])
# degree_matrix = np.matmul(root_vector[:, np.newaxis],
# root_vector[:, np.newaxis].T)
#
# # Set volume integral matrix
# matrix = np.ones(matrix_shape)
# if self._wave_speed > 0:
# matrix[np.fromfunction(
# lambda i, j: (j >= i) | ((i+j) % 2 == 0), matrix_shape)] = -1.0
# else:
# matrix[np.fromfunction(
# lambda i, j: (j > i) & ((i+j) % 2 == 1), matrix_shape)] = -1.0
# self._volume_integral_matrix = matrix * degree_matrix
#
# # Set flux matrix
# matrix = np.fromfunction(lambda i, j: (-1.0)**i, matrix_shape) \
# if self._wave_speed > 0 \
# else np.fromfunction(lambda i, j: (-1.0)**j, matrix_shape)
# self._flux_matrix = matrix * degree_matrix
@enforce_boundary()
def _initialize_projection(self) -> ndarray:
"""Initialize projection."""
return do_initial_projection(init_cond=self._init_cond,
mesh=self._mesh, basis=self._basis,
quadrature=self._quadrature)
def update_time_step(self, projection: ndarray, current_time: float,
time_step: float) -> Tuple[float, float]:
"""Update time step.
Adapt CFL number to ensure right-hand side is multiplied with dt/dx.
Parameters
----------
projection : ndarray
Current projection during approximation.
current_time : float
Current time during approximation.
time_step : float
Length of time step during approximation.
Returns
-------
cfl_number : float
Updated CFL number to ensure stability.
time_step : float
Updated time step for approximation.
"""
cfl_number = self._cfl_number
max_velocity = max(abs(projection[0, :] * np.sqrt(0.5)))
time_step = cfl_number * self._mesh.cell_len / max_velocity
cfl_number = time_step / self._mesh.cell_len
# Adjust for final time-step
if current_time+time_step > self._final_time:
time_step = self._final_time-current_time
cfl_number = time_step / self._mesh.cell_len
return cfl_number, time_step
def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
"""Calculate exact solution.
Parameters
----------
mesh : Mesh
Mesh for evaluation.
Returns
-------
grid : ndarray
Array containing evaluation grid for a function.
exact : ndarray
Array containing exact evaluation of a function.
"""
grid = np.repeat(mesh.non_ghost_cells, self._quadrature.num_nodes) + \
mesh.cell_len / 2 * np.tile(self._quadrature.nodes, mesh.num_cells)
#
# # Project points into correct periodic interval
# points = np.array([point-self.wave_speed *
# self.final_time+num_periods * mesh.interval_len
# for point in grid])
# left_bound, right_bound = mesh.bounds
# while np.any(points < left_bound):
# points[points < left_bound] += mesh.interval_len
# while np.any(points) > right_bound:
# points[points > right_bound] -= mesh.interval_len
#
# exact = np.array([self._init_cond.calculate(mesh=mesh, x=point) for
# point in points])
#
grid = np.reshape(grid, (1, grid.size))
# exact = np.reshape(exact, (1, exact.size))
#
# return grid, exact
exact = []
if self._init_cond.__name__ == 'Sine':
# u(x,t) = u(x - u0*time, 0)
exact = self._init_cond._height_adjustment + \
self._init_cond._stretch_factor * \
self.implicit_burgers_solver(grid, mesh)
elif self._init_cond.__name__ == 'DiscontinuousConstant':
# u(x,t) = u(x - u0*time, 0)
exact = self.rarefaction_wave(grid)
return grid, exact
def rarefaction_wave(self, grid_values):
uexact = 0 * grid_values
N = np.size(grid_values)
for i in range(N):
if grid_values[0, i] <= - self._final_time:
uexact[0, i] = -1
elif - self._final_time < grid_values[0, i] < self._final_time:
uexact[0, i] = grid_values[0, i] / self._final_time
else:
uexact[0, i] = 1
return uexact
def implicit_burgers_solver(self, grid_values, mesh):
"""
Parameters
----------
grid_values
mesh
Returns
-------
Notes
-----
Code adapted from DG code by Hesthaven.
"""
sspeed = self._init_cond._height_adjustment
uexact = np.zeros(np.size(grid_values))
# initialize values
xx = np.linspace(0, 1, 200)
uu = np.sin(self._init_cond._factor * np.pi * xx)
te = self._final_time * (2 / mesh.interval_len)
xt0 = (2 / mesh.interval_len) * (
grid_values-sspeed * self._final_time).reshape(
np.size(grid_values))
for ix in range(len(xt0)):
if xt0[ix] > 1:
xt0[ix] -= 2 * np.floor(0.5 * (xt0[ix]+1))
elif xt0[ix] < -1:
xt0[ix] += 2 * np.floor(0.5 * (1-xt0[ix]))
ay = abs(xt0[ix])
# Finding the closest characteristic
i0 = 0
for j in range(200):
xt = xx[j]+uu[j] * te
if ay < xt:
break
else:
i0 = j
# This is our initial guess
us = uu[i0]
un = us
for k in range(400):
us = un
x0 = ay-us * te
un = us-(us-np.sin(self._init_cond._factor * np.pi * x0)) / (
1+self._init_cond._factor * np.pi * np.cos(
self._init_cond._factor * np.pi * x0) * te)
if abs(un-us) < 1e-15:
break
y = np.sign(xt0[ix]) * un
if abs(ay-1) < 1e-15:
y = 0
if abs(self._final_time) < 1e-15:
y = np.sin(self._init_cond._factor * np.pi * xt0[ix])
uexact[ix] = y
burgers_exact = uexact.reshape((1, np.size(grid_values)))
return burgers_exact
@enforce_boundary()
def update_right_hand_side(self, projection: ndarray) -> ndarray:
"""Update right-hand side.
Parameter
---------
projection : ndarray
Matrix of projection for each polynomial degree.
Returns
-------
ndarray
Matrix of right-hand side.
"""
pass
volume_integral_matrix = self._burgers_volume_integral(projection)
boundary_matrix = self._burgers_boundary_matrix(projection)
# Initialize vector and set first entry to accommodate for ghost cell
right_hand_side = [0]
for j in range(self._mesh.num_cells):
right_hand_side.append(2*(volume_integral_matrix[:, j + 1] +
boundary_matrix[:, j + 1]))
# Set ghost cells to respective value
# (Periodic, Updated to Dirichlet in enforce_boundary_condition
# for DiscontinuousConstant problem)
# right_hand_side[0] = right_hand_side[self._mesh.num_cells]
# right_hand_side.append(right_hand_side[1])
return np.transpose(right_hand_side)
# right_hand_side = np.zeros_like(projection)
# if self._wave_speed > 0:
# right_hand_side[:, self._mesh.num_ghost_cells:
# -self._mesh.num_ghost_cells] = \
# 2 * (self._flux_matrix @
# projection[:, self._mesh.num_ghost_cells-1:
# -self._mesh.num_ghost_cells-1] +
# self._volume_integral_matrix @
# projection[:, self._mesh.num_ghost_cells:
# -self._mesh.num_ghost_cells])
# else:
# right_hand_side[:, self._mesh.num_ghost_cells:
# -self._mesh.num_ghost_cells] = \
# 2 * (self._flux_matrix @
# projection[:, self._mesh.num_ghost_cells+1:] +
# self._volume_integral_matrix @
# projection[:, self._mesh.num_ghost_cells:
# -self._mesh.num_ghost_cells])
#
# return right_hand_side
def _burgers_volume_integral(self, projection):
basis_vector = self._basis.basis
derivative_basis_vector = self._basis.derivative
# Initialize matrix and set first entry to accommodate for ghost cell
volume_integral_matrix = [0]
for cell in range(self._mesh.num_cells):
new_row = []
# Approximation u at Gauss-Legendre points
approx_eval = [sum(projection[degree][cell + 1]
* basis_vector[degree].subs(
x, self._quadrature.nodes[point])
for degree in range(
self._basis.polynomial_degree + 1))
for point in range(self._quadrature.nodes)]
approx_eval = np.array(approx_eval)
# print("u", approx_eval)
# Quadrature Evaluation
for degree in range(self._basis.polynomial_degree + 1):
weighted_derivatives = [
derivative_basis_vector[degree].subs(
x, self._quadrature.nodes[point])
* self._quadrature.weights[point]
for point in range(self._quadrature.num_nodes)]
new_entry = sum(list(approx_eval**2 * weighted_derivatives)) / 2
new_row.append(np.float64(new_entry))
new_row = np.array(new_row).T
volume_integral_matrix.append(new_row)
# Set ghost cells to respective value
# (Periodic, Updated to Dirichlet in enforce_boundary_condition
# for DiscontinuousConstant problem)
# volume_integral_matrix[0] = volume_integral_matrix[
# self._mesh.num_cells]
# volume_integral_matrix.append(volume_integral_matrix[1])
return np.transpose(np.array(volume_integral_matrix))
def _burgers_local_Lax_Friedrichs(self, projection):
# Calculating the left and right boundaries for each cell,
# u_{j-1/2}^+, u_{j+1/2}^-
boundary_left, boundary_right = self._calculate_boundary_points(
projection, self._basis.polynomial_degree + 1)
# print("shape BL", boundary_left.shape)
# Initializing Burgers local Lax-Friedrichs flux matrix
burgers_flux = [0]
for j in range(self._mesh.num_cells):
# approximations j+1/2^-, j+1/2^+ for interior cells
# and max velocity
approx_minus = boundary_right[:, j+1]
approx_plus = boundary_left[:, j + 2]
max_velocity = max(abs(approx_minus), abs(approx_plus))
# respective fluxes
flux_minus = approx_minus**2 / 2
flux_plus = approx_plus**2 / 2
# local Lax-Friedrichs flux
numerical_flux = 0.5 * (flux_minus + flux_plus - max_velocity *
(approx_plus - approx_minus))
burgers_flux.append(numerical_flux)
# Set Ghost cells to respective value
# (Periodic, Updated to Dirichlet in enforce_boundary_condition
# for DiscontinuousConstant problem)
# burgers_flux[0] = burgers_flux[self._mesh.num_cells]
# burgers_flux.append(burgers_flux[1])
return np.array(burgers_flux)
def _burgers_boundary_matrix(self, projection):
burgers_LF_flux = self._burgers_local_Lax_Friedrichs(projection)
# Initializing boundary matrix
boundary_matrix = [0]
for j in range(self._mesh.num_cells):
new_row = []
for degree in range(self._basis.polynomial_degree + 1):
new_row.append(np.sqrt(degree + 0.5) *
(-burgers_LF_flux[j + 1] + (-1)**degree *
burgers_LF_flux[j]))
boundary_matrix.append(new_row)
# Set Ghost cells to respective value
# (Periodic, Updated to Dirichlet in enforce_boundary_condition
# for DiscontinuousConstant problem)
# boundary_matrix[0] = boundary_matrix[self._mesh.num_cells]
# boundary_matrix.append(boundary_matrix[1])
boundary_matrix = np.transpose(np.array(boundary_matrix))
return boundary_matrix[0]
@staticmethod
def _calculate_boundary_points(projection, max_degree):
# Approximation at j-1/2 ^+
boundary_left = [sum(projection[degree][cell] * (-1) ** degree *
np.sqrt(degree + 0.5)
for degree in range(max_degree))
for cell in range(len(projection[0]))]
# Approximation at j+1/2 ^-
boundary_right = [sum(projection[degree][cell]* np.sqrt(degree + 0.5)
for degree in range(max_degree))
for cell in range(len(projection[0]))]
return np.reshape(np.array(boundary_left), (1, len(boundary_left))), \
np.reshape(np.array(boundary_right), (1, len(boundary_right)))