Skip to content
Snippets Groups Projects
Select Git revision
  • fa265fc077b5b1bee6661cb9499fd7f83b855be7
  • master default protected
2 results

DG_Approximation.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    DG_Approximation.py 8.25 KiB
    # -*- coding: utf-8 -*-
    """
    @author: Laura C. Kühle
    
    Plotter:
    TODO: Double-check everything!
    TODO: Replace loops with list comprehension if feasible
    TODO: Write documentation for all methods
    TODO: Contemplate how to make shock tubes comparable
    TODO: Fix bug in approximation -> Done (used num_grid_cells instead of cell_len for cfl_number in last step)
    
    """
    import numpy as np
    from sympy import Symbol
    import math
    import matplotlib.pyplot as plt
    
    import Troubled_Cell_Detector
    import Initial_Condition
    import Limiter
    import Quadrature
    import Update_Scheme
    from Basis_Function import OrthonormalLegendre
    
    x = Symbol('x')
    
    
    class DGScheme(object):
        """
        Do documentation here.
    
        Here come some parameter.
        """
    
        def __init__(self, detector, **kwargs):
            # Unpack keyword arguments
            self._wave_speed = kwargs.pop('wave_speed', 1)
            self._polynomial_degree = kwargs.pop('polynomial_degree', 2)
            self._cfl_number = kwargs.pop('cfl_number', 0.2)
            self._num_grid_cells = kwargs.pop('num_grid_cells', 64)
            self._final_time = kwargs.pop('final_time', 1)
            self._left_bound = kwargs.pop('left_bound', -1)
            self._right_bound = kwargs.pop('right_bound', 1)
            self._verbose = kwargs.pop('verbose', False)
            self._history_threshold = kwargs.pop('history_threshold', math.ceil(0.2/self._cfl_number))
            self._detector = detector
            self._detector_config = kwargs.pop('detector_config', {})
            self._init_cond = kwargs.pop('init_cond', 'Sine')
            self._init_config = kwargs.pop('init_config', {})
            self._limiter = kwargs.pop('limiter', 'ModifiedMinMod')
            self._limiter_config = kwargs.pop('limiter_config', {})
            self._quadrature = kwargs.pop('quadrature', 'Gauss')
            self._quadrature_config = kwargs.pop('quadrature_config', {})
            self._update_scheme = kwargs.pop('update_scheme', 'SSPRK3')
    
            # Throw an error if there are extra keyword arguments
            if len(kwargs) > 0:
                extra = ', '.join('"%s"' % k for k in list(kwargs.keys()))
                raise ValueError('Unrecognized arguments: %s' % extra)
    
            # Make sure all classes actually exist
            if not hasattr(Troubled_Cell_Detector, self._detector):
                raise ValueError('Invalid detector: "%s"' % self._detector)
            if not hasattr(Initial_Condition, self._init_cond):
                raise ValueError('Invalid initial condition: "%s"' % self._init_cond)
            if not hasattr(Limiter, self._limiter):
                raise ValueError('Invalid limiter: "%s"' % self._limiter)
            if not hasattr(Quadrature, self._quadrature):
                raise ValueError('Invalid quadrature: "%s"' % self._quadrature)
            if not hasattr(Update_Scheme, self._update_scheme):
                raise ValueError('Invalid update scheme: "%s"' % self._update_scheme)
    
            self._reset()
    
            # Replace the string names with the actual class instances
            # (and add the instance variables for the quadrature)
            self._init_cond = getattr(Initial_Condition, self._init_cond)(self._left_bound, self._right_bound,
                                                                          self._init_config)
            self._limiter = getattr(Limiter, self._limiter)(self._limiter_config)
            self._quadrature = getattr(Quadrature, self._quadrature)(self._quadrature_config)
            self._detector = getattr(Troubled_Cell_Detector, self._detector)(
                self._detector_config, self._mesh, self._wave_speed, self._polynomial_degree, self._num_grid_cells,
                self._final_time, self._left_bound, self._right_bound, self._basis, self._init_cond, self._quadrature)
            self._update_scheme = getattr(Update_Scheme, self._update_scheme)(self._polynomial_degree, self._num_grid_cells,
                                                                              self._detector, self._limiter)
    
        def approximate(self):
            """
            Do documentation here.
    
            Here come some parameter.
            """
    
            projection = self._do_initial_projection(self._init_cond)
    
            time_step = abs(self._cfl_number * self._cell_len / self._wave_speed)
    
            current_time = 0
            iteration = 0
            troubled_cell_history = []
            time_history = []
            while current_time < self._final_time:
                # Adjust for last cell
                cfl_number = self._cfl_number
                if current_time+time_step > self._final_time:
                    time_step = self._final_time-current_time
                    cfl_number = self._wave_speed * time_step / self._cell_len
    
                # Update projection
                projection, troubled_cells = self._update_scheme.step(projection, cfl_number)
    
                iteration += 1
    
                if (iteration % self._history_threshold) == 0:
                    troubled_cell_history.append(troubled_cells)
                    time_history.append(current_time)
    
                current_time += time_step
    
            # Plot exact/approximate results, errors, shock tubes and any detector-dependant plots
            self._detector.plot_results(projection, troubled_cell_history, time_history)
    
            if self._verbose:
                plt.show()
    
        def save_plots(self):
            name = self._init_cond.get_name() + '__' + self._detector.get_name() + '__' + self._limiter.get_name() \
                + '__' + self._update_scheme.get_name() + '__' + self._quadrature.get_name() + '__final_time_' \
                + str(self._final_time) + '__wave_speed_' + str(self._wave_speed) + '__number_of_cells_' \
                + str(self._num_grid_cells) + '__polynomial_degree_' + str(self._polynomial_degree)
    
            self._detector.save_plots(name)
    
        def _reset(self):
            # Set additional necessary instance variables
            self._interval_len = self._right_bound-self._left_bound
            self._cell_len = self._interval_len / self._num_grid_cells
            self._basis = OrthonormalLegendre(self._polynomial_degree)
    
            # Set additional necessary config parameters
            self._limiter_config['cell_len'] = self._cell_len
    
            # Set mesh with one ghost point on each side
            self._mesh = np.arange(self._left_bound - (3/2*self._cell_len), self._right_bound + (5/2*self._cell_len),
                                   self._cell_len)  # +3/2
    
            # Set inverse mass matrix
            mass_matrix = []
            for i in range(self._polynomial_degree+1):
                new_row = []
                for j in range(self._polynomial_degree+1):
                    new_entry = 0.0
                    if i == j:
                        new_entry = 1.0
                    new_row.append(new_entry)
                mass_matrix.append(new_row)
            self._inv_mass = np.array(mass_matrix)
    
        def _do_initial_projection(self, initial_condition, adjustment=0):
            # Initialize matrix and set first entry to accommodate for ghost cell
            output_matrix = [0]
            basis_vector = self._basis.get_basis_vector()
    
            for cell in range(self._num_grid_cells):
                new_row = []
                eval_point = self._left_bound + (cell+0.5)*self._cell_len
    
                for degree in range(self._polynomial_degree + 1):
                    new_entry = sum(
                        initial_condition.calculate(
                            eval_point + self._cell_len/2 * self._quadrature.get_eval_points()[point] - adjustment)
                        * basis_vector[degree].subs(x, self._quadrature.get_eval_points()[point])
                        * self._quadrature.get_weights()[point]
                        for point in range(self._quadrature.get_num_points()))
                    new_row.append(np.float64(new_entry))
    
                new_row = np.array(new_row)
                output_matrix.append(self._inv_mass @ new_row)
    
            # Set ghost cells to respective value
            output_matrix[0] = output_matrix[self._num_grid_cells]
            output_matrix.append(output_matrix[1])
    
            print(np.array(output_matrix).shape)
            return np.transpose(np.array(output_matrix))
    
        def build_training_data(self, adjustment, initial_condition=None):
            if initial_condition is None:
                initial_condition = self._init_cond
            projection = self._do_initial_projection(initial_condition, adjustment)
            return projection[:, 1:-1]
    
        def check_wavelet(self, projection, quadrature, polynomial_degree):
            projection = self._detector.calculate_approximate_solution(projection, quadrature, polynomial_degree)
            return projection