Select Git revision
-
zhangzthu authored
* add 'book' in DST evaluation. * Fix TRADE crosswoz training evaluation bug * Add note on deploy Co-authored-by:
zheng <zheng@zhangzheng-PC.lan>
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ANN_Data_Generator.py 9.87 KiB
# -*- coding: utf-8 -*-
"""
@author: Soraya Terrab (sorayaterrab), Laura C. Kühle
"""
import os
import time
import numpy as np
from DG_Approximation import do_initial_projection
from projection_utils import Mesh
from Quadrature import Gauss
from Basis_Function import OrthonormalLegendre
class TrainingDataGenerator:
    """Class for training data generator.

    Generates random training data for given initial conditions.

    Attributes
    ----------
    basis_list : list
        List of OrthonormalLegendre basis instances for degree 0 to 6.
    quadrature_list : list
        List of Gauss quadrature instances with 1 to 7 nodes.
    mesh_list : list
        List of Mesh instances with 2**exp cells for exp in 5 to 11.

    Methods
    -------
    build_training_data(init_cond_list, num_samples)
        Builds random training data.

    """
    def __init__(self):
        """Initializes TrainingDataGenerator."""
        # One basis and one matching quadrature per polynomial degree 0..6
        self._basis_list = [OrthonormalLegendre(pol_deg)
                            for pol_deg in range(7)]
        self._quadrature_list = [Gauss({'num_nodes': pol_deg+1})
                                 for pol_deg in range(7)]
        # Meshes of increasing resolution (2**5 .. 2**11 cells) on [-1, 1]
        self._mesh_list = [Mesh(left_bound=-1, right_bound=1,
                                num_ghost_cells=0, num_cells=2**exp)
                           for exp in range(5, 12)]

    def build_training_data(self, init_cond_list, num_samples, balance=0.5,
                            directory='test_data', add_reconstructions=True,
                            stencil_len=3):
        """Builds random training data.

        Creates training data consisting of random ANN input and saves it.

        Parameters
        ----------
        init_cond_list : list
            List of dicts (keys 'function' and 'config') describing the
            initial conditions used for training.
        num_samples : int
            Number of training data samples to generate.
        balance : float, optional
            Ratio between smooth and discontinuous training data. Must lie
            in [0, 1]. Default: 0.5.
        directory : str, optional
            Path to directory in which training data is saved.
            Default: 'test_data'.
        add_reconstructions : bool, optional
            Flag whether reconstructions of the middle cell are included.
            Default: True.
        stencil_len : int, optional
            Size of training data array. Must be odd so the stencil has a
            unique middle cell. Default: 3.

        Returns
        -------
        data_dict : dict
            Dictionary containing input (normalized and non-normalized) and
            output data.

        Raises
        ------
        ValueError
            If 'stencil_len' is even or 'balance' is outside [0, 1].

        """
        tic = time.perf_counter()

        # An even stencil has no unique middle cell to classify
        if stencil_len % 2 == 0:
            raise ValueError('Invalid stencil length (even value): "%d"'
                             % stencil_len)
        # Reject ratios that would yield a negative sample count
        if not 0 <= balance <= 1:
            raise ValueError('Invalid balance (not in [0, 1]): "%s"'
                             % balance)

        print('Calculating training data...\n')
        data_dict = self._calculate_data_set(init_cond_list,
                                             num_samples, balance,
                                             add_reconstructions,
                                             stencil_len)
        print('Finished calculating training data!')

        self._save_data(directory=directory, data=data_dict)
        toc = time.perf_counter()
        print(f'Total runtime: {toc - tic:0.4f}s')
        return data_dict

    def _calculate_data_set(self, init_cond_list, num_samples, balance,
                            add_reconstructions, stencil_len):
        """Calculates random training data of given stencil length.

        Creates training data with a given ratio between smooth and
        discontinuous samples and fixed stencil length.

        Parameters
        ----------
        init_cond_list : list
            List of dicts (keys 'function' and 'config') describing the
            initial conditions used for training.
        num_samples : int
            Number of training data samples to generate.
        balance : float
            Ratio between smooth and discontinuous training data.
        add_reconstructions : bool
            Flag whether reconstructions of the middle cell are included.
        stencil_len : int
            Size of training data array.

        Returns
        -------
        dict
            Dictionary containing input (normalized and non-normalized) and
            output data.

        """
        # Separate smooth and discontinuous initial conditions
        smooth_functions = []
        troubled_functions = []
        for function in init_cond_list:
            if function['function'].is_smooth():
                smooth_functions.append(function)
            else:
                troubled_functions.append(function)

        # Split the sample budget according to 'balance'
        num_smooth_samples = round(num_samples * balance)
        smooth_input, smooth_output = self._generate_cell_data(
            num_smooth_samples, smooth_functions, add_reconstructions,
            stencil_len, True)
        num_troubled_samples = num_samples - num_smooth_samples
        troubled_input, troubled_output = self._generate_cell_data(
            num_troubled_samples, troubled_functions, add_reconstructions,
            stencil_len, False)

        # Merge data
        input_matrix = np.concatenate((smooth_input, troubled_input), axis=0)
        output_matrix = np.concatenate((smooth_output, troubled_output),
                                       axis=0)

        # Shuffle data while keeping correct input/output matches
        order = np.random.permutation(
            num_smooth_samples + num_troubled_samples)
        input_matrix = input_matrix[order]
        output_matrix = output_matrix[order]

        # Create normalized input data
        norm_input_matrix = self._normalize_data(input_matrix)

        return {'input_data.raw': input_matrix, 'output_data': output_matrix,
                'input_data.normalized': norm_input_matrix}

    def _generate_cell_data(self, num_samples, init_cond_list,
                            add_reconstructions, stencil_len, is_smooth):
        """Generates random training input and output.

        Generates random training input and output for either smooth or
        discontinuous initial conditions. For each input the output has the
        shape [is_smooth, is_troubled].

        Parameters
        ----------
        num_samples : int
            Number of training data samples to generate.
        init_cond_list : list
            List of dicts (keys 'function' and 'config') describing the
            initial conditions used for training.
        add_reconstructions : bool
            Flag whether reconstructions of the middle cell are included.
        stencil_len : int
            Size of training data array.
        is_smooth : bool
            Flag whether initial conditions are smooth.

        Returns
        -------
        input_data : ndarray
            Array containing input data.
        output_data : ndarray
            Array containing output data.

        """
        troubled_indicator = 'without' if is_smooth else 'with'
        print(f'Calculating data {troubled_indicator} troubled cells...')
        print(f'Samples to complete: {num_samples}')
        tic = time.perf_counter()

        # Two extra entries per sample when reconstructions are requested
        # (presumably the two reconstructions of the middle cell -- see
        # 'calculate_cell_average'; confirm against the basis class)
        num_datapoints = stencil_len
        if add_reconstructions:
            num_datapoints += 2
        input_data = np.zeros((num_samples, num_datapoints))

        num_init_cond = len(init_cond_list)
        for i in range(num_samples):
            # Cycle through the initial conditions, randomizing parameters
            function_id = i % num_init_cond
            init_cond = init_cond_list[function_id]['function']
            init_cond.randomize(
                init_cond_list[function_id]['config'].copy())

            # Build mesh for random stencil of given length (uniform choice
            # among the prebuilt mesh resolutions)
            mesh = self._mesh_list[np.random.randint(
                len(self._mesh_list))].random_stencil(stencil_len)

            # Induce shift to capture troubled cells: place the
            # discontinuity at the middle cell (offset to its edge when the
            # discontinuity sits left or right)
            shift = 0 if init_cond.is_smooth() \
                else mesh.non_ghost_cells[stencil_len//2]
            if init_cond.discontinuity_position == 'left':
                shift -= mesh.cell_len/2
            elif init_cond.discontinuity_position == 'right':
                shift += mesh.cell_len/2

            # Calculate basis coefficients for stencil with a random
            # polynomial degree in 1..6
            polynomial_degree = np.random.randint(1, high=7)
            projection = do_initial_projection(
                init_cond=init_cond, mesh=mesh,
                basis=self._basis_list[polynomial_degree],
                quadrature=self._quadrature_list[polynomial_degree],
                x_shift=shift)
            input_data[i] = self._basis_list[
                polynomial_degree].calculate_cell_average(
                projection=projection[:, 1:-1],
                stencil_len=stencil_len,
                add_reconstructions=add_reconstructions)

            # Report progress every 1000 samples
            if (i+1) % 1000 == 0:
                print(f'{i+1} samples completed.')

        toc = time.perf_counter()
        print(f'Finished calculating data {troubled_indicator} '
              f'troubled cells!')
        print(f'Calculation time: {toc - tic:0.4f}s\n')

        # Set output data as one-hot: column 0 = smooth, column 1 = troubled
        output_data = np.zeros((num_samples, 2))
        output_data[:, int(not is_smooth)] = 1.0
        return input_data, output_data

    @staticmethod
    def _normalize_data(input_data):
        """Normalizes data.

        Each row (sample) is divided by its maximum absolute value, with a
        divisor of at least 1 so samples are never scaled up.

        Parameters
        ----------
        input_data : ndarray
            2D array containing input data, one sample per row.

        Returns
        -------
        ndarray
            Array containing normalized input data.

        """
        # Vectorized row-wise scaling instead of a Python loop
        scale = np.maximum(np.amax(np.absolute(input_data), axis=1), 1)
        return input_data / scale[:, np.newaxis]

    @staticmethod
    def _save_data(directory, data):
        """Saves each data array as a '.npy' file in the given directory.

        Parameters
        ----------
        directory : str
            Path to directory in which data is saved (created if missing).
        data : dict
            Dictionary mapping file stems to arrays.

        """
        # 'exist_ok' avoids a race between an existence check and creation
        os.makedirs(directory, exist_ok=True)

        print('Saving training data.')
        for key, array in data.items():
            np.save(os.path.join(directory, key + '.npy'), array)