Commit 156372c7 authored by zqwerty

add merge_res for milu

parent 9bb720b5
@@ -36,6 +36,11 @@ $ python test.py --config_path path_to_a_config_file
 The result (`output.json`) will be saved under the `output_dir` of the config file.
+To generate `predictions.json`, which merges the test data with the model predictions, in the same directory as `output.json`:
+```sh
+$ python merge_predict_res.py -d dataset_name -s {user,system,all} -c CONTEXT_WINDOW_SIZE -p path_to_output.json
+```
 #### Predict
 See `nlu.py` for usage.
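For reference, a minimal sketch (not part of this commit) of inspecting the merged `predictions.json`, assuming each test sample carries `utterance` and `dialogue_acts` fields as in the unified data format, and using the hypothetical output path from the MILU example below:

```python
import json

# hypothetical path; use the directory that holds your output.json
with open('output/multiwoz21_user/predictions.json', encoding='utf-8') as f:
    samples = json.load(f)

for sample in samples[:3]:
    print(sample['utterance'])
    print('gold:', sample['dialogue_acts'])                  # annotation from the dataset
    print('pred:', sample['predictions']['dialogue_acts'])   # merged model output
```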
@@ -28,7 +28,10 @@ We support training MILU on datasets that are in our unified format.
 Taking MultiWOZ 2.1 (unified format) as an example:
 ```bash
 $ python train.py unified_datasets/configs/multiwoz21_user_context3.jsonnet -s serialization_dir
-$ python evaluate.py serialization_dir/model.tar.gz test --cuda-device {CUDA_DEVICE}
+$ python evaluate.py serialization_dir/model.tar.gz test --cuda-device {CUDA_DEVICE} --output_file output/multiwoz21_user/output.json
+# generate output/multiwoz21_user/predictions.json, which merges the test data and model predictions
+$ python unified_datasets/merge_predict_res.py -d multiwoz21 -s user -p output/multiwoz21_user/output.json
 ```
 Note that the config file is different from the above. You should set:
 - `"use_unified_datasets": true` in `dataset_reader` and `model`
......
@@ -16,7 +16,7 @@ from allennlp.common.util import prepare_environment
 from allennlp.data.dataset_readers.dataset_reader import DatasetReader
 from allennlp.data.iterators import DataIterator
 from allennlp.models.archival import load_archive
-from allennlp.training.util import evaluate
+from convlab2.nlu.milu.util import evaluate
 from convlab2.nlu.milu import dataset_reader, model
@@ -28,7 +28,7 @@ argparser.add_argument('archive_file', type=str, help='path to an archived train
 argparser.add_argument('input_file', type=str, help='path to the file containing the evaluation data')
-argparser.add_argument('--output-file', type=str, help='path to output file')
+argparser.add_argument('--output_file', type=str, help='path to output file')
 argparser.add_argument('--weights-file',
                        type=str,
@@ -105,7 +105,7 @@ def evaluate_from_args(args: argparse.Namespace) -> Dict[str, Any]:
     iterator = DataIterator.from_params(iterator_params)
     iterator.index_with(model.vocab)
-    metrics = evaluate(model, instances, iterator, args.cuda_device, args.batch_weight_key)
+    metrics, predict_results = evaluate(model, instances, iterator, args.cuda_device, args.batch_weight_key)
     logger.info("Finished evaluating.")
     logger.info("Metrics:")
@@ -114,8 +114,8 @@ def evaluate_from_args(args: argparse.Namespace) -> Dict[str, Any]:
     output_file = args.output_file
     if output_file:
-        with open(output_file, "w") as file:
-            json.dump(metrics, file, indent=4)
+        with open(output_file, "w", encoding='utf-8') as file:
+            json.dump(predict_results, file, indent=2, ensure_ascii=False)
     return metrics
......
import json
import os
from convlab2.util import load_dataset, load_nlu_data


def merge(dataset_name, speaker, save_dir, context_window_size, predict_result):
    assert os.path.exists(predict_result)
    dataset = load_dataset(dataset_name)
    data = load_nlu_data(dataset, data_split='test', speaker=speaker,
                         use_context=context_window_size > 0,
                         context_window_size=context_window_size)['test']

    if save_dir is None:
        save_dir = os.path.dirname(predict_result)
    else:
        os.makedirs(save_dir, exist_ok=True)
    predict_result = json.load(open(predict_result))

    for sample, prediction in zip(data, predict_result):
        sample['predictions'] = {'dialogue_acts': prediction}

    json.dump(data, open(os.path.join(save_dir, 'predictions.json'), 'w', encoding='utf-8'),
              indent=2, ensure_ascii=False)


if __name__ == '__main__':
    from argparse import ArgumentParser
    parser = ArgumentParser(description="merge predict results with original data for unified NLU evaluation")
    parser.add_argument('--dataset', '-d', metavar='dataset_name', type=str, help='name of the unified dataset')
    parser.add_argument('--speaker', '-s', type=str, choices=['user', 'system', 'all'], help='speaker(s) of the utterances')
    parser.add_argument('--save_dir', type=str, help='merged data will be saved as $save_dir/predictions.json; default: the directory of predict_result')
    parser.add_argument('--context_window_size', '-c', type=int, default=0, help='how many contextual utterances are considered')
    parser.add_argument('--predict_result', '-p', type=str, required=True, help='path to the output file generated by ../test.py')
    args = parser.parse_args()
    print(args)
    merge(args.dataset, args.speaker, args.save_dir, args.context_window_size, args.predict_result)
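# Example invocation (taken from the MILU README change in this commit):
#     python unified_datasets/merge_predict_res.py -d multiwoz21 -s user -p output/multiwoz21_user/output.json
# With no --save_dir given, predictions.json is written next to output.json.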
"""
Helper functions for Trainers
"""
from typing import Any, Union, Dict, Iterable, List, Optional, Tuple
import datetime
import json
import logging
import pathlib
import os
import shutil
import torch
from torch.nn.parallel import replicate, parallel_apply
from torch.nn.parallel.scatter_gather import gather
from allennlp.common.checks import ConfigurationError, check_for_gpu
from allennlp.common.params import Params
from allennlp.common.tqdm import Tqdm
from allennlp.data.dataset_readers import DatasetReader
from allennlp.data import Instance
from allennlp.data.iterators import DataIterator
from allennlp.data.iterators.data_iterator import TensorDict
from allennlp.models.model import Model
from allennlp.models.archival import CONFIG_NAME
from allennlp.nn import util as nn_util
logger = logging.getLogger(__name__)
# We want to warn people that tqdm ignores metrics that start with underscores
# exactly once. This variable keeps track of whether we have.
class HasBeenWarned:
tqdm_ignores_underscores = False
def sparse_clip_norm(parameters, max_norm, norm_type=2) -> float:
    """Clips gradient norm of an iterable of parameters.

    The norm is computed over all gradients together, as if they were
    concatenated into a single vector. Gradients are modified in-place.
    Supports sparse gradients.

    Parameters
    ----------
    parameters : ``(Iterable[torch.Tensor])``
        An iterable of Tensors that will have gradients normalized.
    max_norm : ``float``
        The max norm of the gradients.
    norm_type : ``float``
        The type of the used p-norm. Can be ``'inf'`` for infinity norm.

    Returns
    -------
    Total norm of the parameters (viewed as a single vector).
    """
    # pylint: disable=invalid-name,protected-access
    parameters = list(filter(lambda p: p.grad is not None, parameters))
    max_norm = float(max_norm)
    norm_type = float(norm_type)
    if norm_type == float('inf'):
        total_norm = max(p.grad.data.abs().max() for p in parameters)
    else:
        total_norm = 0
        for p in parameters:
            if p.grad.is_sparse:
                # need to coalesce the repeated indices before finding norm
                grad = p.grad.data.coalesce()
                param_norm = grad._values().norm(norm_type)
            else:
                param_norm = p.grad.data.norm(norm_type)
            total_norm += param_norm ** norm_type
        total_norm = total_norm ** (1. / norm_type)
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1:
        for p in parameters:
            if p.grad.is_sparse:
                p.grad.data._values().mul_(clip_coef)
            else:
                p.grad.data.mul_(clip_coef)
    return total_norm
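# Illustrative usage (``model`` is any torch.nn.Module whose gradients have been computed),
# mirroring how rescale_gradients() below calls this function:
#     total_norm = sparse_clip_norm([p for p in model.parameters() if p.grad is not None], max_norm=5.0)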
def move_optimizer_to_cuda(optimizer):
    """
    Move the optimizer state to GPU, if necessary.
    After calling, any parameter specific state in the optimizer
    will be located on the same device as the parameter.
    """
    for param_group in optimizer.param_groups:
        for param in param_group['params']:
            if param.is_cuda:
                param_state = optimizer.state[param]
                for k in param_state.keys():
                    if isinstance(param_state[k], torch.Tensor):
                        param_state[k] = param_state[k].cuda(device=param.get_device())
def get_batch_size(batch: Union[Dict, torch.Tensor]) -> int:
    """
    Returns the size of the batch dimension. Assumes a well-formed batch,
    returns 0 otherwise.
    """
    if isinstance(batch, torch.Tensor):
        return batch.size(0)  # type: ignore
    elif isinstance(batch, Dict):
        return get_batch_size(next(iter(batch.values())))
    else:
        return 0
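# e.g. get_batch_size({'tokens': torch.zeros(32, 50)}) returns 32 (the first dimension of the
# first tensor it finds), while a non-tensor, non-dict input returns 0.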
def time_to_str(timestamp: int) -> str:
    """
    Convert seconds past Epoch to human readable string.
    """
    datetimestamp = datetime.datetime.fromtimestamp(timestamp)
    return '{:04d}-{:02d}-{:02d}-{:02d}-{:02d}-{:02d}'.format(
        datetimestamp.year, datetimestamp.month, datetimestamp.day,
        datetimestamp.hour, datetimestamp.minute, datetimestamp.second
    )


def str_to_time(time_str: str) -> datetime.datetime:
    """
    Convert human readable string to datetime.datetime.
    """
    pieces: Any = [int(piece) for piece in time_str.split('-')]
    return datetime.datetime(*pieces)
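# Format example: time_to_str(...) yields strings of the form '2020-09-13-12-26-40'
# (local time), which str_to_time() parses back into a datetime.datetime.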
def datasets_from_params(params: Params,
                         cache_directory: str = None,
                         cache_prefix: str = None) -> Dict[str, Iterable[Instance]]:
    """
    Load all the datasets specified by the config.

    Parameters
    ----------
    params : ``Params``
    cache_directory : ``str``, optional
        If given, we will instruct the ``DatasetReaders`` that we construct to cache their
        instances in this location (or read their instances from caches in this location, if a
        suitable cache already exists). This is essentially a `base` directory for the cache, as
        we will additionally add the ``cache_prefix`` to this directory, giving an actual cache
        location of ``cache_directory + cache_prefix``.
    cache_prefix : ``str``, optional
        This works in conjunction with the ``cache_directory``. The idea is that the
        ``cache_directory`` contains caches for all different parameter settings, while the
        ``cache_prefix`` captures a specific set of parameters that led to a particular cache file.
        That is, if you change the tokenization settings inside your ``DatasetReader``, you don't
        want to read cached data that used the old settings. In order to avoid this, we compute a
        hash of the parameters used to construct each ``DatasetReader`` and use that as a "prefix"
        to the cache files inside the base ``cache_directory``. So, a given ``input_file`` would
        be cached essentially as ``cache_directory + cache_prefix + input_file``, where you specify
        a ``cache_directory``, the ``cache_prefix`` is based on the dataset reader parameters, and
        the ``input_file`` is whatever path you provided to ``DatasetReader.read()``. In order to
        allow you to give recognizable names to these prefixes if you want them, you can manually
        specify the ``cache_prefix``. Note that in some rare cases this can be dangerous, as we'll
        use the `same` prefix for both train and validation dataset readers.
    """
    dataset_reader_params = params.pop('dataset_reader')
    validation_dataset_reader_params = params.pop('validation_dataset_reader', None)

    train_cache_dir, validation_cache_dir = _set_up_cache_files(dataset_reader_params,
                                                                validation_dataset_reader_params,
                                                                cache_directory,
                                                                cache_prefix)

    dataset_reader = DatasetReader.from_params(dataset_reader_params)
    validation_and_test_dataset_reader: DatasetReader = dataset_reader
    if validation_dataset_reader_params is not None:
        logger.info("Using a separate dataset reader to load validation and test data.")
        validation_and_test_dataset_reader = DatasetReader.from_params(validation_dataset_reader_params)

    if train_cache_dir:
        dataset_reader.cache_data(train_cache_dir)
        validation_and_test_dataset_reader.cache_data(validation_cache_dir)

    train_data_path = params.pop('train_data_path')
    logger.info("Reading training data from %s", train_data_path)
    train_data = dataset_reader.read(train_data_path)

    datasets: Dict[str, Iterable[Instance]] = {"train": train_data}

    validation_data_path = params.pop('validation_data_path', None)
    if validation_data_path is not None:
        logger.info("Reading validation data from %s", validation_data_path)
        validation_data = validation_and_test_dataset_reader.read(validation_data_path)
        datasets["validation"] = validation_data

    test_data_path = params.pop("test_data_path", None)
    if test_data_path is not None:
        logger.info("Reading test data from %s", test_data_path)
        test_data = validation_and_test_dataset_reader.read(test_data_path)
        datasets["test"] = test_data

    return datasets
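# Illustrative cache layout (hypothetical values): with cache_directory='cache/' and no explicit
# cache_prefix, instances read from 'train_data_path' are cached under 'cache/<dataset-reader-hash>/',
# and a params.json describing the reader is written into that directory by _set_up_cache_files below.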
def _set_up_cache_files(train_params: Params,
                        validation_params: Params = None,
                        cache_directory: str = None,
                        cache_prefix: str = None) -> Tuple[str, str]:
    if not cache_directory:
        return None, None

    # We need to compute the parameter hash before the parameters get destroyed when they're
    # passed to `DatasetReader.from_params`.
    if not cache_prefix:
        cache_prefix = _dataset_reader_param_hash(train_params)
        if validation_params:
            validation_cache_prefix = _dataset_reader_param_hash(validation_params)
        else:
            validation_cache_prefix = cache_prefix
    else:
        validation_cache_prefix = cache_prefix

    train_cache_dir = pathlib.Path(cache_directory) / cache_prefix
    validation_cache_dir = pathlib.Path(cache_directory) / validation_cache_prefix

    # For easy human inspection of what parameters were used to create the cache. This will
    # overwrite old files, but they should be identical. This could bite someone who gave
    # their own prefix instead of letting us compute it, and then _re-used_ that name with
    # different parameters, without clearing the cache first. But correctly handling that case
    # is more work than it's worth.
    os.makedirs(train_cache_dir, exist_ok=True)
    with open(train_cache_dir / 'params.json', 'w') as param_file:
        json.dump(train_params.as_dict(quiet=True), param_file)
    os.makedirs(validation_cache_dir, exist_ok=True)
    with open(validation_cache_dir / 'params.json', 'w') as param_file:
        if validation_params:
            json.dump(validation_params.as_dict(quiet=True), param_file)
        else:
            json.dump(train_params.as_dict(quiet=True), param_file)

    return str(train_cache_dir), str(validation_cache_dir)


def _dataset_reader_param_hash(params: Params) -> str:
    copied_params = params.duplicate()
    # Laziness doesn't affect how the data is computed, so it shouldn't affect the hash.
    copied_params.pop('lazy', default=None)
    return copied_params.get_hash()
def create_serialization_dir(
        params: Params,
        serialization_dir: str,
        recover: bool,
        force: bool) -> None:
    """
    This function creates the serialization directory if it doesn't exist. If it already exists
    and is non-empty, then it verifies that we're recovering from a training run with an identical configuration.

    Parameters
    ----------
    params: ``Params``
        A parameter object specifying an AllenNLP Experiment.
    serialization_dir: ``str``
        The directory in which to save results and logs.
    recover: ``bool``
        If ``True``, we will try to recover from an existing serialization directory, and crash if
        the directory doesn't exist, or doesn't match the configuration we're given.
    force: ``bool``
        If ``True``, we will overwrite the serialization directory if it already exists.
    """
    if recover and force:
        raise ConfigurationError("Illegal arguments: both force and recover are true.")

    if os.path.exists(serialization_dir) and force:
        shutil.rmtree(serialization_dir)

    if os.path.exists(serialization_dir) and os.listdir(serialization_dir):
        if not recover:
            raise ConfigurationError(f"Serialization directory ({serialization_dir}) already exists and is "
                                     f"not empty. Specify --recover to recover training from existing output.")

        logger.info(f"Recovering from prior training at {serialization_dir}.")

        recovered_config_file = os.path.join(serialization_dir, CONFIG_NAME)
        if not os.path.exists(recovered_config_file):
            raise ConfigurationError("The serialization directory already exists but doesn't "
                                     "contain a config.json. You probably gave the wrong directory.")
        loaded_params = Params.from_file(recovered_config_file)

        # Check whether any of the training configuration differs from the configuration we are
        # resuming. If so, warn the user that training may fail.
        fail = False
        flat_params = params.as_flat_dict()
        flat_loaded = loaded_params.as_flat_dict()
        for key in flat_params.keys() - flat_loaded.keys():
            logger.error(f"Key '{key}' found in training configuration but not in the serialization "
                         f"directory we're recovering from.")
            fail = True
        for key in flat_loaded.keys() - flat_params.keys():
            logger.error(f"Key '{key}' found in the serialization directory we're recovering from "
                         f"but not in the training config.")
            fail = True
        for key in flat_params.keys():
            if flat_params.get(key, None) != flat_loaded.get(key, None):
                logger.error(f"Value for '{key}' in training configuration does not match the value in "
                             f"the serialization directory we're recovering from: "
                             f"{flat_params[key]} != {flat_loaded[key]}")
                fail = True
        if fail:
            raise ConfigurationError("Training configuration does not match the configuration we're "
                                     "recovering from.")
    else:
        if recover:
            raise ConfigurationError(f"--recover specified but serialization_dir ({serialization_dir}) "
                                     "does not exist. There is nothing to recover from.")
        os.makedirs(serialization_dir, exist_ok=True)
def data_parallel(batch_group: List[TensorDict],
                  model: Model,
                  cuda_devices: List) -> Dict[str, torch.Tensor]:
    """
    Performs a forward pass using multiple GPUs. This is a simplification
    of torch.nn.parallel.data_parallel to support the allennlp model
    interface.
    """
    assert len(batch_group) <= len(cuda_devices)

    moved = [nn_util.move_to_device(batch, device)
             for batch, device in zip(batch_group, cuda_devices)]

    used_device_ids = cuda_devices[:len(moved)]
    # Counterintuitively, it appears replicate expects the source device id to be the first element
    # in the device id list. See torch.cuda.comm.broadcast_coalesced, which is called indirectly.
    replicas = replicate(model, used_device_ids)

    # We pass all our arguments as kwargs. Create a list of empty tuples of the
    # correct shape to serve as (non-existent) positional arguments.
    inputs = [()] * len(batch_group)
    outputs = parallel_apply(replicas, inputs, moved, used_device_ids)

    # Only the 'loss' is needed.
    # a (num_gpu, ) tensor with loss on each GPU
    losses = gather([output['loss'].unsqueeze(0) for output in outputs], used_device_ids[0], 0)
    return {'loss': losses.mean()}
def enable_gradient_clipping(model: Model, grad_clipping: Optional[float]) -> None:
    if grad_clipping is not None:
        for parameter in model.parameters():
            if parameter.requires_grad:
                parameter.register_hook(lambda grad: nn_util.clamp_tensor(grad,
                                                                          minimum=-grad_clipping,
                                                                          maximum=grad_clipping))


def rescale_gradients(model: Model, grad_norm: Optional[float] = None) -> Optional[float]:
    """
    Performs gradient rescaling. Is a no-op if gradient rescaling is not enabled.
    """
    if grad_norm:
        parameters_to_clip = [p for p in model.parameters()
                              if p.grad is not None]
        return sparse_clip_norm(parameters_to_clip, grad_norm)
    return None
def get_metrics(model: Model, total_loss: float, num_batches: int, reset: bool = False) -> Dict[str, float]:
    """
    Gets the metrics but sets ``"loss"`` to
    the total loss divided by the ``num_batches`` so that
    the ``"loss"`` metric is "average loss per batch".
    """
    metrics = model.get_metrics(reset=reset)
    metrics["loss"] = float(total_loss / num_batches) if num_batches > 0 else 0.0
    return metrics
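# NOTE: unlike allennlp.training.util.evaluate, the version below also collects the model's
# "dialog_act" output for every batch and returns it alongside the metrics, so that
# evaluate.py can dump per-utterance predictions to --output_file (see the diff above).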
def evaluate(model: Model,
             instances: Iterable[Instance],
             data_iterator: DataIterator,
             cuda_device: int,
             batch_weight_key: str) -> Tuple[Dict[str, Any], List[Any]]:
    check_for_gpu(cuda_device)
    predict_results = []
    with torch.no_grad():
        model.eval()

        iterator = data_iterator(instances,
                                 num_epochs=1,
                                 shuffle=False)
        logger.info("Iterating over dataset")
        generator_tqdm = Tqdm.tqdm(iterator, total=data_iterator.get_num_batches(instances))

        # Number of batches in instances.
        batch_count = 0
        # Number of batches where the model produces a loss.
        loss_count = 0
        # Cumulative weighted loss
        total_loss = 0.0
        # Cumulative weight across all batches.
        total_weight = 0.0

        for batch in generator_tqdm:
            batch_count += 1
            batch = nn_util.move_to_device(batch, cuda_device)
            output_dict = model(**batch)
            loss = output_dict.get("loss")
            predict_results += output_dict["dialog_act"]

            metrics = model.get_metrics()

            if loss is not None:
                loss_count += 1
                if batch_weight_key:
                    weight = output_dict[batch_weight_key].item()
                else:
                    weight = 1.0

                total_weight += weight
                total_loss += loss.item() * weight
                # Report the average loss so far.
                metrics["loss"] = total_loss / total_weight

            if (not HasBeenWarned.tqdm_ignores_underscores and
                    any(metric_name.startswith("_") for metric_name in metrics)):
                logger.warning("Metrics with names beginning with \"_\" will "
                               "not be logged to the tqdm progress bar.")
                HasBeenWarned.tqdm_ignores_underscores = True
            description = ', '.join(["%s: %.2f" % (name, value) for name, value
                                     in metrics.items() if not name.startswith("_")]) + " ||"
            generator_tqdm.set_description(description, refresh=False)

        final_metrics = model.get_metrics(reset=True)
        if loss_count > 0:
            # Sanity check
            if loss_count != batch_count:
                raise RuntimeError("The model you are trying to evaluate only sometimes " +
                                   "produced a loss!")
            final_metrics["loss"] = total_loss / total_weight

        return final_metrics, predict_results
def description_from_metrics(metrics: Dict[str, float]) -> str:
    if (not HasBeenWarned.tqdm_ignores_underscores and
            any(metric_name.startswith("_") for metric_name in metrics)):
        logger.warning("Metrics with names beginning with \"_\" will "
                       "not be logged to the tqdm progress bar.")
        HasBeenWarned.tqdm_ignores_underscores = True
    return ', '.join(["%s: %.4f" % (name, value)
                      for name, value in
                      metrics.items() if not name.startswith("_")]) + " ||"