Select Git revision
Troubled_Cell_Detector.py
-
Laura Christine Kühle authoredLaura Christine Kühle authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DG_Approximation.py 6.99 KiB
# -*- coding: utf-8 -*-
"""
@author: Laura C. Kühle
"""
import json
import numpy as np
from sympy import Symbol
import math
from .Basis_Function import Basis
from .encoding_utils import encode_ndarray
from .Initial_Condition import InitialCondition
from .Mesh import Mesh
from .Quadrature import Quadrature
from .Troubled_Cell_Detector import TroubledCellDetector
from .Update_Scheme import UpdateScheme
x = Symbol('x')
class DGScheme:
"""Class for Discontinuous Galerkin Method.
Approximates linear advection equation using Discontinuous Galerkin Method
with troubled-cell-based limiting.
Methods
-------
approximate()
Approximates projection.
save_plots()
Saves plots generated during approximation process.
build_training_data(x_shift, stencil_length, init_cond=None)
Builds training data set.
"""
def __init__(self, detector: TroubledCellDetector,
quadrature: Quadrature, init_cond: InitialCondition,
update_scheme: UpdateScheme, mesh: Mesh, basis: Basis,
wave_speed, **kwargs):
"""Initializes DGScheme.
Parameters
----------
detector : TroubledCellDetector object
Troubled cell detector.
quadrature : Quadrature object
Quadrature for evaluation.
init_cond : InitialCondition object
Initial condition for evaluation.
update_scheme : UpdateScheme object
Update scheme for evaluation.
mesh : Mesh object
Mesh for calculation.
basis : Basis object
Basis for calculation.
wave_speed : float, optional
Speed of wave in rightward direction.
Other Parameters
----------------
cfl_number : float, optional
CFL number to ensure stability. Default: 0.2.
final_time : float, optional
Final time for which approximation is calculated. Default: 1.
verbose : bool, optional
Flag whether commentary in console is wanted. Default: False.
history_threshold : float, optional
Threshold when history will be recorded.
Default: math.ceil(0.2/cfl_number).
"""
self._detector = detector
self._quadrature = quadrature
self._init_cond = init_cond
self._update_scheme = update_scheme
self._mesh = mesh
self._basis = basis
self._wave_speed = wave_speed
# Unpack keyword arguments
self._cfl_number = kwargs.pop('cfl_number', 0.2)
self._final_time = kwargs.pop('final_time', 1)
self._verbose = kwargs.pop('verbose', False)
self._history_threshold = kwargs.pop('history_threshold',
math.ceil(0.2/self._cfl_number))
# Throw an error if there are extra keyword arguments
if len(kwargs) > 0:
extra = ', '.join('"%s"' % k for k in list(kwargs.keys()))
raise ValueError('Unrecognized arguments: %s' % extra)
def approximate(self, data_file):
"""Approximates projection.
Initializes projection and evolves it in time. Each time step consists
of three parts: A projection update, a troubled-cell detection,
and limiting based on the detected cells.
At final time, results are saved in JSON file.
Attributes
----------
data_file: str
Path to file in which data will be saved.
"""
projection = do_initial_projection(
init_cond=self._init_cond, mesh=self._mesh,
basis=self._basis, quadrature=self._quadrature)
time_step = abs(self._cfl_number * self._mesh.cell_len /
self._wave_speed)
current_time = 0
iteration = 0
troubled_cell_history = []
time_history = []
while current_time < self._final_time:
# Adjust for last cell
cfl_number = self._cfl_number
if current_time+time_step > self._final_time:
time_step = self._final_time-current_time
cfl_number = self._wave_speed * time_step / self._mesh.cell_len
# Update projection
projection, troubled_cells = self._update_scheme.step(projection,
cfl_number)
iteration += 1
if (iteration % self._history_threshold) == 0:
troubled_cell_history.append(troubled_cells)
time_history.append(current_time)
current_time += time_step
# Save detector-specific data in dictionary
approx_stats = self._detector.create_data_dict(projection)
# Save approximation results in dictionary
approx_stats['wave_speed'] = self._wave_speed
approx_stats['final_time'] = self._final_time
approx_stats['time_history'] = time_history
approx_stats['troubled_cell_history'] = troubled_cell_history
# Encode all ndarrays to fit JSON format
approx_stats = {key: encode_ndarray(approx_stats[key])
for key in approx_stats.keys()}
# Save approximation results in JSON format
with open(data_file + '.json', 'w') \
as json_file:
json_file.write(json.dumps(approx_stats))
def do_initial_projection(init_cond, mesh, basis, quadrature,
x_shift=0):
"""Calculates initial projection.
Calculates a projection at time step 0 and adds ghost cells on both
sides of the array.
Parameters
----------
init_cond : InitialCondition object
Initial condition used for calculation.
mesh : Mesh object
Mesh for calculation.
basis: Basis object
Basis used for calculation.
quadrature: Quadrature object
Quadrature used for evaluation.
x_shift: float, optional
Shift in x-direction for each evaluation point. Default: 0.
Returns
-------
ndarray
Matrix containing projection of size (N+2, p+1) with N being the
number of mesh cells and p being the polynomial degree of the basis.
"""
# Initialize output matrix
output_matrix = np.zeros((basis.polynomial_degree+1, mesh.num_cells+2))
# Calculate basis based on quadrature
basis_matrix = np.array([np.vectorize(basis.basis[degree].subs)(
x, quadrature.nodes) for degree in range(basis.polynomial_degree+1)])
# Calculate points based on initial condition
points = quadrature.nodes[:, np.newaxis] @ \
np.repeat(mesh.cell_len/2, mesh.num_cells)[:, np.newaxis].T + \
np.tile(mesh.non_ghost_cells - x_shift, (quadrature.num_nodes, 1))
init_matrix = np.vectorize(init_cond.calculate, otypes=[np.float])(
x=points, mesh=mesh)
# Set output matrix for regular cells
output_matrix[:, 1:-1] = (basis_matrix * quadrature.weights) @ init_matrix
# Set ghost cells
output_matrix[:, 0] = output_matrix[:, -2]
output_matrix[:, -1] = output_matrix[:, 1]
return output_matrix