Select Git revision
EnumerableValue.java
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DG_Approximation.py 15.26 KiB
# -*- coding: utf-8 -*-
"""
@author: Laura C. Kühle
Discussion:
TODO: Contemplate saving 5-CV split and evaluating models separately
TODO: Contemplate separating cell average and reconstruction calculations
completely
TODO: Contemplate removing Methods section from class docstring
TODO: Contemplate moving stiffness and boundary matrix from update scheme
to basis
Urgent:
TODO: Investigate protected vs. public variables -> Done
TODO: Investigate decorators and properties -> Done
TODO: Investigate how to make variables read-only -> Done
TODO: Replace getter with property attributes for basis -> Done
TODO: Contain polynomial degree in basis -> Done
TODO: Improve docstring for Basis class
TODO: Enforce abstract classes/methods (abc.ABC, abc.abstractmethod)
TODO: Introduce Mesh class
(mesh, cell_len, num_grid_cells, bounds, num_ghost_cells, etc.)
TODO: Check whether ghost cells are handled/set correctly
TODO: Find error in centering for ANN training
TODO: Remove use of DGScheme from ANN_Data_Generator
TODO: Adapt TCD from Soraya
(Dropbox->...->TEST_troubled-cell-detector->Troubled_Cell_Detector)
TODO: Add verbose output
TODO: Improve file naming (e.g. use '.' instead of '__')
TODO: Combine ANN workflows
TODO: Add an environment file for Snakemake
Critical, but not urgent:
TODO: Investigate g-mesh(?)
TODO: Force input_size for each ANN model to be stencil length
TODO: Use full path for ANN model state
TODO: Extract object initialization from DGScheme
TODO: Use cfl_number for updating, not just time
Currently not critical:
TODO: Unify use of 'length' and 'len' in naming
TODO: Replace loops with list comprehension if feasible
TODO: Check whether 'projection' is always a ndarray
TODO: Check whether all instance variables are sensible
TODO: Rename files according to standard
TODO: Outsource scripts into separate directory
TODO: Allow comparison between ANN training datasets
TODO: Add a default model state
TODO: Look into validators for variable checks
Not feasible yet or doc-related:
TODO: Adjust code to allow classes for all equations
(Burger, linear advection, 1D Euler)
TODO: Double-check everything! (also with pylint, pytype, pydoc,
pycodestyle, pydocstyle)
TODO: Check whether documentation style is correct
TODO: Check whether all types in doc are correct
TODO: Discuss adding kwargs to attributes in documentation
TODO: Add type annotations to function heads
TODO: Clean up docstrings
"""
import json
import numpy as np
from sympy import Symbol
import math
import seaborn as sns
import Troubled_Cell_Detector
import Initial_Condition
import Limiter
import Quadrature
import Update_Scheme
from Basis_Function import OrthonormalLegendre
from encoding_utils import encode_ndarray
x = Symbol('x')
sns.set()
class DGScheme:
"""Class for Discontinuous Galerkin Method.
Approximates linear advection equation using Discontinuous Galerkin Method
with troubled-cell-based limiting.
Attributes
----------
interval_len : float
Length of the interval between left and right boundary.
cell_len : float
Length of a cell in mesh.
basis : Basis object
Basis for calculation.
mesh : ndarray
List of mesh valuation points.
inv_mass : ndarray
Inverse mass matrix.
Methods
-------
approximate()
Approximates projection.
save_plots()
Saves plots generated during approximation process.
build_training_data(adjustment, stencil_length, initial_condition=None)
Builds training data set.
"""
def __init__(self, detector, **kwargs):
"""Initializes DGScheme.
Parameters
----------
detector : str
Name of troubled cell detector class.
Other Parameters
----------------
wave_speed : float, optional
Speed of wave in rightward direction. Default: 1.
polynomial_degree : int, optional
Polynomial degree. Default: 2.
cfl_number : float, optional
CFL number to ensure stability. Default: 0.2.
num_grid_cells : int, optional
Number of cells in the mesh. Usually exponential of 2. Default: 64.
final_time : float, optional
Final time for which approximation is calculated. Default: 1.
left_bound : float, optional
Left boundary of interval. Default: -1.
right_bound : float, optional
Right boundary of interval. Default: 1.
verbose : bool, optional
Flag whether commentary in console is wanted. Default: False.
history_threshold : float, optional
Threshold when history will be recorded.
Default: math.ceil(0.2/cfl_number).
detector_config : dict, optional
Additional parameters for detector object. Default: {}.
init_cond : str, optional
Name of initial condition for evaluation. Default: 'Sine'
init_config : dict, optional
Additional parameters for initial condition object. Default: {}.
limiter : str, optional
Name of limiter for evaluation. Default: 'ModifiedMinMod'.
limiter_config : dict, optional
Additional parameters for limiter. object. Default: {}:
quadrature : str, optional
Name of quadrature for evaluation. Default: 'Gauss'.
quadrature_config : dict, optional
Additional parameters for quadrature object. Default: {}.
update_scheme : str, optional
Name of update scheme for evaluation. Default: 'SSPRK3'.
"""
# Unpack keyword arguments
self._wave_speed = kwargs.pop('wave_speed', 1)
self._cfl_number = kwargs.pop('cfl_number', 0.2)
self._num_grid_cells = kwargs.pop('num_grid_cells', 64)
self._final_time = kwargs.pop('final_time', 1)
self._left_bound = kwargs.pop('left_bound', -1)
self._right_bound = kwargs.pop('right_bound', 1)
self._verbose = kwargs.pop('verbose', False)
self._history_threshold = kwargs.pop('history_threshold',
math.ceil(0.2/self._cfl_number))
self._detector = detector
self._detector_config = kwargs.pop('detector_config', {})
self._init_cond = kwargs.pop('init_cond', 'Sine')
self._init_config = kwargs.pop('init_config', {})
self._limiter = kwargs.pop('limiter', 'ModifiedMinMod')
self._limiter_config = kwargs.pop('limiter_config', {})
self._quadrature = kwargs.pop('quadrature', 'Gauss')
self._quadrature_config = kwargs.pop('quadrature_config', {})
self._update_scheme = kwargs.pop('update_scheme', 'SSPRK3')
polynomial_degree = kwargs.pop('polynomial_degree', 2)
self._basis = OrthonormalLegendre(polynomial_degree)
# Throw an error if there are extra keyword arguments
if len(kwargs) > 0:
extra = ', '.join('"%s"' % k for k in list(kwargs.keys()))
raise ValueError('Unrecognized arguments: %s' % extra)
# Make sure all classes actually exist
if not hasattr(Troubled_Cell_Detector, self._detector):
raise ValueError('Invalid detector: "%s"' % self._detector)
if not hasattr(Initial_Condition, self._init_cond):
raise ValueError('Invalid initial condition: "%s"'
% self._init_cond)
if not hasattr(Limiter, self._limiter):
raise ValueError('Invalid limiter: "%s"' % self._limiter)
if not hasattr(Quadrature, self._quadrature):
raise ValueError('Invalid quadrature: "%s"' % self._quadrature)
if not hasattr(Update_Scheme, self._update_scheme):
raise ValueError('Invalid update scheme: "%s"'
% self._update_scheme)
self._reset()
# Replace the string names with the actual class instances
# (and add the instance variables for the quadrature)
self._init_cond = getattr(Initial_Condition, self._init_cond)(
left_bound=self._left_bound, right_bound=self._right_bound,
config=self._init_config)
self._limiter = getattr(Limiter, self._limiter)(
config=self._limiter_config)
self._quadrature = getattr(Quadrature, self._quadrature)(
config=self._quadrature_config)
self._detector = getattr(Troubled_Cell_Detector, self._detector)(
config=self._detector_config, mesh=self._mesh,
wave_speed=self._wave_speed, num_grid_cells=self._num_grid_cells,
final_time=self._final_time, left_bound=self._left_bound,
right_bound=self._right_bound, basis=self._basis,
init_cond=self._init_cond, quadrature=self._quadrature)
self._update_scheme = getattr(Update_Scheme, self._update_scheme)(
polynomial_degree=polynomial_degree,
num_grid_cells=self._num_grid_cells, detector=self._detector,
limiter=self._limiter)
def approximate(self, data_file):
"""Approximates projection.
Initializes projection and evolves it in time. Each time step consists
of three parts: A projection update, a troubled-cell detection,
and limiting based on the detected cells.
At final time, results are saved in JSON file.
Attributes
----------
data_file: str
Path to file in which data will be saved.
"""
projection = do_initial_projection(
initial_condition=self._init_cond, basis=self._basis,
quadrature=self._quadrature, num_grid_cells=self._num_grid_cells,
left_bound=self._left_bound, right_bound=self._right_bound)
time_step = abs(self._cfl_number * self._cell_len / self._wave_speed)
current_time = 0
iteration = 0
troubled_cell_history = []
time_history = []
while current_time < self._final_time:
# Adjust for last cell
cfl_number = self._cfl_number
if current_time+time_step > self._final_time:
time_step = self._final_time-current_time
cfl_number = self._wave_speed * time_step / self._cell_len
# Update projection
projection, troubled_cells = self._update_scheme.step(projection,
cfl_number)
iteration += 1
if (iteration % self._history_threshold) == 0:
troubled_cell_history.append(troubled_cells)
time_history.append(current_time)
current_time += time_step
# Save detector-specific data in dictionary
approx_stats = self._detector.create_data_dict(projection)
# Save approximation results in dictionary
approx_stats['time_history'] = time_history
approx_stats['troubled_cell_history'] = troubled_cell_history
# Encode all ndarrays to fit JSON format
approx_stats = {key: encode_ndarray(approx_stats[key])
for key in approx_stats.keys()}
# Save approximation results in JSON format
with open(data_file + '.json', 'w') \
as json_file:
json_file.write(json.dumps(approx_stats))
def _reset(self):
"""Resets instance variables."""
# Set additional necessary instance variables
self._interval_len = self._right_bound-self._left_bound
self._cell_len = self._interval_len / self._num_grid_cells
# Set additional necessary config parameters
self._limiter_config['cell_len'] = self._cell_len
# Set mesh with one ghost point on each side
self._mesh = np.arange(self._left_bound - (3/2*self._cell_len),
self._right_bound + (5/2*self._cell_len),
self._cell_len) # +3/2
def build_training_data(self, adjustment, stencil_length,
add_reconstructions, initial_condition=None):
"""Builds training data set.
Initializes projection and calculates cell averages and
reconstructions for it.
Parameters
----------
adjustment : float
Extent of adjustment of each evaluation point in x-direction.
stencil_length : int
Size of training data array.
add_reconstructions: bool
Flag whether reconstructions of the middle cell are included.
initial_condition : InitialCondition object, optional
Initial condition used for calculation.
Default: None (i.e. instance variable).
Returns
-------
ndarray
Matrix containing cell averages and reconstructions for initial
projection.
"""
if initial_condition is None:
initial_condition = self._init_cond
projection = do_initial_projection(
initial_condition=initial_condition, basis=self._basis,
quadrature=self._quadrature, num_grid_cells=self._num_grid_cells,
left_bound=self._left_bound, right_bound=self._right_bound,
adjustment=adjustment)
return self._basis.calculate_cell_average(
projection=projection[:, 1:-1], stencil_length=stencil_length,
add_reconstructions=add_reconstructions)
def do_initial_projection(initial_condition, basis, quadrature,
num_grid_cells, left_bound, right_bound,
adjustment=0):
"""Calculates initial projection.
Calculates a projection at time step 0 and adds ghost cells on both
sides of the array.
Parameters
----------
initial_condition : InitialCondition object
Initial condition used for calculation.
basis: Basis object
Basis used for calculation.
quadrature: Quadrature object
Quadrature used for evaluation.
num_grid_cells : int
Number of cells in the mesh. Usually exponential of 2.
left_bound : float
Left boundary of interval.
right_bound : float
Right boundary of interval.
adjustment: float, optional
Extent of adjustment of each evaluation point in x-direction.
Default: 0.
Returns
-------
ndarray
Matrix containing projection of size (N+2, p+1) with N being the
number of grid cells and p being the polynomial degree of the basis.
"""
# Initialize matrix and set first entry to accommodate for ghost cell
output_matrix = [0]
basis_vector = basis.basis
cell_len = (right_bound-left_bound)/num_grid_cells
for cell in range(num_grid_cells):
new_row = []
eval_point = left_bound + (cell+0.5)*cell_len
for degree in range(basis.polynomial_degree + 1):
new_row.append(np.float64(sum(initial_condition.calculate(
eval_point + cell_len/2
* quadrature.get_eval_points()[point] - adjustment)
* basis_vector[degree].subs(
x, quadrature.get_eval_points()[point])
* quadrature.get_weights()[point]
for point in range(quadrature.get_num_points()))))
new_row = np.array(new_row)
output_matrix.append(basis.inverse_mass_matrix @ new_row)
# Set ghost cells to respective value
output_matrix[0] = output_matrix[num_grid_cells]
output_matrix.append(output_matrix[1])
# print(np.array(output_matrix).shape)
return np.transpose(np.array(output_matrix))