# -*- coding: utf-8 -*-
"""
@author: Laura C. Kühle

Urgent:
TODO: Move plotting into separate function
TODO: Adapt TCD from Soraya
    (Dropbox->...->TEST_troubled-cell-detector->Troubled_Cell_Detector)
TODO: Add verbose output
TODO: Improve file naming (e.g. use '.' instead of '__')
TODO: Force input_size for each ANN model to be stencil length
TODO: Combine ANN workflows
TODO: Unify use of 'length' and 'len' in naming
TODO: Add an environment file for Snakemake
TODO: Make all directories local variable -> Done

Critical, but not urgent:
TODO: Use cfl_number for updating, not just time
TODO: Adjust code to allow classes for all equations
    (Burger, linear advection, 1D Euler)

Currently not critical:
TODO: Replace loops with list comprehension if feasible
TODO: Check whether 'projection' is always a np.array()
TODO: Check whether all instance variables are sensible
TODO: Rename files according to standard
TODO: Outsource scripts into separate directory
TODO: Allow comparison between ANN training datasets
TODO: Add a default model state

Not feasible yet or doc-related:
TODO: Double-check everything! (also with pylint, pytype, pydoc, pycodestyle)
TODO: Check whether documentation style is correct
TODO: Check whether all types in doc are correct
TODO: Discuss adding kwargs to attributes in documentation
TODO: Add type annotations to function heads

"""
import os
import json
import numpy as np
from sympy import Symbol
import math
import matplotlib
from matplotlib import pyplot as plt

import Troubled_Cell_Detector
import Initial_Condition
import Limiter
import Quadrature
import Update_Scheme
from Basis_Function import OrthonormalLegendre

matplotlib.use('Agg')

x = Symbol('x')


def encode_ndarray(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj


def decode_ndarray(obj):
    if isinstance(obj, list):
        return np.asarray(obj)
    return obj
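

# A minimal sketch (not used anywhere in this module) of how the two helpers
# above are meant to interact in DGScheme.approximate(): ndarrays are turned
# into nested lists before writing JSON and turned back into ndarrays after
# reading. The function name '_json_roundtrip_example' is illustrative only.
def _json_roundtrip_example(stats):
    """Returns a copy of 'stats' after an encode/decode pass through JSON."""
    encoded = {key: encode_ndarray(value) for key, value in stats.items()}
    decoded = json.loads(json.dumps(encoded))
    return {key: decode_ndarray(value) for key, value in decoded.items()}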


class DGScheme:
    """Class for Discontinuous Galerkin Method.

    Approximates the linear advection equation using the Discontinuous
    Galerkin method with troubled-cell-based limiting.

    Attributes
    ----------
    interval_len : float
        Length of the interval between left and right boundary.
    cell_len : float
        Length of a cell in mesh.
    basis : Basis object
        Basis for calculation.
    mesh : ndarray
        List of mesh evaluation points.
    inv_mass : ndarray
        Inverse mass matrix.

    Methods
    -------
    approximate(data_dir, data_name)
        Approximates projection.
    save_plots(plot_dir, plot_name)
        Saves plots generated during approximation process.
    build_training_data(adjustment, stencil_length, add_reconstructions,
        initial_condition=None)
        Builds training data set.

    """
    def __init__(self, detector, **kwargs):
        """Initializes DGScheme.

        Parameters
        ----------
        detector : str
            Name of troubled cell detector class.

        Other Parameters
        ----------------
        wave_speed : float, optional
            Speed of wave in rightward direction. Default: 1.
        polynomial_degree : int, optional
            Polynomial degree. Default: 2.
        cfl_number : float, optional
            CFL number to ensure stability. Default: 0.2.
        num_grid_cells : int, optional
            Number of cells in the mesh. Usually a power of 2. Default: 64.
        final_time : float, optional
            Final time for which approximation is calculated. Default: 1.
        left_bound : float, optional
            Left boundary of interval. Default: -1.
        right_bound : float, optional
            Right boundary of interval. Default: 1.
        verbose : bool, optional
            Flag whether commentary in console is wanted. Default: False.
        history_threshold : float, optional
            Number of time steps between recordings of the troubled-cell and
            time history. Default: math.ceil(0.2/cfl_number).
        detector_config : dict, optional
            Additional parameters for detector object. Default: {}.
        init_cond : str, optional
            Name of initial condition for evaluation. Default: 'Sine'.
        init_config : dict, optional
            Additional parameters for initial condition object. Default: {}.
        limiter : str, optional
            Name of limiter for evaluation. Default: 'ModifiedMinMod'.
        limiter_config : dict, optional
            Additional parameters for limiter object. Default: {}.
        quadrature : str, optional
            Name of quadrature for evaluation. Default: 'Gauss'.
        quadrature_config : dict, optional
            Additional parameters for quadrature object. Default: {}.
        update_scheme : str, optional
            Name of update scheme for evaluation. Default: 'SSPRK3'.

        """
        # Unpack keyword arguments
        self._wave_speed = kwargs.pop('wave_speed', 1)
        self._polynomial_degree = kwargs.pop('polynomial_degree', 2)
        self._cfl_number = kwargs.pop('cfl_number', 0.2)
        self._num_grid_cells = kwargs.pop('num_grid_cells', 64)
        self._final_time = kwargs.pop('final_time', 1)
        self._left_bound = kwargs.pop('left_bound', -1)
        self._right_bound = kwargs.pop('right_bound', 1)
        self._verbose = kwargs.pop('verbose', False)
        self._history_threshold = kwargs.pop(
            'history_threshold', math.ceil(0.2/self._cfl_number))
        self._detector = detector
        self._detector_config = kwargs.pop('detector_config', {})
        self._init_cond = kwargs.pop('init_cond', 'Sine')
        self._init_config = kwargs.pop('init_config', {})
        self._limiter = kwargs.pop('limiter', 'ModifiedMinMod')
        self._limiter_config = kwargs.pop('limiter_config', {})
        self._quadrature = kwargs.pop('quadrature', 'Gauss')
        self._quadrature_config = kwargs.pop('quadrature_config', {})
        self._update_scheme = kwargs.pop('update_scheme', 'SSPRK3')

        # Throw an error if there are extra keyword arguments
        if len(kwargs) > 0:
            extra = ', '.join('"%s"' % k for k in list(kwargs.keys()))
            raise ValueError('Unrecognized arguments: %s' % extra)

        # Make sure all classes actually exist
        if not hasattr(Troubled_Cell_Detector, self._detector):
            raise ValueError('Invalid detector: "%s"' % self._detector)
        if not hasattr(Initial_Condition, self._init_cond):
            raise ValueError('Invalid initial condition: "%s"'
                             % self._init_cond)
        if not hasattr(Limiter, self._limiter):
            raise ValueError('Invalid limiter: "%s"' % self._limiter)
        if not hasattr(Quadrature, self._quadrature):
            raise ValueError('Invalid quadrature: "%s"' % self._quadrature)
        if not hasattr(Update_Scheme, self._update_scheme):
            raise ValueError('Invalid update scheme: "%s"'
                             % self._update_scheme)

        self._reset()

        # Replace the string names with the actual class instances
        # (and add the instance variables for the quadrature)
        self._init_cond = getattr(Initial_Condition, self._init_cond)(
            self._left_bound, self._right_bound, self._init_config)
        self._limiter = getattr(Limiter, self._limiter)(self._limiter_config)
        self._quadrature = getattr(Quadrature, self._quadrature)(
            self._quadrature_config)
        self._detector = getattr(Troubled_Cell_Detector, self._detector)(
            self._detector_config, self._mesh, self._wave_speed,
            self._polynomial_degree, self._num_grid_cells, self._final_time,
            self._left_bound, self._right_bound, self._basis,
            self._init_cond, self._quadrature)
        self._update_scheme = getattr(Update_Scheme, self._update_scheme)(
            self._polynomial_degree, self._num_grid_cells, self._detector,
            self._limiter)

    def approximate(self, data_dir, data_name):
        """Approximates projection.

        Initializes projection and evolves it in time. Each time step
        consists of three parts: a projection update, a troubled-cell
        detection, and limiting based on the detected cells. At final time,
        result and error plots are generated and, if the verbose flag is
        set, also displayed.

        Parameters
        ----------
        data_dir : str
            Path to directory in which data is saved.
        data_name : str
            Name of data.

        """
        projection = self._do_initial_projection(self._init_cond)
        time_step = abs(self._cfl_number * self._cell_len / self._wave_speed)

        current_time = 0
        iteration = 0
        troubled_cell_history = []
        time_history = []
        while current_time < self._final_time:
            # Adjust for final time step
            cfl_number = self._cfl_number
            if current_time+time_step > self._final_time:
                time_step = self._final_time-current_time
                cfl_number = self._wave_speed * time_step / self._cell_len

            # Update projection
            projection, troubled_cells = self._update_scheme.step(
                projection, cfl_number)
            iteration += 1

            if (iteration % self._history_threshold) == 0:
                troubled_cell_history.append(troubled_cells)
                time_history.append(current_time)

            current_time += time_step

        # Save approximation results in dictionary
        approx_stats = {'projection': projection,
                        'time_history': time_history,
                        'troubled_cell_history': troubled_cell_history}

        # Encode all ndarrays to fit JSON format
        approx_stats = {key: encode_ndarray(approx_stats[key])
                        for key in approx_stats.keys()}

        # Save approximation results in JSON format
        with open(data_dir + '/' + data_name + '.json', 'w') as json_file:
            json_file.write(json.dumps(approx_stats))

        # Read approximation results
        with open(data_dir + '/' + data_name + '.json') as json_file:
            approx_stats = json.load(json_file)

        # Decode all ndarrays by converting lists
        approx_stats = {key: decode_ndarray(approx_stats[key])
                        for key in approx_stats.keys()}

        # Plot exact/approximate results, errors, shock tubes,
        # and any detector-dependent plots
        self._detector.plot_results(**approx_stats)
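
    # Worked example for the fixed step size used in approximate(), based on
    # the documented defaults (wave_speed=1, cfl_number=0.2, and 64 cells on
    # [-1, 1]): cell_len = 2/64 = 0.03125, hence
    # time_step = abs(0.2 * 0.03125 / 1) = 0.00625. Only the final step is
    # shortened so that final_time is hit exactly.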

    def save_plots(self, plot_dir, plot_name):
        """Saves plotted results.

        Creates the plot directory, if not already existing, and saves the
        plots generated during the last approximation.

        Parameters
        ----------
        plot_dir : str
            Path to directory in which plots are saved.
        plot_name : str
            Name of plot.

        """
        # Set path for plot files if not existing already
        if not os.path.exists(plot_dir):
            os.makedirs(plot_dir)

        # Save plots
        for identifier in plt.get_figlabels():
            # Set path for figure directory if not existing already
            if not os.path.exists(plot_dir + '/' + identifier):
                os.makedirs(plot_dir + '/' + identifier)

            plt.figure(identifier)
            plt.savefig(plot_dir + '/' + identifier + '/'
                        + plot_name + '.pdf')

    def _reset(self):
        """Resets instance variables."""
        # Set additional necessary instance variables
        self._interval_len = self._right_bound-self._left_bound
        self._cell_len = self._interval_len / self._num_grid_cells
        self._basis = OrthonormalLegendre(self._polynomial_degree)

        # Set additional necessary config parameters
        self._limiter_config['cell_len'] = self._cell_len

        # Set mesh with ghost cells on both sides of the domain
        self._mesh = np.arange(self._left_bound - (3/2*self._cell_len),
                               self._right_bound + (5/2*self._cell_len),
                               self._cell_len)

        # Set inverse mass matrix (the identity for an orthonormal basis)
        self._inv_mass = np.identity(self._polynomial_degree+1)
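
    # Note on the mesh built in _reset(), using the documented defaults
    # (left_bound=-1, right_bound=1, num_grid_cells=64): with
    # h = cell_len = 2/64 = 0.03125 the np.arange call yields the cell
    # centres -1 - 3/2*h, -1 - 1/2*h, -1 + 1/2*h, ..., 1 - 1/2*h, 1 + 1/2*h,
    # 1 + 3/2*h, i.e. the interior cell centres padded with ghost cells on
    # both sides of the domain.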

    def _do_initial_projection(self, initial_condition, adjustment=0):
        """Calculates initial projection.

        Calculates a projection at time step 0 and adds ghost cells on both
        sides of the array.

        Parameters
        ----------
        initial_condition : InitialCondition object
            Initial condition used for calculation. May differ from instance
            variable.
        adjustment : float
            Extent of adjustment of each evaluation point in x-direction.

        Returns
        -------
        ndarray
            Matrix containing the projection coefficients of size (p+1, N+2)
            with p being the polynomial degree and N being the number of
            grid cells, i.e. one column per cell including one ghost cell on
            each side.

        """
        # Initialize matrix and set first entry to accommodate the ghost cell
        output_matrix = [0]
        basis_vector = self._basis.get_basis_vector()

        for cell in range(self._num_grid_cells):
            new_row = []
            eval_point = self._left_bound + (cell+0.5)*self._cell_len

            for degree in range(self._polynomial_degree + 1):
                # Approximate each basis coefficient with Gauss quadrature
                # on the reference cell
                new_entry = sum(
                    initial_condition.calculate(
                        eval_point
                        + self._cell_len/2
                        * self._quadrature.get_eval_points()[point]
                        - adjustment)
                    * basis_vector[degree].subs(
                        x, self._quadrature.get_eval_points()[point])
                    * self._quadrature.get_weights()[point]
                    for point in range(self._quadrature.get_num_points()))
                new_row.append(np.float64(new_entry))

            new_row = np.array(new_row)
            output_matrix.append(self._inv_mass @ new_row)

        # Set ghost cells to respective value
        output_matrix[0] = output_matrix[self._num_grid_cells]
        output_matrix.append(output_matrix[1])

        return np.transpose(np.array(output_matrix))

    def build_training_data(self, adjustment, stencil_length,
                            add_reconstructions, initial_condition=None):
        """Builds training data set.

        Initializes projection and calculates cell averages and
        reconstructions for it.

        Parameters
        ----------
        adjustment : float
            Extent of adjustment of each evaluation point in x-direction.
        stencil_length : int
            Size of training data array.
        add_reconstructions : bool
            Flag whether reconstructions of the middle cell are included.
        initial_condition : InitialCondition object, optional
            Initial condition used for calculation.
            Default: None (i.e. instance variable).

        Returns
        -------
        ndarray
            Matrix containing cell averages and reconstructions for initial
            projection.

        """
        if initial_condition is None:
            initial_condition = self._init_cond
        projection = self._do_initial_projection(initial_condition,
                                                 adjustment)

        return self._detector.calculate_cell_average(
            projection[:, 1:-1], stencil_length, add_reconstructions)
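

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: the detector
    # name 'Boxplot', the directory names, and the run name 'example' are
    # assumptions and may have to be adapted to the classes and paths
    # available in this project.
    example_scheme = DGScheme('Boxplot', num_grid_cells=64, final_time=1)
    os.makedirs('data', exist_ok=True)
    example_scheme.approximate('data', 'example')
    example_scheme.save_plots('plots', 'example')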