ANN_Data_Generator.py
    # -*- coding: utf-8 -*-
    """
    @author: Soraya Terrab (sorayaterrab), Laura C. Kühle
    
    """
    import os
    import time
    import numpy as np
    
    from DG_Approximation import do_initial_projection
    from projection_utils import Mesh
    from Quadrature import Gauss
    from Basis_Function import OrthonormalLegendre
    
    
    class TrainingDataGenerator:
        """Class for training data generator.
    
        Generates random training data for given initial conditions.
    
        Attributes
        ----------
        basis_list : list
            List of OrthonormalLegendre basis instances for polynomial
            degrees 0 to 6.
        quadrature_list : list
            List of Gauss quadrature instances with 1 to 7 nodes.
        mesh_list : list
            List of Mesh instances with 2**(5 to 11) cells.
    
        Methods
        -------
        build_training_data(init_cond_list, num_samples, ...)
            Builds random training data.
    
        """
        def __init__(self):
            """Initializes TrainingDataGenerator."""
            self._basis_list = [OrthonormalLegendre(pol_deg)
                                for pol_deg in range(7)]
            self._quadrature_list = [Gauss({'num_nodes': pol_deg+1})
                                     for pol_deg in range(7)]
            self._mesh_list = [Mesh(left_bound=-1, right_bound=1,
                                    num_ghost_cells=0, num_cells=2**exp)
                               for exp in range(5, 12)]
    
        def build_training_data(self, init_cond_list, num_samples, balance=0.5,
                                directory='test_data', add_reconstructions=True,
                                stencil_len=3):
            """Builds random training data.
    
            Creates training data consisting of random ANN input and saves it.
    
            Parameters
            ----------
            init_cond_list : list
                List of dicts, each containing an initial condition instance
                under 'function' and its randomization parameters under
                'config'.
            num_samples : int
                Number of training data samples to generate.
            balance : float, optional
                Fraction of samples generated from smooth initial conditions.
                Default: 0.5.
            directory : str, optional
                Path to directory in which training data is saved.
                Default: 'test_data'.
            add_reconstructions : bool, optional
                Flag whether reconstructions of the middle cell are included.
                Default: True.
            stencil_len : int, optional
                Number of cells in the stencil. Must be odd. Default: 3.
    
            Returns
            -------
            data_dict : dict
                Dictionary containing input (normalized and non-normalized) and
                output data.
    
            """
            tic = time.perf_counter()
    
            # Ensure the stencil has a middle cell (odd length)
            if stencil_len % 2 == 0:
                raise ValueError('Invalid stencil length (even value): "%d"'
                                 % stencil_len)
    
            print('Calculating training data...\n')
            data_dict = self._calculate_data_set(init_cond_list,
                                                 num_samples, balance,
                                                 add_reconstructions,
                                                 stencil_len)
            print('Finished calculating training data!')
    
            self._save_data(directory=directory, data=data_dict)
            toc = time.perf_counter()
            print(f'Total runtime: {toc - tic:0.4f}s')
            return data_dict
    
        def _calculate_data_set(self, init_cond_list, num_samples, balance,
                                add_reconstructions, stencil_len):
            """Calculates random training data of given stencil length.
    
            Creates training data with a given ratio between smooth and
            discontinuous samples and fixed stencil length.
    
            Parameters
            ----------
            init_cond_list : list
                List of dicts, each containing an initial condition instance
                under 'function' and its randomization parameters under
                'config'.
            num_samples : int
                Number of training data samples to generate.
            balance : float
                Fraction of samples generated from smooth initial conditions.
            add_reconstructions : bool
                Flag whether reconstructions of the middle cell are included.
            stencil_len : int
                Number of cells in the stencil. Must be odd.
    
            Returns
            -------
            dict
                Dictionary containing input (normalized and non-normalized) and
                output data.
    
            """
            # Separate smooth and discontinuous initial conditions
            smooth_functions = []
            troubled_functions = []
            for function in init_cond_list:
                if function['function'].is_smooth():
                    smooth_functions.append(function)
                else:
                    troubled_functions.append(function)
    
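            # Split the sample budget between smooth and troubled data
            # according to the balance ratio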
            num_smooth_samples = round(num_samples * balance)
            smooth_input, smooth_output = self._generate_cell_data(
                num_smooth_samples, smooth_functions, add_reconstructions,
                stencil_len, True)
    
            num_troubled_samples = num_samples - num_smooth_samples
            troubled_input, troubled_output = self._generate_cell_data(
                num_troubled_samples, troubled_functions, add_reconstructions,
                stencil_len, False)
    
            # Merge Data
            input_matrix = np.concatenate((smooth_input, troubled_input), axis=0)
            output_matrix = np.concatenate((smooth_output, troubled_output),
                                           axis=0)
    
            # Shuffle data while keeping correct input/output matches
            order = np.random.permutation(
                num_smooth_samples + num_troubled_samples)
            input_matrix = input_matrix[order]
            output_matrix = output_matrix[order]
    
            # Create normalized input data
            norm_input_matrix = self._normalize_data(input_matrix)
    
            return {'input_data.raw': input_matrix, 'output_data': output_matrix,
                    'input_data.normalized': norm_input_matrix}
    
        def _generate_cell_data(self, num_samples, init_cond_list,
                                add_reconstructions, stencil_len, is_smooth):
            """Generates random training input and output.
    
            Generates random training input and output for either smooth or
            discontinuous initial conditions. For each input, the output is
            the one-hot vector [is_smooth, is_troubled].
    
            Parameters
            ----------
            num_samples : int
                Number of training data samples to generate.
            init_cond_list : list
                List of dicts, each containing an initial condition instance
                under 'function' and its randomization parameters under
                'config'.
            add_reconstructions : bool
                Flag whether reconstructions of the middle cell are included.
            stencil_len : int
                Number of cells in the stencil. Must be odd.
            is_smooth : bool
                Flag whether initial conditions are smooth.
    
            Returns
            -------
            input_data : ndarray
                Array containing input data.
            output_data : ndarray
                Array containing output data.
    
            """
            troubled_indicator = 'without' if is_smooth else 'with'
            print(f'Calculating data {troubled_indicator} troubled cells...')
            print(f'Samples to complete: {num_samples}')
            tic = time.perf_counter()
    
            num_datapoints = stencil_len
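            # Reconstructions of the middle cell add two input entries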
            if add_reconstructions:
                num_datapoints += 2
            input_data = np.zeros((num_samples, num_datapoints))
            num_init_cond = len(init_cond_list)
            for i in range(num_samples):
                # Select and initialize initial condition
                function_id = i % num_init_cond
                init_cond = init_cond_list[function_id]['function']
                init_cond.randomize(
                    init_cond_list[function_id]['config'].copy())
    
                # Build mesh for random stencil of given length
                mesh = self._mesh_list[np.random.randint(
                    len(self._mesh_list))].random_stencil(stencil_len)
    
                # Induce shift to capture troubled cells
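                # (no shift for smooth samples; a half-cell offset moves the
                # discontinuity onto the respective boundary of the middle
                # cell)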
                shift = 0 if init_cond.is_smooth() \
                    else mesh.non_ghost_cells[stencil_len//2]
                if init_cond.discontinuity_position == 'left':
                    shift -= mesh.cell_len/2
                elif init_cond.discontinuity_position == 'right':
                    shift += mesh.cell_len/2
    
                # Calculate basis coefficients for stencil
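                # (degree drawn uniformly from 1 to 6)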
                polynomial_degree = np.random.randint(1, high=7)
                projection = do_initial_projection(
                    init_cond=init_cond, mesh=mesh,
                    basis=self._basis_list[polynomial_degree],
                    quadrature=self._quadrature_list[polynomial_degree],
                    x_shift=shift)
                input_data[i] = self._basis_list[
                    polynomial_degree].calculate_cell_average(
                    projection=projection[:, 1:-1],
                    stencil_len=stencil_len,
                    add_reconstructions=add_reconstructions)
    
                if (i+1) % 1000 == 0:
                    print(f'{i+1} samples completed.')
    
            toc = time.perf_counter()
            print(f'Finished calculating data {troubled_indicator} '
                  f'troubled cells!')
            print(f'Calculation time: {toc - tic:0.4f}s\n')
    
            # Set output data
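            # (one-hot label: column 0 marks smooth, column 1 troubled cells)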
            output_data = np.zeros((num_samples, 2))
            output_data[:, int(not is_smooth)] = np.ones(num_samples)
    
            return input_data, output_data
    
        @staticmethod
        def _normalize_data(input_data):
            """Normalizes data.
    
            Parameters
            ----------
            input_data : ndarray
                Array containing input data.
    
            Returns
            -------
            ndarray
                Array containing normalized input data.
    
            """
            normalized_input_data = []
            for entry in input_data:
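                # Divide by the largest absolute value, clipped to at least 1
                # so that small-valued samples are not amplified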
                max_function_value = max(max(np.absolute(entry)), 1)
                normalized_input_data.append(entry / max_function_value)
            return np.array(normalized_input_data)
    
        @staticmethod
        def _save_data(directory, data):
            """Saves data."""
            # Create the output directory if it does not exist yet
            os.makedirs(directory, exist_ok=True)
    
            print('Saving training data.')
            for key, value in data.items():
                name = os.path.join(directory, key + '.npy')
                np.save(name, value)
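
    # ------------------------------------------------------------------
    # Minimal usage sketch (illustrative only): builds a small data set
    # from a mix of smooth and discontinuous initial conditions. The
    # module 'Initial_Condition' and the classes 'Sine' and
    # 'DiscontinuousConstant' are hypothetical placeholders; any objects
    # providing is_smooth(), randomize(config), and
    # discontinuity_position (as used above) will do.
    # ------------------------------------------------------------------
    if __name__ == '__main__':
        from Initial_Condition import Sine, DiscontinuousConstant  # hypothetical

        init_conds = [{'function': Sine(), 'config': {}},
                      {'function': DiscontinuousConstant(), 'config': {}}]
        generator = TrainingDataGenerator()
        training_data = generator.build_training_data(
            init_cond_list=init_conds, num_samples=1000, balance=0.5,
            directory='test_data', add_reconstructions=True, stencil_len=3)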