Skip to content
Snippets Groups Projects
Select Git revision
  • 6bc2407ffbd25cfa25ad6f81eeea856a7492f4e1
  • develop default
  • release protected
  • v0.x
  • v2.2.0
  • v2.1.0
6 results

server.ts

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Equation.py 24.75 KiB
    # -*- coding: utf-8 -*-
    """Module for equation class.
    
    @author: Laura C. Kühle
    """
    from typing import Tuple
    from abc import ABC, abstractmethod
    import numpy as np
    from numpy import ndarray
    from sympy import Symbol
    
    from .Basis_Function import Basis
    from .Boundary_Condition import enforce_boundary
    from .Initial_Condition import InitialCondition
    from .Mesh import Mesh
    from .projection_utils import do_initial_projection
    from .Quadrature import Quadrature
    
    
    x = Symbol('x')
    
    
    class Equation(ABC):
        """Abstract class for equation.
    
        Methods
        -------
        initialize_approximation()
            Initialize projection and time step for approximation.
        update_time_step(current_time, time_step)
            Update time step.
        solve_exactly(mesh)
            Calculate exact solution.
        update_right_hand_side(projection)
            Update right-hand side.
        """
    
        def __init__(self, quadrature: Quadrature, init_cond: InitialCondition,
                     basis: Basis, mesh: Mesh, final_time: float,
                     wave_speed: float, cfl_number: float) -> None:
            """Initialize equation class.
    
            Parameters
            ----------
            quadrature : Quadrature object
                Quadrature for evaluation.
            init_cond : InitialCondition object
                Initial condition for evaluation.
            basis : Basis object
                Basis for calculation.
            mesh : Mesh object
                Mesh for calculation.
            final_time : float
                Final time for which approximation is calculated.
            wave_speed : float
                Speed of wave in rightward direction.
            cfl_number : float
                CFL number to ensure stability.
    
            Raises
            ------
            ValueError
                If number of ghost cells in mesh in not positive.
    
            """
            self._quadrature = quadrature
            self._init_cond = init_cond
            self._basis = basis
            self._mesh = mesh
            self._final_time = final_time
            self._wave_speed = wave_speed
            self._cfl_number = cfl_number
    
            if self._mesh.num_ghost_cells <= 0:
                raise ValueError('Number of ghost cells for calculations has to '
                                 'be positive.')
    
            self._reset()
    
        @property
        def basis(self) -> Basis:
            """Return basis."""
            return self._basis
    
        @property
        def mesh(self) -> Mesh:
            """Return basis."""
            return self._mesh
    
        @property
        def quadrature(self) -> Quadrature:
            """Return basis."""
            return self._quadrature
    
        @property
        def final_time(self) -> float:
            """Return final time."""
            return self._final_time
    
        @property
        def wave_speed(self) -> float:
            """Return wave speed."""
            return self._wave_speed
    
        @property
        def cfl_number(self) -> float:
            """Return CFL number."""
            return self._cfl_number
    
        def _reset(self) -> None:
            """Reset instance variables."""
            pass
    
        def initialize_approximation(self) -> Tuple[float, ndarray]:
            """Initialize projection and time step for approximation."""
            return self._initialize_time_step(), self._initialize_projection()
    
        def _initialize_time_step(self):
            """Initialize time step."""
            return abs(self._cfl_number * self._mesh.cell_len / self._wave_speed)
    
        @abstractmethod
        @enforce_boundary()
        def _initialize_projection(self) -> ndarray:
            """Initialize projection."""
            pass
    
        def create_data_dict(self):
            """Return dictionary with data necessary to construct equation."""
            return {'basis': self._basis.create_data_dict(),
                    'mesh': self._mesh.create_data_dict(),
                    'final_time': self._final_time,
                    'wave_speed': self._wave_speed,
                    'cfl_number': self._cfl_number
                    }
    
        @abstractmethod
        def update_time_step(self, projection: ndarray, current_time: float,
                             time_step: float) -> Tuple[float, float]:
            """Update time step.
    
            Parameters
            ----------
            projection : ndarray
                Current projection during approximation.
            current_time : float
                Current time during approximation.
            time_step : float
                Length of time step during approximation.
            Returns
            -------
            cfl_number : float
                Updated CFL number to ensure stability.
            time_step : float
                Updated time step for approximation.
    
            """
            pass
    
        @abstractmethod
        def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
            """Calculate exact solution.
    
            Parameters
            ----------
            mesh : Mesh
                Mesh for evaluation.
    
            Returns
            -------
            grid : ndarray
                Array containing evaluation grid for a function.
            exact : ndarray
                Array containing exact evaluation of a function.
    
            """
            pass
    
        @abstractmethod
        @enforce_boundary()
        def update_right_hand_side(self, projection: ndarray) -> ndarray:
            """Update right-hand side.
    
            Parameter
            ---------
            projection : ndarray
                Matrix of projection for each polynomial degree.
    
            Returns
            -------
            ndarray
                Matrix of right-hand side.
    
            """
            pass
    
    
    class LinearAdvection(Equation):
        """Class for linear advection equation.
    
        Attributes
        ----------
        volume_integral_matrix : ndarray
            Volume integral matrix.
        flux_matrix : ndarray
            Flux matrix.
    
        Methods
        -------
        update_right_hand_side(projection)
            Update right-hand side.
    
        Notes
        -----
        .. math:: u_t + u_x = 0
    
        """
    
        def _reset(self) -> None:
            """Reset instance variables."""
            matrix_shape = (self._basis.polynomial_degree+1,
                            self._basis.polynomial_degree+1)
            root_vector = np.array([np.sqrt(degree+0.5) for degree in range(
                self._basis.polynomial_degree+1)])
            degree_matrix = np.matmul(root_vector[:, np.newaxis],
                                      root_vector[:, np.newaxis].T)
    
            # Set volume integral matrix
            matrix = np.ones(matrix_shape)
            if self._wave_speed > 0:
                matrix[np.fromfunction(
                    lambda i, j: (j >= i) | ((i+j) % 2 == 0), matrix_shape)] = -1.0
            else:
                matrix[np.fromfunction(
                    lambda i, j: (j > i) & ((i+j) % 2 == 1), matrix_shape)] = -1.0
            self._volume_integral_matrix = matrix * degree_matrix
    
            # Set flux matrix
            matrix = np.fromfunction(lambda i, j: (-1.0)**i, matrix_shape) \
                if self._wave_speed > 0 \
                else np.fromfunction(lambda i, j: (-1.0)**j, matrix_shape)
            self._flux_matrix = matrix * degree_matrix
    
        @enforce_boundary()
        def _initialize_projection(self) -> ndarray:
            """Initialize projection."""
            return do_initial_projection(init_cond=self._init_cond,
                                         mesh=self._mesh, basis=self._basis,
                                         quadrature=self._quadrature)
    
        def update_time_step(self, projection: ndarray, current_time: float,
                             time_step: float) -> Tuple[float, float]:
            """Update time step.
    
            Parameters
            ----------
            projection : ndarray
                Current projection during approximation.
            current_time : float
                Current time during approximation.
            time_step : float
                Length of time step during approximation.
            Returns
            -------
            cfl_number : float
                Updated CFL number to ensure stability.
            time_step : float
                Updated time step for approximation.
    
            """
            cfl_number = self._cfl_number
    
            # Adjust for final time-step
            if current_time+time_step > self.final_time:
                time_step = self.final_time-current_time
                cfl_number = self.wave_speed * time_step / self._mesh.cell_len
    
            return cfl_number, time_step
    
        def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
            """Calculate exact solution.
    
            Parameters
            ----------
            mesh : Mesh
                Mesh for evaluation.
    
            Returns
            -------
            grid : ndarray
                Array containing evaluation grid for a function.
            exact : ndarray
                Array containing exact evaluation of a function.
    
            """
            num_periods = np.floor(self.wave_speed * self.final_time /
                                   mesh.interval_len)
    
            grid = np.repeat(mesh.non_ghost_cells, self._quadrature.num_nodes) + \
                mesh.cell_len / 2 * np.tile(self._quadrature.nodes, mesh.num_cells)
    
            # Project points into correct periodic interval
            points = np.array([point-self.wave_speed *
                               self.final_time+num_periods * mesh.interval_len
                               for point in grid])
            left_bound, right_bound = mesh.bounds
            while np.any(points < left_bound):
                points[points < left_bound] += mesh.interval_len
            while np.any(points) > right_bound:
                points[points > right_bound] -= mesh.interval_len
    
            exact = np.array([self._init_cond.calculate(mesh=mesh, x=point) for
                              point in points])
    
            grid = np.reshape(grid, (1, grid.size))
            exact = np.reshape(exact, (1, exact.size))
    
            return grid, exact
    
        @enforce_boundary()
        def update_right_hand_side(self, projection: ndarray) -> ndarray:
            """Update right-hand side.
    
            Parameter
            ---------
            projection : ndarray
                Matrix of projection for each polynomial degree.
    
            Returns
            -------
            ndarray
                Matrix of right-hand side.
    
            """
            right_hand_side = np.zeros_like(projection)
            if self._wave_speed > 0:
                right_hand_side[:, self._mesh.num_ghost_cells:
                                -self._mesh.num_ghost_cells] = \
                    2 * (self._flux_matrix @
                         projection[:, self._mesh.num_ghost_cells-1:
                                    -self._mesh.num_ghost_cells-1] +
                         self._volume_integral_matrix @
                         projection[:, self._mesh.num_ghost_cells:
                                    -self._mesh.num_ghost_cells])
            else:
                right_hand_side[:, self._mesh.num_ghost_cells:
                                -self._mesh.num_ghost_cells] = \
                    2 * (self._flux_matrix @
                         projection[:, self._mesh.num_ghost_cells+1:] +
                         self._volume_integral_matrix @
                         projection[:, self._mesh.num_ghost_cells:
                                    -self._mesh.num_ghost_cells])
    
            return right_hand_side
    
    
    class Burgers(Equation):
        """Class for Burgers' equation.
    
        Attributes
        ----------
        volume_integral_matrix : ndarray
            Volume integral matrix.
        flux_matrix : ndarray
            Flux matrix.
    
        Methods
        -------
        update_right_hand_side(projection)
            Update right-hand side.
    
        Notes
        -----
        .. math:: u_t + u*u_x = 0
    
        """
    
        def _reset(self) -> None:
            """Reset instance variables."""
            pass
            # matrix_shape = (self._basis.polynomial_degree+1,
            #                 self._basis.polynomial_degree+1)
            # root_vector = np.array([np.sqrt(degree+0.5) for degree in range(
            #     self._basis.polynomial_degree+1)])
            # degree_matrix = np.matmul(root_vector[:, np.newaxis],
            #                           root_vector[:, np.newaxis].T)
            #
            # # Set volume integral matrix
            # matrix = np.ones(matrix_shape)
            # if self._wave_speed > 0:
            #     matrix[np.fromfunction(
            #         lambda i, j: (j >= i) | ((i+j) % 2 == 0), matrix_shape)] = -1.0
            # else:
            #     matrix[np.fromfunction(
            #         lambda i, j: (j > i) & ((i+j) % 2 == 1), matrix_shape)] = -1.0
            # self._volume_integral_matrix = matrix * degree_matrix
            #
            # # Set flux matrix
            # matrix = np.fromfunction(lambda i, j: (-1.0)**i, matrix_shape) \
            #     if self._wave_speed > 0 \
            #     else np.fromfunction(lambda i, j: (-1.0)**j, matrix_shape)
            # self._flux_matrix = matrix * degree_matrix
    
        @enforce_boundary()
        def _initialize_projection(self) -> ndarray:
            """Initialize projection."""
            return do_initial_projection(init_cond=self._init_cond,
                                         mesh=self._mesh, basis=self._basis,
                                         quadrature=self._quadrature)
    
        def update_time_step(self, projection: ndarray, current_time: float,
                             time_step: float) -> Tuple[float, float]:
            """Update time step.
    
            Adapt CFL number to ensure right-hand side is multiplied with dt/dx.
    
            Parameters
            ----------
            projection : ndarray
                Current projection during approximation.
            current_time : float
                Current time during approximation.
            time_step : float
                Length of time step during approximation.
            Returns
            -------
            cfl_number : float
                Updated CFL number to ensure stability.
            time_step : float
                Updated time step for approximation.
    
            """
            cfl_number = self._cfl_number
    
            max_velocity = max(abs(projection[0, :] * np.sqrt(0.5)))
            time_step = cfl_number * self._mesh.cell_len / max_velocity
            cfl_number = time_step / self._mesh.cell_len
    
            # Adjust for final time-step
            if current_time+time_step > self._final_time:
                time_step = self._final_time-current_time
                cfl_number = time_step / self._mesh.cell_len
    
            return cfl_number, time_step
    
        def solve_exactly(self, mesh: Mesh) -> Tuple[ndarray, ndarray]:
            """Calculate exact solution.
    
            Parameters
            ----------
            mesh : Mesh
                Mesh for evaluation.
    
            Returns
            -------
            grid : ndarray
                Array containing evaluation grid for a function.
            exact : ndarray
                Array containing exact evaluation of a function.
    
            """
            grid = np.repeat(mesh.non_ghost_cells, self._quadrature.num_nodes) + \
                mesh.cell_len / 2 * np.tile(self._quadrature.nodes, mesh.num_cells)
            #
            # # Project points into correct periodic interval
            # points = np.array([point-self.wave_speed *
            #                    self.final_time+num_periods * mesh.interval_len
            #                    for point in grid])
            # left_bound, right_bound = mesh.bounds
            # while np.any(points < left_bound):
            #     points[points < left_bound] += mesh.interval_len
            # while np.any(points) > right_bound:
            #     points[points > right_bound] -= mesh.interval_len
            #
            # exact = np.array([self._init_cond.calculate(mesh=mesh, x=point) for
            #                   point in points])
            #
            grid = np.reshape(grid, (1, grid.size))
            # exact = np.reshape(exact, (1, exact.size))
            #
            # return grid, exact
            exact = []
    
            if self._init_cond.__name__ == 'Sine':
                # u(x,t) = u(x - u0*time, 0)
                exact = self._init_cond._height_adjustment + \
                        self._init_cond._stretch_factor * \
                        self.implicit_burgers_solver(grid, mesh)
            elif self._init_cond.__name__ == 'DiscontinuousConstant':
                # u(x,t) = u(x - u0*time, 0)
                exact = self.rarefaction_wave(grid)
    
            return grid, exact
    
        def rarefaction_wave(self, grid_values):
            uexact = 0 * grid_values
            N = np.size(grid_values)
    
            for i in range(N):
                if grid_values[0, i] <= - self._final_time:
                    uexact[0, i] = -1
                elif - self._final_time < grid_values[0, i] < self._final_time:
                    uexact[0, i] = grid_values[0, i] / self._final_time
                else:
                    uexact[0, i] = 1
    
            return uexact
    
        def implicit_burgers_solver(self, grid_values, mesh):
            """
    
            Parameters
            ----------
            grid_values
            mesh
    
            Returns
            -------
    
            Notes
            -----
            Code adapted from DG code by Hesthaven.
    
            """
            sspeed = self._init_cond._height_adjustment
            uexact = np.zeros(np.size(grid_values))
    
            # initialize values
            xx = np.linspace(0, 1, 200)
            uu = np.sin(self._init_cond._factor * np.pi * xx)
    
            te = self._final_time * (2 / mesh.interval_len)
            xt0 = (2 / mesh.interval_len) * (
                        grid_values-sspeed * self._final_time).reshape(
                np.size(grid_values))
            for ix in range(len(xt0)):
                if xt0[ix] > 1:
                    xt0[ix] -= 2 * np.floor(0.5 * (xt0[ix]+1))
                elif xt0[ix] < -1:
                    xt0[ix] += 2 * np.floor(0.5 * (1-xt0[ix]))
    
                ay = abs(xt0[ix])
                # Finding the closest characteristic
                i0 = 0
                for j in range(200):
                    xt = xx[j]+uu[j] * te
                    if ay < xt:
                        break
                    else:
                        i0 = j
    
                # This is our initial guess
                us = uu[i0]
                un = us
                for k in range(400):
                    us = un
                    x0 = ay-us * te
                    un = us-(us-np.sin(self._init_cond._factor * np.pi * x0)) / (
                            1+self._init_cond._factor * np.pi * np.cos(
                        self._init_cond._factor * np.pi * x0) * te)
    
                    if abs(un-us) < 1e-15:
                        break
    
                y = np.sign(xt0[ix]) * un
                if abs(ay-1) < 1e-15:
                    y = 0
                if abs(self._final_time) < 1e-15:
                    y = np.sin(self._init_cond._factor * np.pi * xt0[ix])
                uexact[ix] = y
    
            burgers_exact = uexact.reshape((1, np.size(grid_values)))
    
            return burgers_exact
    
        @enforce_boundary()
        def update_right_hand_side(self, projection: ndarray) -> ndarray:
            """Update right-hand side.
    
            Parameter
            ---------
            projection : ndarray
                Matrix of projection for each polynomial degree.
    
            Returns
            -------
            ndarray
                Matrix of right-hand side.
    
            """
            pass
            volume_integral_matrix = self._burgers_volume_integral(projection)
            boundary_matrix = self._burgers_boundary_matrix(projection)
    
            # Initialize vector and set first entry to accommodate for ghost cell
            right_hand_side = [0]
    
            for j in range(self._mesh.num_cells):
                right_hand_side.append(2*(volume_integral_matrix[:, j + 1] +
                                          boundary_matrix[:, j + 1]))
    
            # Set ghost cells to respective value
            # (Periodic, Updated to Dirichlet in enforce_boundary_condition
            # for DiscontinuousConstant problem)
            # right_hand_side[0] = right_hand_side[self._mesh.num_cells]
            # right_hand_side.append(right_hand_side[1])
    
            return np.transpose(right_hand_side)
            # right_hand_side = np.zeros_like(projection)
            # if self._wave_speed > 0:
            #     right_hand_side[:, self._mesh.num_ghost_cells:
            #                     -self._mesh.num_ghost_cells] = \
            #         2 * (self._flux_matrix @
            #              projection[:, self._mesh.num_ghost_cells-1:
            #                         -self._mesh.num_ghost_cells-1] +
            #              self._volume_integral_matrix @
            #              projection[:, self._mesh.num_ghost_cells:
            #                         -self._mesh.num_ghost_cells])
            # else:
            #     right_hand_side[:, self._mesh.num_ghost_cells:
            #                     -self._mesh.num_ghost_cells] = \
            #         2 * (self._flux_matrix @
            #              projection[:, self._mesh.num_ghost_cells+1:] +
            #              self._volume_integral_matrix @
            #              projection[:, self._mesh.num_ghost_cells:
            #                         -self._mesh.num_ghost_cells])
            #
            # return right_hand_side
    
        def _burgers_volume_integral(self, projection):
            basis_vector = self._basis.basis
            derivative_basis_vector = self._basis.derivative
    
            # Initialize matrix and set first entry to accommodate for ghost cell
            volume_integral_matrix = [0]
    
            for cell in range(self._mesh.num_cells):
                new_row = []
                # Approximation u at Gauss-Legendre points
                approx_eval = [sum(projection[degree][cell + 1]
                                   * basis_vector[degree].subs(
                    x, self._quadrature.nodes[point])
                                   for degree in range(
                    self._basis.polynomial_degree + 1))
                               for point in range(self._quadrature.nodes)]
                approx_eval = np.array(approx_eval)
    
                # print("u", approx_eval)
                # Quadrature Evaluation
                for degree in range(self._basis.polynomial_degree + 1):
                    weighted_derivatives = [
                        derivative_basis_vector[degree].subs(
                            x, self._quadrature.nodes[point])
                        * self._quadrature.weights[point]
                        for point in range(self._quadrature.num_nodes)]
                    new_entry = sum(list(approx_eval**2 * weighted_derivatives)) / 2
    
                    new_row.append(np.float64(new_entry))
    
                new_row = np.array(new_row).T
                volume_integral_matrix.append(new_row)
    
            # Set ghost cells to respective value
            # (Periodic, Updated to Dirichlet in enforce_boundary_condition
            # for DiscontinuousConstant problem)
            # volume_integral_matrix[0] = volume_integral_matrix[
            # self._mesh.num_cells]
            # volume_integral_matrix.append(volume_integral_matrix[1])
    
            return np.transpose(np.array(volume_integral_matrix))
    
        def _burgers_local_Lax_Friedrichs(self, projection):
            # Calculating the left and right boundaries for each cell,
            # u_{j-1/2}^+, u_{j+1/2}^-
            boundary_left, boundary_right = self._calculate_boundary_points(
                projection, self._basis.polynomial_degree + 1)
            # print("shape BL", boundary_left.shape)
    
            # Initializing Burgers local Lax-Friedrichs flux matrix
            burgers_flux = [0]
            for j in range(self._mesh.num_cells):
                # approximations j+1/2^-, j+1/2^+ for interior cells
                # and max velocity
                approx_minus = boundary_right[:, j+1]
                approx_plus = boundary_left[:, j + 2]
                max_velocity = max(abs(approx_minus), abs(approx_plus))
    
                # respective fluxes
                flux_minus = approx_minus**2 / 2
                flux_plus = approx_plus**2 / 2
    
                # local Lax-Friedrichs flux
                numerical_flux = 0.5 * (flux_minus + flux_plus - max_velocity *
                                        (approx_plus - approx_minus))
    
                burgers_flux.append(numerical_flux)
    
            # Set Ghost cells to respective value
            # (Periodic, Updated to Dirichlet in enforce_boundary_condition
            # for DiscontinuousConstant problem)
            # burgers_flux[0] = burgers_flux[self._mesh.num_cells]
            # burgers_flux.append(burgers_flux[1])
    
            return np.array(burgers_flux)
    
        def _burgers_boundary_matrix(self, projection):
            burgers_LF_flux = self._burgers_local_Lax_Friedrichs(projection)
    
            # Initializing boundary matrix
            boundary_matrix = [0]
    
            for j in range(self._mesh.num_cells):
                new_row = []
                for degree in range(self._basis.polynomial_degree + 1):
                    new_row.append(np.sqrt(degree + 0.5) *
                                   (-burgers_LF_flux[j + 1] + (-1)**degree *
                                    burgers_LF_flux[j]))
    
                boundary_matrix.append(new_row)
    
            # Set Ghost cells to respective value
            # (Periodic, Updated to Dirichlet in enforce_boundary_condition
            # for DiscontinuousConstant problem)
            # boundary_matrix[0] = boundary_matrix[self._mesh.num_cells]
            # boundary_matrix.append(boundary_matrix[1])
    
            boundary_matrix = np.transpose(np.array(boundary_matrix))
    
            return boundary_matrix[0]
    
        @staticmethod
        def _calculate_boundary_points(projection, max_degree):
            # Approximation at j-1/2 ^+
            boundary_left = [sum(projection[degree][cell] * (-1) ** degree *
                                 np.sqrt(degree + 0.5)
                                 for degree in range(max_degree))
                             for cell in range(len(projection[0]))]
    
            # Approximation at j+1/2 ^-
            boundary_right = [sum(projection[degree][cell]* np.sqrt(degree + 0.5)
                                  for degree in range(max_degree))
                              for cell in range(len(projection[0]))]
    
            return np.reshape(np.array(boundary_left), (1, len(boundary_left))), \
                np.reshape(np.array(boundary_right), (1, len(boundary_right)))