# -*- coding: utf-8 -*- """ @author: Laura C. Kühle Discussion: TODO: Contemplate saving 5-CV split and evaluating models separately TODO: Contemplate separating cell average and reconstruction calculations completely TODO: Contemplate removing Methods section from class docstring Urgent: TODO: Investigate protected vs. public variables -> Done TODO: Investigate decorators and properties -> Done TODO: Investigate how to make variables read-only -> Done TODO: Replace getter with property attributes for basis -> Done TODO: Contain polynomial degree in basis TODO: Improve docstring for Basis class TODO: Enforce abstract classes/methods (abc.ABC, abc.abstractmethod) TODO: Introduce Mesh class (mesh, cell_len, num_grid_cells, bounds, num_ghost_cells, etc.) TODO: Check whether ghost cells are handled/set correctly TODO: Find error in centering for ANN training TODO: Remove use of DGScheme from ANN_Data_Generator TODO: Adapt TCD from Soraya (Dropbox->...->TEST_troubled-cell-detector->Troubled_Cell_Detector) TODO: Add verbose output TODO: Improve file naming (e.g. use '.' instead of '__') TODO: Combine ANN workflows TODO: Add an environment file for Snakemake Critical, but not urgent: TODO: Investigate g-mesh(?) TODO: Force input_size for each ANN model to be stencil length TODO: Use full path for ANN model state TODO: Extract object initialization from DGScheme TODO: Use cfl_number for updating, not just time Currently not critical: TODO: Unify use of 'length' and 'len' in naming TODO: Replace loops with list comprehension if feasible TODO: Check whether 'projection' is always a ndarray TODO: Check whether all instance variables are sensible TODO: Rename files according to standard TODO: Outsource scripts into separate directory TODO: Allow comparison between ANN training datasets TODO: Add a default model state TODO: Look into validators for variable checks Not feasible yet or doc-related: TODO: Adjust code to allow classes for all equations (Burger, linear advection, 1D Euler) TODO: Double-check everything! (also with pylint, pytype, pydoc, pycodestyle, pydocstyle) TODO: Check whether documentation style is correct TODO: Check whether all types in doc are correct TODO: Discuss adding kwargs to attributes in documentation TODO: Add type annotations to function heads TODO: Clean up docstrings """ import json import numpy as np from sympy import Symbol import math import seaborn as sns import Troubled_Cell_Detector import Initial_Condition import Limiter import Quadrature import Update_Scheme from Basis_Function import OrthonormalLegendre from encoding_utils import encode_ndarray x = Symbol('x') sns.set() class DGScheme: """Class for Discontinuous Galerkin Method. Approximates linear advection equation using Discontinuous Galerkin Method with troubled-cell-based limiting. Attributes ---------- interval_len : float Length of the interval between left and right boundary. cell_len : float Length of a cell in mesh. basis : Basis object Basis for calculation. mesh : ndarray List of mesh valuation points. inv_mass : ndarray Inverse mass matrix. Methods ------- approximate() Approximates projection. save_plots() Saves plots generated during approximation process. build_training_data(adjustment, stencil_length, initial_condition=None) Builds training data set. """ def __init__(self, detector, **kwargs): """Initializes DGScheme. Parameters ---------- detector : str Name of troubled cell detector class. Other Parameters ---------------- wave_speed : float, optional Speed of wave in rightward direction. Default: 1. polynomial_degree : int, optional Polynomial degree. Default: 2. cfl_number : float, optional CFL number to ensure stability. Default: 0.2. num_grid_cells : int, optional Number of cells in the mesh. Usually exponential of 2. Default: 64. final_time : float, optional Final time for which approximation is calculated. Default: 1. left_bound : float, optional Left boundary of interval. Default: -1. right_bound : float, optional Right boundary of interval. Default: 1. verbose : bool, optional Flag whether commentary in console is wanted. Default: False. history_threshold : float, optional Threshold when history will be recorded. Default: math.ceil(0.2/cfl_number). detector_config : dict, optional Additional parameters for detector object. Default: {}. init_cond : str, optional Name of initial condition for evaluation. Default: 'Sine' init_config : dict, optional Additional parameters for initial condition object. Default: {}. limiter : str, optional Name of limiter for evaluation. Default: 'ModifiedMinMod'. limiter_config : dict, optional Additional parameters for limiter. object. Default: {}: quadrature : str, optional Name of quadrature for evaluation. Default: 'Gauss'. quadrature_config : dict, optional Additional parameters for quadrature object. Default: {}. update_scheme : str, optional Name of update scheme for evaluation. Default: 'SSPRK3'. """ # Unpack keyword arguments self._wave_speed = kwargs.pop('wave_speed', 1) self._polynomial_degree = kwargs.pop('polynomial_degree', 2) self._cfl_number = kwargs.pop('cfl_number', 0.2) self._num_grid_cells = kwargs.pop('num_grid_cells', 64) self._final_time = kwargs.pop('final_time', 1) self._left_bound = kwargs.pop('left_bound', -1) self._right_bound = kwargs.pop('right_bound', 1) self._verbose = kwargs.pop('verbose', False) self._history_threshold = kwargs.pop('history_threshold', math.ceil(0.2/self._cfl_number)) self._detector = detector self._detector_config = kwargs.pop('detector_config', {}) self._init_cond = kwargs.pop('init_cond', 'Sine') self._init_config = kwargs.pop('init_config', {}) self._limiter = kwargs.pop('limiter', 'ModifiedMinMod') self._limiter_config = kwargs.pop('limiter_config', {}) self._quadrature = kwargs.pop('quadrature', 'Gauss') self._quadrature_config = kwargs.pop('quadrature_config', {}) self._update_scheme = kwargs.pop('update_scheme', 'SSPRK3') # Throw an error if there are extra keyword arguments if len(kwargs) > 0: extra = ', '.join('"%s"' % k for k in list(kwargs.keys())) raise ValueError('Unrecognized arguments: %s' % extra) # Make sure all classes actually exist if not hasattr(Troubled_Cell_Detector, self._detector): raise ValueError('Invalid detector: "%s"' % self._detector) if not hasattr(Initial_Condition, self._init_cond): raise ValueError('Invalid initial condition: "%s"' % self._init_cond) if not hasattr(Limiter, self._limiter): raise ValueError('Invalid limiter: "%s"' % self._limiter) if not hasattr(Quadrature, self._quadrature): raise ValueError('Invalid quadrature: "%s"' % self._quadrature) if not hasattr(Update_Scheme, self._update_scheme): raise ValueError('Invalid update scheme: "%s"' % self._update_scheme) self._reset() # Replace the string names with the actual class instances # (and add the instance variables for the quadrature) self._init_cond = getattr(Initial_Condition, self._init_cond)( left_bound=self._left_bound, right_bound=self._right_bound, config=self._init_config) self._limiter = getattr(Limiter, self._limiter)( config=self._limiter_config) self._quadrature = getattr(Quadrature, self._quadrature)( config=self._quadrature_config) self._detector = getattr(Troubled_Cell_Detector, self._detector)( config=self._detector_config, mesh=self._mesh, wave_speed=self._wave_speed, num_grid_cells=self._num_grid_cells, polynomial_degree=self._polynomial_degree, final_time=self._final_time, left_bound=self._left_bound, right_bound=self._right_bound, basis=self._basis, init_cond=self._init_cond, quadrature=self._quadrature) self._update_scheme = getattr(Update_Scheme, self._update_scheme)( polynomial_degree=self._polynomial_degree, num_grid_cells=self._num_grid_cells, detector=self._detector, limiter=self._limiter) def approximate(self, data_file): """Approximates projection. Initializes projection and evolves it in time. Each time step consists of three parts: A projection update, a troubled-cell detection, and limiting based on the detected cells. At final time, results are saved in JSON file. Attributes ---------- data_file: str Path to file in which data will be saved. """ projection = do_initial_projection( initial_condition=self._init_cond, basis=self._basis, quadrature=self._quadrature, num_grid_cells=self._num_grid_cells, left_bound=self._left_bound, right_bound=self._right_bound, polynomial_degree=self._polynomial_degree) time_step = abs(self._cfl_number * self._cell_len / self._wave_speed) current_time = 0 iteration = 0 troubled_cell_history = [] time_history = [] while current_time < self._final_time: # Adjust for last cell cfl_number = self._cfl_number if current_time+time_step > self._final_time: time_step = self._final_time-current_time cfl_number = self._wave_speed * time_step / self._cell_len # Update projection projection, troubled_cells = self._update_scheme.step(projection, cfl_number) iteration += 1 if (iteration % self._history_threshold) == 0: troubled_cell_history.append(troubled_cells) time_history.append(current_time) current_time += time_step # Save detector-specific data in dictionary approx_stats = self._detector.create_data_dict(projection) # Save approximation results in dictionary approx_stats['time_history'] = time_history approx_stats['troubled_cell_history'] = troubled_cell_history # Encode all ndarrays to fit JSON format approx_stats = {key: encode_ndarray(approx_stats[key]) for key in approx_stats.keys()} # Save approximation results in JSON format with open(data_file + '.json', 'w') \ as json_file: json_file.write(json.dumps(approx_stats)) def _reset(self): """Resets instance variables.""" # Set additional necessary instance variables self._interval_len = self._right_bound-self._left_bound self._cell_len = self._interval_len / self._num_grid_cells self._basis = OrthonormalLegendre(self._polynomial_degree) # Set additional necessary config parameters self._limiter_config['cell_len'] = self._cell_len # Set mesh with one ghost point on each side self._mesh = np.arange(self._left_bound - (3/2*self._cell_len), self._right_bound + (5/2*self._cell_len), self._cell_len) # +3/2 def build_training_data(self, adjustment, stencil_length, add_reconstructions, initial_condition=None): """Builds training data set. Initializes projection and calculates cell averages and reconstructions for it. Parameters ---------- adjustment : float Extent of adjustment of each evaluation point in x-direction. stencil_length : int Size of training data array. add_reconstructions: bool Flag whether reconstructions of the middle cell are included. initial_condition : InitialCondition object, optional Initial condition used for calculation. Default: None (i.e. instance variable). Returns ------- ndarray Matrix containing cell averages and reconstructions for initial projection. """ if initial_condition is None: initial_condition = self._init_cond projection = do_initial_projection( initial_condition=initial_condition, basis=self._basis, quadrature=self._quadrature, num_grid_cells=self._num_grid_cells, left_bound=self._left_bound, right_bound=self._right_bound, polynomial_degree=self._polynomial_degree, adjustment=adjustment) return self._basis.calculate_cell_average( projection=projection[:, 1:-1], stencil_length=stencil_length, add_reconstructions=add_reconstructions) def do_initial_projection(initial_condition, basis, quadrature, num_grid_cells, left_bound, right_bound, polynomial_degree, adjustment=0): """Calculates initial projection. Calculates a projection at time step 0 and adds ghost cells on both sides of the array. Parameters ---------- initial_condition : InitialCondition object Initial condition used for calculation. basis: Basis object Basis used for calculation. quadrature: Quadrature object Quadrature used for evaluation. num_grid_cells : int Number of cells in the mesh. Usually exponential of 2. left_bound : float Left boundary of interval. right_bound : float Right boundary of interval. polynomial_degree : int Polynomial degree. adjustment: float, optional Extent of adjustment of each evaluation point in x-direction. Default: 0. Returns ------- ndarray Matrix containing projection of size (N+2, p+1) with N being the number of grid cells and p being the polynomial degree. """ # Initialize matrix and set first entry to accommodate for ghost cell output_matrix = [0] basis_vector = basis.basis cell_len = (right_bound-left_bound)/num_grid_cells for cell in range(num_grid_cells): new_row = [] eval_point = left_bound + (cell+0.5)*cell_len for degree in range(polynomial_degree + 1): new_row.append(np.float64(sum(initial_condition.calculate( eval_point + cell_len/2 * quadrature.get_eval_points()[point] - adjustment) * basis_vector[degree].subs( x, quadrature.get_eval_points()[point]) * quadrature.get_weights()[point] for point in range(quadrature.get_num_points())))) new_row = np.array(new_row) output_matrix.append(basis.inverse_mass_matrix @ new_row) # Set ghost cells to respective value output_matrix[0] = output_matrix[num_grid_cells] output_matrix.append(output_matrix[1]) # print(np.array(output_matrix).shape) return np.transpose(np.array(output_matrix))