Unverified commit 04197837, authored by Lin Hsien-Chin, committed by GitHub

Merge pull request #157 from ConvLab/emoUS

Add code for EmoUS model
parents 6f534d25 3818d39e
Showing 2041 additions and 18 deletions
@@ -104,5 +104,9 @@ convlab/deploy/templates/dialog_eg.html
*convlab/policy/vector/action_dicts
*.egg-info
.eggs/*
pre-trained-models/
venv
*.zip
*/dummy_data.json
*.csv
\ No newline at end of file
@@ -259,6 +259,8 @@ class PipelineAgent(Agent):
return self.input_action
def get_out_da(self):
if self.name == "user" and hasattr(self.policy, "semantic_action"):
return self.policy.semantic_action
return self.output_action
@@ -27,7 +27,7 @@ class Environment():
s, r, t = self.step([])
return self.sys_dst.state
-def step(self, action):
+def step(self, action, user_reward=False):
# save last system action
self.sys_dst.state['system_action'] = action
if not self.use_semantic_acts:
@@ -41,9 +41,9 @@ class Environment():
if intent == "book":
self.sys_dst.state['booked'][domain] = [{slot: value}]
observation = self.usr.response(model_response)
if self.evaluator:
-self.evaluator.add_sys_da(self.usr.get_in_da(), self.sys_dst.state['belief_state'])
+self.evaluator.add_sys_da(
+    self.usr.get_in_da(), self.sys_dst.state['belief_state'])
self.evaluator.add_usr_da(self.usr.get_out_da())
dialog_act = self.sys_nlu.predict(
@@ -59,10 +59,12 @@ class Environment():
state = deepcopy(state)
terminated = self.usr.is_terminated()
if not user_reward:
if self.evaluator:
reward = self.evaluator.get_reward(terminated)
else:
reward = self.usr.get_reward()
else:
reward = self.usr.get_reward()
return state, reward, terminated
@@ -263,7 +263,7 @@ class SetSUMBTTracker(DST):
new_state['turn_pooled_representation'] = outputs.turn_pooled_representation.reshape(-1)
self.state = new_state
-self.info_dict['belief_state'] = copy.deepcopy(dict(new_state))
+# self.info_dict['belief_state'] = copy.deepcopy(dict(new_state))
return self.state
@@ -281,7 +281,8 @@ class SetSUMBTTracker(DST):
with torch.no_grad():
features['hidden_state'] = self.hidden_states
features['get_turn_pooled_representation'] = self.return_turn_pooled_representation
-features['calculate_state_mutual_info'] = self.return_belief_state_mutual_info
+mutual_info = self.return_belief_state_mutual_info or self.store_full_belief_state
+features['calculate_state_mutual_info'] = mutual_info
outputs = self.model(**features)
self.hidden_states = outputs.hidden_state
@@ -293,7 +294,6 @@ class SetSUMBTTracker(DST):
if self.store_full_belief_state:
self.info_dict['belief_state_distributions'] = outputs.belief_state
-if state_mutual_info is not None:
self.info_dict['belief_state_knowledge_uncertainty'] = outputs.belief_state_mutual_information
# Obtain model output probabilities
@@ -27,8 +27,10 @@ for dom, ref_slots in REF_SYS_DA.items():
REF_SYS_DA_M['taxi']['phone'] = 'phone'
REF_SYS_DA_M['taxi']['car'] = 'car type'
-reverse_da = relative_import_module_from_unified_datasets('multiwoz21', 'preprocess.py', 'reverse_da')
-reverse_da_slot_name_map = relative_import_module_from_unified_datasets('multiwoz21', 'preprocess.py', 'reverse_da_slot_name_map')
+reverse_da = relative_import_module_from_unified_datasets(
+    'multiwoz21', 'preprocess.py', 'reverse_da')
+reverse_da_slot_name_map = relative_import_module_from_unified_datasets(
+    'multiwoz21', 'preprocess.py', 'reverse_da_slot_name_map')
requestable = \
@@ -46,7 +46,8 @@ class BERTNLU(NLU):
if not os.path.exists(output_dir):
model_downloader(root_dir, model_file)
-model = JointBERT(config['model'], DEVICE, dataloader.tag_dim, dataloader.intent_dim)
+model = JointBERT(config['model'], DEVICE,
+                  dataloader.tag_dim, dataloader.intent_dim)
state_dict = torch.load(os.path.join(
output_dir, 'pytorch_model.bin'), DEVICE)
@@ -97,7 +98,8 @@ class BERTNLU(NLU):
intents = []
da = {}
-word_seq, tag_seq, new2ori = self.dataloader.bert_tokenize(ori_word_seq, ori_tag_seq)
+word_seq, tag_seq, new2ori = self.dataloader.bert_tokenize(
+    ori_word_seq, ori_tag_seq)
word_seq = word_seq[:510]
tag_seq = tag_seq[:510]
batch_data = [[ori_word_seq, ori_tag_seq, intents, da, context_seq,
import json
import os
from argparse import ArgumentParser
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import metrics
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--model", type=str, default="",
help="model name")
parser.add_argument("--data", type=str)
parser.add_argument("--gen-file", type=str)
return parser.parse_args()
def generate_result(model_checkpoint, data):
result = []
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
model = AutoModelForSequenceClassification.from_pretrained(
model_checkpoint)
data = pd.read_csv(data, index_col=False).astype(str)
# Neutral: 0, Negative: 1, Positive: 2
t2i = {'3': 0, '1': 1, '2': 1, '4': 2, '5': 2}
prefix = "satisfaction score: "
for input_text, target_text in tqdm(zip(data["input_text"], data["target_text"]), ascii=True):
if prefix in input_text:
text = input_text.replace(prefix, '')
target = t2i[target_text]
model_input = tokenizer(
[text], return_tensors="pt", padding=True)
output = model(input_ids=model_input["input_ids"],
attention_mask=model_input["attention_mask"])
output = int(np.argmax(output.logits.detach().numpy(), axis=-1))
result.append({"input_text": text,
"preds": output,
"label": target})
json.dump(result, open(os.path.join(
model_checkpoint, "uss_result.json"), 'w'))
return result
def read_result(result):
preds = []
label = []
for r in result:
preds.append(r["preds"])
label.append(r["label"])
return preds, label
def main():
args = arg_parser()
if args.gen_file:
preds, label = read_result(json.load(open(args.gen_file)))
else:
results = generate_result(args.model, args.data)
preds, label = read_result(results)
macro_f1 = metrics.f1_score(label, preds, average="macro")
sep_f1 = metrics.f1_score(
label, preds, average=None,
labels=[0, 1, 2])
cm = metrics.confusion_matrix(
label, preds, normalize="true",
labels=[0, 1, 2])
print("Neutral: 0, Negative: 1, Positive: 2")
print("cm", cm)
print("f1", sep_f1)
print("macro", macro_f1)
if __name__ == "__main__":
main()
from datasets import Dataset
from transformers import AutoTokenizer
model_checkpoint = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
raw_data = {
"train": [{"label": 0, "text": "hi how are you"},
{"label": 1, "text": "i'm fine thank you"}, ],
"test": [{"label": 0, "text": "hi how are you"},
{"label": 1, "text": "i'm fine thank you"}, ]}
data = {}
for x in raw_data:
data[x] = Dataset.from_list(raw_data[x])
def tokenize_function(examples):
print(examples)
return tokenizer(examples["text"], padding="max_length", truncation=True)
t = data["train"].map(tokenize_function, batched=True)
print(t)
from argparse import ArgumentParser
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--model", type=str, default="",
help="model name")
parser.add_argument("--data", type=str)
parser.add_argument("--gen-file", type=str)
return parser.parse_args()
def main():
args = arg_parser()
model_checkpoint = args.model
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
model = AutoModelForSequenceClassification.from_pretrained(
model_checkpoint)
input_text = "Yeah, I think we are. This isn't even my dress."
inputs = tokenizer([input_text], return_tensors="pt", padding=True)
output = model(input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"])
print(np.argmax(output.logits.detach().numpy(), axis=-1))
if __name__ == "__main__":
main()
import os
import random
from argparse import ArgumentParser
import json
import numpy as np
import torch
from datasets import load_metric, Dataset
from sklearn.model_selection import train_test_split
from transformers import (AutoModelForSequenceClassification, AutoTokenizer,
Trainer, TrainingArguments)
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--data", type=str, default="",
help="input data")
parser.add_argument("--batch", type=int, default=2,
help="batch size")
return parser.parse_args()
def set_seed(r_seed):
random.seed(r_seed)
np.random.seed(r_seed)
torch.manual_seed(r_seed)
def read_data(data_dir):
print("data_dir", data_dir)
subfix = {"train": "trn", "validation": "dev", "test": "tst"}
files = {}
data = {}
for data_split, sub in subfix.items():
data[data_split] = parse_data(json.load(
open(os.path.join(data_dir, f"emotion-detection-{sub}.json"))))
return data
def parse_data(data):
emo2label = {
"Neutral": 0,
"Scared": 1,
"Mad": 1,
"Sad": 1,
"Joyful": 2,
"Peaceful": 2,
"Powerful": 2
}
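# collapse EmoryNLP's seven emotions into three classes (Neutral: 0,
# negative: 1, positive: 2); each example joins two consecutive utterances
# and takes the second utterance's emotion as its label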
d = []
for episode in data["episodes"]:
for scene in episode["scenes"]:
for r in range(len(scene["utterances"])-1):
text = ' '.join([scene["utterances"][r]["transcript"],
scene["utterances"][r+1]["transcript"]])
label = emo2label.get(
scene["utterances"][r+1]["emotion"], 0)  # default to the Neutral label (0)
d.append({"label": label, "text": text})
return d
def main():
args = arg_parser()
base_name = "convlab/policy/USMDA"
model_checkpoint = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(
model_checkpoint, num_labels=3)
metric = load_metric("accuracy")
fp16 = False
if torch.cuda.is_available():
print("use cuda")
fp16 = True
model.to("cuda")
def tokenize_function(examples):
return tokenizer(examples["text"], padding="max_length", truncation=True)
emory_data = read_data(args.data)
folder_name = os.path.join(base_name, "data")
if not os.path.exists(folder_name):
os.makedirs(folder_name)
json.dump(emory_data, open(os.path.join(folder_name, "data.json"), 'w'))
data = {}
for data_split, d in emory_data.items():
d = Dataset.from_list(d)
data[data_split] = d.map(tokenize_function, batched=True)
model_dir = os.path.join(base_name, "model")
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
return metric.compute(predictions=predictions, references=labels)
training_args = TrainingArguments(
output_dir=model_dir,
learning_rate=2e-5,
per_device_train_batch_size=args.batch,
per_device_eval_batch_size=args.batch,
evaluation_strategy="epoch",
num_train_epochs=2,
fp16=fp16)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=data["train"],
eval_dataset=data["test"],
compute_metrics=compute_metrics,)
trainer.train()
trainer.save_model()
if __name__ == "__main__":
main()
import json
import os
from argparse import ArgumentParser
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
result_dir = "convlab/policy/emoUS/result"
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--file", type=str, help="the conversation file")
return parser.parse_args()
def basic_analysis(conversation):
info = {"Complete": [], "Success": [], "Success strict": [], "turns": []}
for dialog in conversation:
for x in info:
info[x].append(dialog[x])
for x in info:
info[x] = np.mean(info[x])
return info
def advance(conversation):
info = {}
for dialog in conversation:
temp = turn_level(dialog["log"])
for metric, data in temp.items():
if metric not in info:
info[metric] = {}
for emotion, count in data.items():
if emotion not in info[metric]:
info[metric][emotion] = 0
info[metric][emotion] += count
return info
def get_turn_emotion(conversation):
""" Get the emotion of each turn in the conversation
Args:
conversation (list): a list of dialog
Returns:
turn_emotion (list): a list of emotion of each turn
"""
turn_info = {"all": {},
"Complete": {}, "Not Complete": {},
"Success": {}, "Not Success": {},
"Success strict": {}, "Not Success strict": {}}
max_turn = 0
for dialog in conversation:
for i in range(0, len(dialog["log"]), 2):
turn = int(i / 2)
if turn > max_turn:
max_turn = turn
emotion = emotion_score(dialog["log"][i]["emotion"])
insert_turn(turn_info["all"], turn, emotion)
for metric in ["Complete", "Success", "Success strict"]:
if dialog[metric]:
insert_turn(turn_info[metric], turn, emotion)
else:
insert_turn(turn_info[f"Not {metric}"], turn, emotion)
print("MAX_TURN", max_turn)
data = {'x': [t for t in range(max_turn)],
'all_positive': [],
'all_negative': [],
'all_mean': [],
'all_std': []}
for metric in ["Complete", "Success", "Success strict"]:
data[f"{metric}_positive"] = []
data[f"{metric}_negative"] = []
data[f"{metric}_mean"] = []
data[f"{metric}_std"] = []
data[f"Not {metric}_positive"] = []
data[f"Not {metric}_negative"] = []
data[f"Not {metric}_mean"] = []
data[f"Not {metric}_std"] = []
for t in range(max_turn):
pos, neg, mean, std = turn_score(turn_info["all"][t])
data[f"all_positive"].append(pos)
data[f"all_negative"].append(neg)
data[f"all_mean"].append(mean)
data[f"all_std"].append(std)
for raw_metric in ["Complete", "Success", "Success strict"]:
for metric in [raw_metric, f"Not {raw_metric}"]:
if t not in turn_info[metric]:
data[f"{metric}_positive"].append(0)
data[f"{metric}_negative"].append(0)
data[f"{metric}_mean"].append(0)
data[f"{metric}_std"].append(0)
else:
pos, neg, mean, std = turn_score(turn_info[metric][t])
data[f"{metric}_positive"].append(pos)
data[f"{metric}_negative"].append(neg)
data[f"{metric}_mean"].append(mean)
data[f"{metric}_std"].append(std)
for x in data:
data[x] = np.array(data[x])
fig, ax = plt.subplots(figsize=(6.0, 2.5))
p = {"Complete": {"color": "C0", "label": "Success"},
"Not Complete": {"color": "C1", "label": "Fail"},
"all": {"color": "C2", "label": "all"}}
for name, para in p.items():
ax.plot(data['x'],
data[f"{name}_mean"],
'o--',
color=para["color"],
label=para["label"])
ax.fill_between(data['x'],
data[f"{name}_mean"]+data[f"{name}_std"],
data[f"{name}_mean"]-data[f"{name}_std"],
color=para["color"], alpha=0.2)
ax.legend()
ax.set_xlabel("turn")
ax.set_ylabel("Sentiment")
ax.set_xticks([t for t in range(0, max_turn, 2)])
plt.grid(axis='x', color='0.95')
plt.grid(axis='y', color='0.95')
# plt.show()
plt.tight_layout()
plt.savefig(os.path.join(result_dir, "turn2emotion.png"))
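# turn_score summarises the emotion scores of one turn: positive fraction,
# negative fraction (accumulated as -1 per hit, hence <= 0), mean, and the
# standard error of the mean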
def turn_score(score_list):
count = len(score_list)
positive = 0
negative = 0
for s in score_list:
if s > 0:
positive += 1
if s < 0:
negative += -1
return positive/count, negative/count, np.mean(score_list), np.std(score_list, ddof=1)/np.sqrt(len(score_list))
def insert_turn(turn_info, turn, emotion):
if turn not in turn_info:
turn_info[turn] = []
turn_info[turn].append(emotion)
def emotion_score(emotion):
if emotion == "Neutral":
return 0
if emotion in ["Satisfied", "Excited"]:
return 1
return -1
def plot(conversation):
pass
def turn_level(dialog):
# metric: {emotion: count}
dialog_info = {}
for index in range(2, len(dialog), 2):
pre_usr = dialog[index-2]
sys = dialog[index-1]
cur_usr = dialog[index]
info = neglect_reply(pre_usr, sys, cur_usr)
append_info(dialog_info, info)
info = confirm(pre_usr, sys, cur_usr)
append_info(dialog_info, info)
info = miss_info(pre_usr, sys, cur_usr)
append_info(dialog_info, info)
if index > 2:
info = loop(dialog[index-3], sys, cur_usr)
append_info(dialog_info, info)
return dialog_info
# provide wrong info
# action length
# incomplete info?
def append_info(dialog_info, info):
if not info:
return
for emotion, metric in info.items():
if metric not in dialog_info:
dialog_info[metric] = {}
if emotion not in dialog_info[metric]:
dialog_info[metric][emotion] = 0
dialog_info[metric][emotion] += 1
def get_inform(act):
inform = {}
for intent, domain, slot, value in act:
if intent not in ["inform", "recommend"]:
continue
if domain not in inform:
inform[domain] = []
inform[domain].append(slot)
return inform
def get_request(act):
request = {}
for intent, domain, slot, _ in act:
if intent == "request":
if domain not in request:
request[domain] = []
request[domain].append(slot)
return request
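# turn-level interaction patterns, each mapped to the user's next-turn emotion:
#   neglect/reply - did the system address the user's request?
#   miss_info     - did the system request a slot the user already provided?
#   confirm       - did the system confirm the slots the user informed?
#   loop          - did the system repeat its previous action?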
def neglect_reply(pre_usr, sys, cur_usr):
request = get_request(pre_usr["act"])
if not request:
return {}
system_inform = get_inform(sys["utt"])
for domain, slots in request.items():
if domain not in system_inform:
return {cur_usr["emotion"]: "neglect"}
for slot in slots:
if slot not in system_inform[domain]:
return {cur_usr["emotion"]: "neglect"}
return {cur_usr["emotion"]: "reply"}
def miss_info(pre_usr, sys, cur_usr):
system_request = get_request(sys["utt"])
if not system_request:
return {}
user_inform = get_inform(pre_usr["act"])
for domain, slots in system_request.items():
if domain not in user_inform:
continue
for slot in slots:
if slot in user_inform[domain]:
return {cur_usr["emotion"]: "miss_info"}
return {}
def confirm(pre_usr, sys, cur_usr):
user_inform = get_inform(pre_usr["act"])
if not user_inform:
return {}
system_inform = get_inform(sys["utt"])
for domain, slots in user_inform.items():
if domain not in system_inform:
continue
for slot in slots:
if slot in system_inform[domain]:
return {cur_usr["emotion"]: "confirm"}
return {cur_usr["emotion"]: "no confirm"}
def loop(s0, s1, u1):
if s0 == s1:
return {u1["emotion"]: "loop"}
def dict2csv(data):
r = {}
emotion = json.load(open("convlab/policy/emoUS/emotion.json"))
for act, value in data.items():
temp = [0]*(len(emotion)+1)
for emo, count in value.items():
temp[emotion[emo]] = count
temp[-1] = sum(temp)
for i in range(len(emotion)):
temp[i] /= temp[-1]
r[act] = temp
dataframe = pd.DataFrame.from_dict(
r, orient='index', columns=[emo for emo in emotion]+["count"])
dataframe.to_csv(open(os.path.join(result_dir, "act2emotion.csv"), 'w'))
def main():
args = arg_parser()
result = {}
if not os.path.exists(result_dir):
os.makedirs(result_dir)
conversation = json.load(open(args.file))["conversation"]
# basic_info = basic_analysis(conversation)
# result["basic_info"] = basic_info
# print(basic_info)
# advance_info = advance(conversation)
# print(advance_info)
# result["advance_info"] = advance_info
# json.dump(result, open(
# os.path.join("conversation_result.json"), 'w'), indent=2)
# dict2csv(advance_info)
get_turn_emotion(conversation)
if __name__ == "__main__":
main()
from argparse import ArgumentParser
from tqdm import tqdm
from convlab.policy.rule.multiwoz import RulePolicy
from convlab.task.multiwoz.goal_generator import GoalGenerator
from convlab.util.custom_util import (create_goals, data_goals, env_config,
get_config, set_seed)
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--config", type=str, help="the model path")
parser.add_argument("-N", "--num", type=int,
default=500, help="# of evaluation dialogue")
parser.add_argument("--model", type=str,
default="ppo", help="# of evaluation dialogue")
return parser.parse_args()
def interact(model_name, config, seed=0, num_goals=500):
conversation = []
set_seed(seed)
conf = get_config(config, [])
if model_name == "rule":
policy_sys = RulePolicy()
elif model_name == "ppo":
from convlab.policy.ppo import PPO
policy_sys = PPO(vectorizer=conf['vectorizer_sys_activated'])
model_path = conf['model']['load_path']
if model_path:
policy_sys.load(model_path)
env, sess = env_config(conf, policy_sys)
goal_generator = GoalGenerator()
goals = create_goals(goal_generator, num_goals=num_goals,
single_domains=False, allowed_domains=None)
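# one dialogue per goal, each replayed under a fixed seed (1000..1000+num_goals)
# for reproducibility; dialogues are capped at 40 turns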
for seed in tqdm(range(1000, 1000 + num_goals)):
dialogue = {"seed": seed, "log": []}
set_seed(seed)
sess.init_session(goal=goals[seed-1000])
sys_response = []
actions = 0.0
total_return = 0.0
turns = 0
task_succ = 0
task_succ_strict = 0
complete = 0
dialogue["goal"] = env.usr.policy.policy.goal.domain_goals
dialogue["user info"] = env.usr.policy.policy.user_info
for i in range(40):
sys_response, user_response, session_over, reward = sess.next_turn(
sys_response)
dialogue["log"].append(
{"role": "usr",
"utt": user_response,
"emotion": env.usr.policy.policy.emotion,
"act": env.usr.policy.policy.semantic_action})
dialogue["log"].append({"role": "sys", "utt": sys_response})
# logging.info(f"Actions in turn: {len(sys_response)}")
turns += 1
total_return += sess.evaluator.get_reward(session_over)
if session_over:
sess.evaluator.task_success()  # updates the evaluator's success flags
task_succ = sess.evaluator.success
task_succ_strict = sess.evaluator.success_strict
complete = sess.evaluator.complete
break
dialogue['Complete'] = complete
dialogue['Success'] = task_succ
dialogue['Success strict'] = task_succ_strict
dialogue['total_return'] = total_return
dialogue['turns'] = turns
conversation.append(dialogue)
return conversation
if __name__ == "__main__":
import json
from datetime import datetime
import os
time = f"{datetime.now().strftime('%y-%m-%d-%H-%M')}"
args = arg_parser()
conversation = interact(model_name=args.model,
config=args.config,
num_goals=args.num)
data = {"config": json.load(open(args.config)),
"conversation": conversation}
folder_name = os.path.join("convlab/policy/emoUS", "conversation")
if not os.path.exists(folder_name):
os.makedirs(folder_name)
json.dump(data,
open(os.path.join(folder_name, f"{time}.json"), 'w'),
indent=2)
{
"model": {
"load_path": "convlab/policy/ppo/finished_experiments/history/NLGEmoUS/experiment_2023-01-19-17-56-38/save/best_ppo",
"pretrained_load_path": "",
"use_pretrained_initialisation": false,
"batchsz": 200,
"seed": 0,
"epoch": 100,
"eval_frequency": 5,
"process_num": 1,
"num_eval_dialogues": 20,
"sys_semantic_to_usr": false
},
"vectorizer_sys": {
"uncertainty_vector_mul": {
"class_path": "convlab.policy.vector.vector_binary.VectorBinary",
"ini_params": {
"use_masking": true,
"manually_add_entity_names": true,
"seed": 0
}
}
},
"nlu_sys": {
"BertNLU": {
"class_path": "convlab.nlu.jointBERT.unified_datasets.BERTNLU",
"ini_params": {
"mode": "all",
"config_file": "multiwoz21_all.json",
"model_file": "https://huggingface.co/ConvLab/bert-base-nlu/resolve/main/bertnlu_unified_multiwoz21_all_context0.zip"
}
}
},
"dst_sys": {
"RuleDST": {
"class_path": "convlab.dst.rule.multiwoz.dst.RuleDST",
"ini_params": {}
}
},
"sys_nlg": {},
"nlu_usr": {},
"dst_usr": {},
"policy_usr": {
"emoUS": {
"class_path": "convlab.policy.emoUS.emoUS.UserPolicy",
"ini_params": {
"model_checkpoint": "convlab/policy/emoUS/unify/experiments/EmoUS_emowoz+dialmage_0_1/23-01-23-15-03/",
"use_sentiment": false,
"add_persona": true,
"sample": false,
"weight": 1
}
}
},
"usr_nlg": {}
}
\ No newline at end of file
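# Illustrative sketch (not part of the commit): a config like the one above is
# consumed via the convlab helpers used in the interaction script earlier, e.g.
#
#   conf = get_config("path/to/this_config.json", [])   # hypothetical path
#   policy_sys = PPO(vectorizer=conf['vectorizer_sys_activated'])
#   policy_sys.load(conf['model']['load_path'])
#   env, sess = env_config(conf, policy_sys)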
import os
import json
import torch
from convlab.policy.emoUS.token_map import tokenMap
from convlab.policy.emoUS.unify.knowledge_graph import KnowledgeGraph
from convlab.policy.genTUS.stepGenTUS import \
UserActionPolicy as GenTUSUserActionPolicy
from convlab.policy.policy import Policy
from convlab.util.custom_util import model_downloader
from convlab.policy.emoUS.unify.Goal import Goal
DEBUG = False
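# EmoUS extends the GenTUS seq2seq user simulator: besides semantic actions and
# an utterance, it also generates the user's emotion (and, optionally, sentiment)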
class UserActionPolicy(GenTUSUserActionPolicy):
def __init__(self, model_checkpoint, mode="language", max_turn=40, **kwargs):
self.use_sentiment = kwargs.get("use_sentiment", False)
self.add_persona = kwargs.get("add_persona", True)
self.emotion_mid = kwargs.get("emotion_mid", False)
if not os.path.exists(os.path.dirname(model_checkpoint)):
os.makedirs(os.path.dirname(model_checkpoint))
model_downloader(os.path.dirname(model_checkpoint),
"https://zenodo.org/record/7801525/files/EmoUS_default.zip")
if mode == "language":
only_action = False
elif mode == "semantic":
only_action = True
else:
raise ValueError("mode should be language or semantic")
super().__init__(model_checkpoint, mode, only_action, max_turn, **kwargs)
weight = kwargs.get("weight", None)
self.kg = KnowledgeGraph(
tokenizer=self.tokenizer,
dataset="emowoz",
use_sentiment=self.use_sentiment,
weight=weight)
data_emotion = json.load(open("convlab/policy/emoUS/emotion.json"))
self.emotion_list = [""]*len(data_emotion)
for emotion, index in data_emotion.items():
self.emotion_list[index] = emotion
self.init_session()
def predict(self, sys_act, mode="max", allow_general_intent=True, emotion=None):
allow_general_intent = False
self.model.eval()
if not self.add_sys_from_reward:
self.goal.update_user_goal(action=sys_act, char="sys")
self.sys_acts.append(sys_act) # for terminate conversation
# update constraint
self.time_step += 2
history = []
if self.usr_acts:
if self.max_history == 1:
history = self.usr_acts[-1]
else:
history = self.usr_acts[-1*self.max_history:]
input_dict = {"system": sys_act,
"goal": self.goal.get_goal_list(),
"history": history,
"turn": str(int(self.time_step/2))}
if self.add_persona:
for user, info in self.user_info.items():
input_dict[user] = info
inputs = json.dumps(input_dict)
with torch.no_grad():
if emotion == "all":
raw_output = self.generate_from_emotion(
raw_inputs=inputs, mode=mode, allow_general_intent=allow_general_intent)
for emo in raw_output:
output = self._parse_output(raw_output[emo])
print("emo:", emo)
print("act:", output["action"])
print("utt:", output["text"])
raw_output = raw_output["Neutral"]
elif emotion is not None:
raw_output = self.generate_from_emotion(
raw_inputs=inputs, emotion=emotion, mode=mode, allow_general_intent=allow_general_intent)
for emo in raw_output:
output = self._parse_output(raw_output[emo])
print("emo:", emo)
print("act:", output["action"])
print("utt:", output["text"])
raw_output = raw_output[emotion]
else:
raw_output = self._generate_action(
raw_inputs=inputs, mode=mode, allow_general_intent=allow_general_intent)
output = self._parse_output(raw_output)
self.semantic_action = self._remove_illegal_action(output["action"])
if not self.only_action:
self.utterance = output["text"]
self.emotion = output["emotion"]
if self.use_sentiment:
self.sentiment = output["sentiment"]
if self.is_finish():
self.emotion, self.semantic_action, self.utterance = self._good_bye()
if self.use_sentiment:
self.sentiment = "Neutral"
self.goal.update_user_goal(action=self.semantic_action, char="usr")
self.vector.update_mentioned_domain(self.semantic_action)
self.usr_acts.append(self.semantic_action)
del inputs
if self.only_action:
return self.semantic_action
return self.utterance
def _parse_output(self, in_str):
in_str = str(in_str)
in_str = in_str.replace('<s>', '').replace(
'<\\s>', '').replace('o"clock', "o'clock")
action = {"emotion": "Neutral", "action": [], "text": ""}
if self.use_sentiment:
action["sentiment"] = "Neutral"
try:
action = json.loads(in_str)
except json.JSONDecodeError:
print("invalid action:", in_str)
print("-"*20)
return action
def _update_sentiment(self, pos, model_input, mode):
pos = self._update_seq(
self.token_map.get_id('start_sentiment'), pos)
sentiment = self._get_sentiment(
model_input, self.seq[:1, :pos], mode)
pos = self._update_seq(sentiment["token_id"], pos)
return sentiment, pos
def _update_emotion(self, pos, model_input, mode, emotion_mode, sentiment=None):
pos = self._update_seq(
self.token_map.get_id('start_emotion'), pos)
emotion = self._get_emotion(
model_input, self.seq[:1, :pos], mode, emotion_mode, sentiment)
pos = self._update_seq(emotion["token_id"], pos)
return pos
def _update_semantic_act(self, pos, model_input, mode, allow_general_intent):
mode = "max"
for act_len in range(self.max_action_len):
pos = self._get_semantic_action(
model_input, pos, mode, allow_general_intent)
terminate, token_name = self._stop_semantic(
model_input, pos, act_len)
pos = self._update_seq(self.token_map.get_id(token_name), pos)
if terminate:
break
return pos
def _sent_act_emo(self, pos, model_input, mode, emotion_mode, allow_general_intent):
# sent
sentiment, pos = self._update_sentiment(pos, model_input, mode)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
# act
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
pos = self._update_semantic_act(
pos, model_input, mode, allow_general_intent)
# emo
pos = self._update_emotion(
pos, model_input, mode, emotion_mode, sentiment["token_name"])
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
return pos
def _sent_emo_act(self, pos, model_input, mode, emotion_mode, allow_general_intent):
# sent
sentiment, pos = self._update_sentiment(pos, model_input, mode)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
# emo
pos = self._update_emotion(
pos, model_input, mode, emotion_mode, sentiment["token_name"])
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
# act
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
pos = self._update_semantic_act(
pos, model_input, mode, allow_general_intent)
return pos
def _emo_act(self, pos, model_input, mode, emotion_mode, allow_general_intent):
# emo
pos = self._update_emotion(
pos, model_input, mode, emotion_mode)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
# act
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
pos = self._update_semantic_act(
pos, model_input, mode, allow_general_intent)
return pos
def _act_emo(self, pos, model_input, mode, emotion_mode, allow_general_intent):
# act
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
pos = self._update_semantic_act(
pos, model_input, mode, allow_general_intent)
# emo
pos = self._update_emotion(
pos, model_input, mode, emotion_mode)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
return pos
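# the generation order is controlled by two flags:
#   use_sentiment & emotion_mid:     sentiment -> actions -> emotion
#   use_sentiment & not emotion_mid: sentiment -> emotion -> actions
#   emotion_mid only:                actions -> emotion
#   default:                         emotion -> actions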
def _generate_action(self, raw_inputs, mode="max", allow_general_intent=True, emotion_mode="normal"):
self.kg.parse_input(raw_inputs)
model_input = self.vector.encode(raw_inputs, self.max_in_len)
# start token
self.seq = torch.zeros(1, self.max_out_len, device=self.device).long()
pos = self._update_seq([0], 0)
pos = self._update_seq(self.token_map.get_id('start_json'), pos)
if self.use_sentiment and self.emotion_mid:
pos = self._sent_act_emo(
pos, model_input, mode, emotion_mode, allow_general_intent)
elif self.use_sentiment and not self.emotion_mid:
pos = self._sent_emo_act(
pos, model_input, mode, emotion_mode, allow_general_intent)
elif not self.use_sentiment and self.emotion_mid:
pos = self._act_emo(
pos, model_input, mode, emotion_mode, allow_general_intent)
else: # default method
pos = self._emo_act(
pos, model_input, mode, emotion_mode, allow_general_intent)
if self.only_action:
# return semantic action. Don't need to generate text
return self.vector.decode(self.seq[0, :pos])
pos = self._update_seq(self.token_map.get_id("start_text"), pos)
text = self._get_text(model_input, pos)
return text
def generate_from_emotion(self, raw_inputs, emotion=None, mode="max", allow_general_intent=True):
self.kg.parse_input(raw_inputs)
model_input = self.vector.encode(raw_inputs, self.max_in_len)
responses = {}
if emotion:
emotion_list = [emotion]
else:
emotion_list = self.emotion_list
for emotion in emotion_list:
# start token
self.seq = torch.zeros(1, self.max_out_len,
device=self.device).long()
pos = self._update_seq([0], 0)
pos = self._update_seq(self.token_map.get_id('start_json'), pos)
pos = self._update_seq(
self.token_map.get_id('start_emotion'), pos)
pos = self._update_seq(self.kg._get_token_id(emotion), pos)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
# get semantic actions
for act_len in range(self.max_action_len):
pos = self._get_semantic_action(
model_input, pos, mode, allow_general_intent)
terminate, token_name = self._stop_semantic(
model_input, pos, act_len)
pos = self._update_seq(self.token_map.get_id(token_name), pos)
if terminate:
break
if self.only_action:
return self.vector.decode(self.seq[0, :pos])
pos = self._update_seq(self.token_map.get_id("start_text"), pos)
text = self._get_text(model_input, pos)
responses[emotion] = text
return responses
def generate_text_from_give_semantic(self, raw_inputs, semantic_action, emotion="Neutral"):
self.kg.parse_input(raw_inputs)
model_input = self.vector.encode(raw_inputs, self.max_in_len)
self.seq = torch.zeros(1, self.max_out_len, device=self.device).long()
pos = self._update_seq([0], 0)
pos = self._update_seq(self.token_map.get_id('start_json'), pos)
pos = self._update_seq(
self.token_map.get_id('start_emotion'), pos)
pos = self._update_seq(self.kg._get_token_id(emotion), pos)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
pos = self._update_seq(self.token_map.get_id('start_act'), pos)
if len(semantic_action) == 0:
pos = self._update_seq(self.token_map.get_id("end_act"), pos)
for act_id, (intent, domain, slot, value) in enumerate(semantic_action):
pos = self._update_seq(self.kg._get_token_id(intent), pos)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
pos = self._update_seq(self.kg._get_token_id(domain), pos)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
pos = self._update_seq(self.kg._get_token_id(slot), pos)
pos = self._update_seq(self.token_map.get_id('sep_token'), pos)
pos = self._update_seq(self.kg._get_token_id(value), pos)
if act_id == len(semantic_action) - 1:
token_name = "end_act"
else:
token_name = "sep_act"
pos = self._update_seq(self.token_map.get_id(token_name), pos)
pos = self._update_seq(self.token_map.get_id("start_text"), pos)
raw_output = self._get_text(model_input, pos)
return self._parse_output(raw_output)["text"]
def _get_sentiment(self, model_input, generated_so_far, mode="max"):
next_token_logits = self.model.get_next_token_logits(
model_input, generated_so_far)
return self.kg.get_sentiment(next_token_logits, mode)
def _get_emotion(self, model_input, generated_so_far, mode="max", emotion_mode="normal", sentiment=None):
mode = "max" # emotion is always max
next_token_logits = self.model.get_next_token_logits(
model_input, generated_so_far)
return self.kg.get_emotion(next_token_logits, mode, emotion_mode, sentiment)
def _get_intent(self, model_input, generated_so_far, mode="max", allow_general_intent=True):
next_token_logits = self.model.get_next_token_logits(
model_input, generated_so_far)
return self.kg.get_intent(next_token_logits, mode, allow_general_intent)
def init_session(self, goal=None):
self.token_map = tokenMap(
tokenizer=self.tokenizer, use_sentiment=self.use_sentiment)
self.token_map.default(only_action=self.only_action)
self.time_step = 0
remove_domain = "police" # remove police domain in inference
if not goal:
self._new_goal(remove_domain=remove_domain)
else:
self._read_goal(goal)
self.vector.init_session(goal=self.goal)
self.terminated = False
self.add_sys_from_reward = False
self.sys_acts = []
self.usr_acts = []
self.semantic_action = []
self.utterance = ""
self.emotion = "Neutral"
# TODO sentiment? event? user?
self.user_info = self.goal.emotion_info()
def _read_goal(self, data_goal):
self.goal = Goal(goal=data_goal)
def _new_goal(self, remove_domain="police", domain_len=None):
self.goal = Goal(goal_generator=self.goal_gen)
def _good_bye(self):
# add emotion
if self.is_success():
return "Satisfied", [['thank', 'general', 'none', 'none']], "thank you. bye"
else:
return "Dissatisfied", [["bye", "general", "None", "None"]], "bye"
def get_reward(self):
if self.is_finish():
if self.is_success():
reward = self.reward["success"]
self.success = True
else:
reward = self.reward["fail"]
self.success = False
else:
reward = -1
if self.use_sentiment:
if self.sentiment == "Positive":
reward += 1
elif self.sentiment == "Negative":
reward -= 1
self.success = None
return reward
class UserPolicy(Policy):
def __init__(self,
model_checkpoint="convlab/policy/emoUS/unify/default/EmoUS_default",
mode="language",
sample=False,
action_penalty=False,
**kwargs):
# self.config = config
print("emoUS model checkpoint: ", model_checkpoint)
if sample:
print("EmoUS will sample action, but emotion is always max")
if not os.path.exists(os.path.dirname(model_checkpoint)):
os.makedirs(os.path.dirname(model_checkpoint))
model_downloader(os.path.dirname(model_checkpoint),
"https://zenodo.org/record/7801525/files/EmoUS_default.zip")
self.policy = UserActionPolicy(
model_checkpoint,
mode=mode,
action_penalty=action_penalty,
**kwargs)
self.policy.load(os.path.join(
model_checkpoint, "pytorch_model.bin"))
self.sample = sample
def predict(self, sys_act, mode="max"):
if self.sample:
mode = "sample"
else:
mode = "max"
response = self.policy.predict(sys_act, mode)
self.semantic_action = self.policy.semantic_action
return response
def init_session(self, goal=None):
self.policy.init_session(goal)
self.semantic_action = []
def is_terminated(self):
return self.policy.is_terminated()
def get_reward(self):
return self.policy.get_reward()
def get_goal(self):
if hasattr(self.policy, 'get_goal'):
return self.policy.get_goal()
return None
def get_emotion(self):
return self.policy.emotion
if __name__ == "__main__":
import os
from convlab.dialog_agent import PipelineAgent
from convlab.util.custom_util import set_seed
import time
use_sentiment, emotion_mid = False, False
set_seed(100)
# Test semantic level behaviour
usr_policy = UserPolicy(
# model_checkpoint, # default location = convlab/policy/emoUS/unify/default/EmoUS_default
mode="semantic",
sample=True,
use_sentiment=use_sentiment,
emotion_mid=emotion_mid)
# usr_policy.policy.load(os.path.join(model_checkpoint, "pytorch_model.bin"))
usr_nlu = None # BERTNLU()
usr = PipelineAgent(usr_nlu, None, usr_policy, None, name='user')
usr.init_session()
usr.init_session()
print(usr.policy.get_goal())
start = time.time()
# print(usr.policy.policy.goal.status)
print(usr.response([['inform', 'train', 'day', 'saturday']]),
usr.policy.get_emotion())
# print(usr.policy.policy.goal.status)
print(usr.response([]),
usr.policy.get_emotion())
end = time.time()
print("-"*50)
print("time: ", end - start)
# print(usr.policy.policy.goal.status)
{
"Neutral": 0,
"Fearful": 1,
"Dissatisfied": 2,
"Apologetic": 3,
"Abusive": 4,
"Excited": 5,
"Satisfied": 6
}
\ No newline at end of file
import json
import os
import sys
from argparse import ArgumentParser
from datetime import datetime
import matplotlib.pyplot as plt
import torch
from datasets import load_metric
from sklearn import metrics
from tqdm import tqdm
from convlab.nlg.evaluate import fine_SER
from convlab.policy.emoUS.emoUS import UserActionPolicy
sys.path.append(os.path.dirname(os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))))
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--model-checkpoint", type=str, help="the model path")
parser.add_argument("--input-file", type=str, help="the testing input file",
default="")
parser.add_argument("--generated-file", type=str, help="the generated results",
default="")
parser.add_argument("--dataset", default="multiwoz")
# model parameter
parser.add_argument("--use-sentiment", action="store_true")
parser.add_argument("--emotion-mid", action="store_true")
parser.add_argument("--weight", type=float, default=None)
parser.add_argument("--sample", action="store_true")
return parser.parse_args()
class Evaluator:
def __init__(self, model_checkpoint, dataset, **kwargs):
self.dataset = dataset
self.model_checkpoint = model_checkpoint
self.time = f"{datetime.now().strftime('%y-%m-%d-%H-%M')}"
self.use_sentiment = kwargs.get("use_sentiment", False)
self.add_persona = kwargs.get("add_persona", True)
self.emotion_mid = kwargs.get("emotion_mid", False)
weight = kwargs.get("weight", None)
self.sample = kwargs.get("sample", False)
self.usr = UserActionPolicy(
model_checkpoint,
dataset=self.dataset,
use_sentiment=self.use_sentiment,
add_persona=self.add_persona,
emotion_mid=self.emotion_mid,
weight=weight)
self.usr.load(os.path.join(model_checkpoint, "pytorch_model.bin"))
"""
self.r = {"input", "golden_acts", "golden_utts", "golden_emotions",
emotion_acts, emotion_utts}
"""
self.r = {"input": [],
"golden_acts": [],
"golden_utts": [],
"golden_emotion": []}
if self.use_sentiment:
self.r["golden_sentiment"] = []
self.r["gen_sentiment"] = []
self.emotion_list = []
for emotion in json.load(open("convlab/policy/emoUS/emotion.json")):
self.emotion_list.append(emotion)
self.r[f"{emotion}_acts"] = []
self.r[f"{emotion}_utts"] = []
sent2emo = json.load(
open("convlab/policy/emoUS/sent2emo.json"))
self.emo2sent = {}
for sent, emotions in sent2emo.items():
for emo in emotions:
self.emo2sent[emo] = sent
def _append_result(self, temp):
for x in self.r:
self.r[x].append(temp[x])
def generate_results(self, f_eval, golden=False):
emotion_mode = "normal"
in_file = json.load(open(f_eval))
for dialog in tqdm(in_file['dialog']):
temp = {}
inputs = dialog["in"]
labels = self.usr._parse_output(dialog["out"])
response = self.usr.generate_from_emotion(
raw_inputs=inputs)
temp["input"] = inputs
temp["golden_acts"] = labels["action"]
temp["golden_utts"] = labels["text"]
temp["golden_emotion"] = labels["emotion"]
for emotion, resp in response.items():
output = self.usr._parse_output(resp)
temp[f"{emotion}_acts"] = output["action"]
temp[f"{emotion}_utts"] = output["text"]
if self.use_sentiment:
temp["golden_sentiment"] = labels["sentiment"]
temp["gen_sentiment"] = output["sentiment"]
self._append_result(temp)
def read_generated_result(self, f_eval):
in_file = json.load(open(f_eval))
for dialog in tqdm(in_file['dialog']):
for x in dialog:
self.r[x].append(dialog[x])
def _transform_result(self):
index = [x for x in self.r]
result = []
for i in range(len(self.r[index[0]])):
temp = {}
for x in index:
temp[x] = self.r[x][i]
result.append(temp)
return result
def nlg_evaluation(self, input_file=None, generated_file=None, golden=False):
if input_file:
print("Force generation")
self.generate_results(input_file, golden)
elif generated_file:
self.read_generated_result(generated_file)
else:
print("You must specify the input_file or the generated_file")
mode = "max"
if self.sample:
mode = "sample"
nlg_eval = {
"golden": golden,
"mode": mode,
"metrics": {},
"dialog": self._transform_result()
}
# TODO emotion metric
dir_name = self.model_checkpoint
json.dump(nlg_eval,
open(os.path.join(
dir_name, f"{self.time}-nlg_eval.json"), 'w'),
indent=2)
return os.path.join(dir_name, f"{self.time}-nlg_eval.json")
def evaluation(self, input_file=None, generated_file=None):
# TODO add emotion
gen_file = json.load(open(generated_file))
self.read_generated_result(generated_file)
r = {"golden_acts": [], "golden_emotions": [], "golden_utts": []}
for emotion in self.emotion_list:
r[f"{emotion}_acts"] = []
r[f"{emotion}_utts"] = []
for dialog in gen_file['dialog']:
r["golden_acts"].append(dialog["golden_acts"])
r["golden_emotions"].append(dialog["golden_emotion"])
r["golden_utts"].append(dialog["golden_utts"])
for emotion in self.emotion_list:
r[f"{emotion}_acts"].append(dialog[f"{emotion}_acts"])
r[f"{emotion}_utts"].append(dialog[f"{emotion}_utts"])
dialog_result = gen_file['dialog']
scores = {}
for emotion in self.emotion_list:
# if emotion == "Neutral":
# continue
scores[emotion] = {"precision": [],
"recall": [], "f1": [], "turn_acc": []}
for gen_act, golden_act in zip(r[f"{emotion}_acts"], r["Neutral_acts"]):
s = f1_measure(preds=gen_act, labels=golden_act)
for metric in scores[emotion]:
scores[emotion][metric].append(s[metric])
result = {}
for emotion in self.emotion_list:
# if emotion == "Neutral":
# continue
result[emotion] = {}
for metric in scores[emotion]:
result[emotion][metric] = sum(
scores[emotion][metric])/len(scores[emotion][metric])
result[emotion]["bleu"] = bleu(golden_utts=r["Neutral_utts"],
gen_utts=r[f"{emotion}_utts"])
result[emotion]["SER"] = SER(gen_utts=r[f"{emotion}_utts"],
gen_acts=r[f"{emotion}_acts"])
result[emotion]["len"] = avg_len(gen_utts=r[f"{emotion}_utts"])
rouge_score = rouge(golden_utts=r["Neutral_utts"],
gen_utts=r[f"{emotion}_utts"])
for metric, score in rouge_score.items():
result[emotion][metric] = score.mid.fmeasure
print("emotion:", emotion)
for metric in result[emotion]:
print(f"{metric}: {result[emotion][metric]}")
# for metric in emo_score:
# result[metric] = emo_score[metric]
# print(f"{metric}: {result[metric]}")
result["dialog"] = dialog_result
basename = "semantic_evaluation_result"
json.dump(result, open(os.path.join(
self.model_checkpoint, f"{self.time}-{self.dataset}-{basename}.json"), 'w'), indent=2)
def avg_len(gen_utts):
n = [len(s.split()) for s in gen_utts]
return sum(n)/len(n)
def bleu(golden_utts, gen_utts):
bleu_metric = load_metric("sacrebleu")
labels = [[utt] for utt in golden_utts]
bleu_score = bleu_metric.compute(predictions=gen_utts,
references=labels,
force=True)
return bleu_score["score"]
def rouge(golden_utts, gen_utts):
rouge_metric = load_metric("rouge")
rouge_score = rouge_metric.compute(predictions=gen_utts,
references=golden_utts)
return rouge_score
def SER(gen_utts, gen_acts):
missing, hallucinate, total, hallucination_dialogs, missing_dialogs = fine_SER(
gen_acts, gen_utts)
if total <= 0:
print("ERROR, total = 0")
return 1
return missing/total
def emotion_score(golden_emotions, gen_emotions, dirname=".", time="", no_neutral=False):
labels = ["Neutral", "Fearful", "Dissatisfied",
"Apologetic", "Abusive", "Excited", "Satisfied"]
if no_neutral:
labels = labels[1:]
print(labels)
macro_f1 = metrics.f1_score(golden_emotions, gen_emotions, average="macro")
sep_f1 = metrics.f1_score(
golden_emotions, gen_emotions, average=None, labels=labels)
cm = metrics.confusion_matrix(
golden_emotions, gen_emotions, normalize="true", labels=labels)
disp = metrics.ConfusionMatrixDisplay(
confusion_matrix=cm, display_labels=labels)
disp.plot()
plt.savefig(os.path.join(dirname, f"{time}-emotion.png"))
r = {"macro_f1": float(macro_f1), "sep_f1": list(
sep_f1), "cm": [list(c) for c in list(cm)]}
print(r)
return r
def sentiment_score(golden_sentiment, gen_sentiment, dirname=".", time=""):
labels = ["Neutral", "Negative", "Positive"]
print(labels)
macro_f1 = metrics.f1_score(
golden_sentiment, gen_sentiment, average="macro")
sep_f1 = metrics.f1_score(
golden_sentiment, gen_sentiment, average=None, labels=labels)
cm = metrics.confusion_matrix(
golden_sentiment, gen_sentiment, normalize="true", labels=labels)
disp = metrics.ConfusionMatrixDisplay(
confusion_matrix=cm, display_labels=labels)
disp.plot()
plt.savefig(os.path.join(dirname, f"{time}-sentiment.png"))
r = {"macro_f1": float(macro_f1), "sep_f1": list(
sep_f1), "cm": [list(c) for c in list(cm)]}
print(r)
return r
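# set-style precision/recall/F1 over (intent, domain, slot, value) tuples;
# turn_acc is 1 only when predictions and labels match exactly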
def f1_measure(preds, labels):
tp = 0
score = {"precision": 0, "recall": 0, "f1": 0, "turn_acc": 0}
for p in preds:
if p in labels:
tp += 1.0
if preds:
score["precision"] = tp/len(preds)
if labels:
score["recall"] = tp/len(labels)
if (score["precision"] + score["recall"]) > 0:
score["f1"] = 2*(score["precision"]*score["recall"]) / \
(score["precision"]+score["recall"])
if tp == len(preds) and tp == len(labels):
score["turn_acc"] = 1
return score
def main():
args = arg_parser()
eval = Evaluator(args.model_checkpoint,
args.dataset,
use_sentiment=args.use_sentiment,
emotion_mid=args.emotion_mid,
weight=args.weight,
sample=args.sample)
print("=== evaluation ===")
print("model checkpoint", args.model_checkpoint)
print("generated_file", args.generated_file)
print("input_file", args.input_file)
with torch.no_grad():
if args.generated_file:
generated_file = args.generated_file
else:
nlg_result = eval.nlg_evaluation(input_file=args.input_file,
generated_file=args.generated_file)
generated_file = nlg_result
eval.evaluation(args.input_file,
generated_file)
if __name__ == '__main__':
main()
import json
import os
import sys
from argparse import ArgumentParser
from datetime import datetime
import matplotlib.pyplot as plt
import torch
from datasets import load_metric
from sklearn import metrics
from tqdm import tqdm
from pprint import pprint
from convlab.nlg.evaluate import fine_SER
from convlab.policy.emoUS.emoUS import UserActionPolicy
sys.path.append(os.path.dirname(os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))))
def arg_parser():
parser = ArgumentParser()
parser.add_argument("--model-checkpoint", type=str, help="the model path")
parser.add_argument("--model-weight", type=str,
help="the model weight", default="")
parser.add_argument("--input-file", type=str, help="the testing input file",
default="")
parser.add_argument("--generated-file", type=str, help="the generated results",
default="")
parser.add_argument("--dataset", default="multiwoz")
parser.add_argument("--golden-emotion", action="store_true",
help="golden emotion -> action + utt")
parser.add_argument("--golden-action", action="store_true",
help="golden emotion + action -> utt")
parser.add_argument("--use-sentiment", action="store_true")
parser.add_argument("--emotion-mid", action="store_true")
parser.add_argument("--weight", type=float, default=None)
parser.add_argument("--sample", action="store_true")
return parser.parse_args()
class Evaluator:
def __init__(self, model_checkpoint, dataset, model_weight=None, **kwargs):
self.dataset = dataset
self.model_checkpoint = model_checkpoint
self.result_dir = os.path.join(model_checkpoint, "results")
os.makedirs(self.result_dir, exist_ok=True)
self.model_weight = model_weight
self.time = f"{datetime.now().strftime('%y-%m-%d-%H-%M-%S')}"
self.use_sentiment = kwargs.get("use_sentiment", False)
self.add_persona = kwargs.get("add_persona", False)
self.emotion_mid = kwargs.get("emotion_mid", False)
self.emotion_weight = kwargs.get("weight", None)
self.sample = kwargs.get("sample", False)
print("self.emotion_weight", self.emotion_weight)
self.evaluation_result = {
"emotion prediction": {},
"semantic action prediction": {},
"natural language generation": {}}
self.usr = UserActionPolicy(
model_checkpoint,
dataset=self.dataset,
use_sentiment=self.use_sentiment,
add_persona=self.add_persona,
emotion_mid=self.emotion_mid,
weight=self.emotion_weight)
self.usr.load(os.path.join(model_checkpoint, "pytorch_model.bin"))
self.r = {"input": [],
"golden_acts": [],
"golden_utts": [],
"golden_emotion": [],
"gen_acts": [],
"gen_utts": [],
"gen_emotion": []}
if self.use_sentiment:
self.r["golden_sentiment"] = []
self.r["gen_sentiment"] = []
sent2emo = json.load(
open("convlab/policy/emoUS/sent2emo.json"))
self.emo2sent = {}
for sent, emotions in sent2emo.items():
for emo in emotions:
self.emo2sent[emo] = sent
def _append_result(self, temp):
for x in self.r:
self.r[x].append(temp[x])
def generate_results(self, f_eval, golden_emotion=False, golden_action=False):
emotion_mode = "normal"
in_file = json.load(open(f_eval))
mode = "max"
if self.sample:
mode = "sample"
for dialog in tqdm(in_file['dialog']):
inputs = dialog["in"]
labels = self.usr._parse_output(dialog["out"])
if golden_action:
usr_act = labels["action"]
usr_emo = labels["emotion"]
usr_utt = self.usr.generate_text_from_give_semantic(
inputs, labels["action"], labels["emotion"])
elif golden_emotion:
usr_emo = labels["emotion"]
output = self.usr.generate_from_emotion(
inputs, emotion=usr_emo, mode=mode)
output = self.usr._parse_output(output[usr_emo])
usr_act = self.usr._remove_illegal_action(output["action"])
usr_utt = output["text"]
else:
output = self.usr._parse_output(
self.usr._generate_action(inputs, mode=mode, emotion_mode=emotion_mode))
usr_emo = output["emotion"]
usr_act = self.usr._remove_illegal_action(output["action"])
usr_utt = output["text"]
temp = {}
temp["input"] = inputs
temp["golden_acts"] = labels["action"]
temp["golden_utts"] = labels["text"]
temp["golden_emotion"] = labels["emotion"]
temp["gen_acts"] = usr_act
temp["gen_utts"] = usr_utt
temp["gen_emotion"] = usr_emo
if self.use_sentiment:
temp["golden_sentiment"] = labels["sentiment"]
temp["gen_sentiment"] = output["sentiment"]
self._append_result(temp)
# save generations
generations = {}
generations["time"] = self.time
generations["golden"] = False
if golden_action:
# basically, golden_action includes golden_emotion
generations["golden"] = "golden_action"
elif golden_emotion:
generations["golden"] = "golden_emotion"
generations["mode"] = mode
generations["dialog"] = self._transform_result()
file_name = "generations.json"
if generations["golden"]:
file_name = generations['golden'] + "_" + file_name
with open(os.path.join(self.result_dir, file_name), "w") as f:
json.dump(generations, f, indent=2)
def read_generated_result(self, f_eval):
in_file = json.load(open(f_eval))
for dialog in tqdm(in_file['dialog']):
for x in dialog:
if x not in self.r:
self.r[x] = []
self.r[x].append(dialog[x])
def _transform_result(self):
index = [x for x in self.r]
result = []
for i in range(len(self.r[index[0]])):
temp = {}
for x in index:
temp[x] = self.r[x][i]
result.append(temp)
return result
@staticmethod
def nlg_evaluation(golden_utts, gen_utts, gen_acts):
bleu_metric = load_metric("sacrebleu")
labels = [[utt] for utt in golden_utts]
bleu_score = bleu_metric.compute(predictions=gen_utts,
references=labels,
force=True)
missing, hallucinate, total, hallucination_dialogs, missing_dialogs = fine_SER(
gen_acts, gen_utts)
return {"bleu": bleu_score["score"], "SER": missing/total}
@staticmethod
def _intent_domain(action):
acts = []
for intent, domain, slot, value in action:
if [intent, domain] not in acts:
acts.append([intent, domain])
return acts
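# F1 is computed at two granularities: the full (intent, domain, slot, value)
# tuple and the coarser (intent, domain) pair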
def semantic_evaluation(self, gen_acts, golden_acts):
scores = {"full action": {"precision": [], "recall": [], "f1": [], "turn_acc": []},
"intent-domain": {"precision": [], "recall": [], "f1": [], "turn_acc": []}}
for gen_act, golden_act in zip(gen_acts, golden_acts):
s = f1_measure(preds=gen_act, labels=golden_act)
for metric in scores["full action"]:
scores["full action"][metric].append(s[metric])
s = f1_measure(preds=self._intent_domain(gen_act),
labels=self._intent_domain(golden_act))
for metric in scores["intent-domain"]:
scores["intent-domain"][metric].append(s[metric])
result = {}
for metric_type, score in scores.items():
result[metric_type] = {}
for m, s in score.items():
result[metric_type][m] = sum(s)/len(s)
return result
def evaluation(self, input_file="", generated_file="", golden_emotion=False, golden_action=False):
if input_file:
print("Force generation")
self.generate_results(input_file, golden_emotion, golden_action)
elif generated_file:
self.read_generated_result(generated_file)
else:
print("You must specify the input_file or the generated_file")
r = self.nlg_evaluation(
self.r["golden_utts"], self.r["gen_utts"], self.r["gen_acts"])
for metric, score in r.items():
self.evaluation_result["natural language generation"][metric] = score
if not golden_action:
r = self.semantic_evaluation(
self.r["gen_acts"], self.r["golden_acts"])
for metric, score in r.items():
self.evaluation_result["semantic action prediction"][metric] = score
if not golden_emotion and not golden_action:
r = emotion_score(self.r["golden_emotion"],
self.r["gen_emotion"],
self.result_dir)
self.evaluation_result["emotion prediction"]["emotion"] = {}
self.evaluation_result["emotion prediction"]["emotion"]["macro_f1"] = r["macro_f1"]
self.evaluation_result["emotion prediction"]["emotion"]["sep_f1"] = {
emo: f1 for emo, f1 in zip(r["label"], r["sep_f1"])}
if self.use_sentiment:
golden_sentiment = self.r["golden_sentiment"]
gen_sentiment = self.r["gen_sentiment"]
else:
# map emotions to sentiment if the model does not generate sentiment
golden_sentiment = [self.emo2sent[emo]
for emo in self.r["golden_emotion"]]
gen_sentiment = [self.emo2sent[emo]
for emo in self.r["gen_emotion"]]
r = sentiment_score(golden_sentiment,
gen_sentiment,
self.result_dir)
self.evaluation_result["emotion prediction"]["sentiment"] = {}
self.evaluation_result["emotion prediction"]["sentiment"]["macro_f1"] = r["macro_f1"]
self.evaluation_result["emotion prediction"]["sentiment"]["sep_f1"] = {
emo: f1 for emo, f1 in zip(r["label"], r["sep_f1"])}
pprint(self.evaluation_result)
# def save_results(self):
# def print_result(self):
# print("=== Natural language generation ===")
# print("Sacre-BLEU", nlg_eval["metrics"]["bleu"]["score"])
# print("SER", nlg_eval["metrics"]["SER"])
# self.r[""]
def emotion_score(golden_emotions, gen_emotions, dirname=".", no_neutral=False):
labels = ["Neutral", "Fearful", "Dissatisfied",
"Apologetic", "Abusive", "Excited", "Satisfied"]
if no_neutral:
labels = labels[1:]
macro_f1 = metrics.f1_score(golden_emotions, gen_emotions, average="macro")
sep_f1 = metrics.f1_score(
golden_emotions, gen_emotions, average=None, labels=labels)
cm = metrics.confusion_matrix(
golden_emotions, gen_emotions, normalize="true", labels=labels)
disp = metrics.ConfusionMatrixDisplay(
confusion_matrix=cm, display_labels=labels)
disp.plot()
plt.savefig(os.path.join(dirname, f"emotion.png"))
r = {"label": labels,
"macro_f1": float(macro_f1),
"sep_f1": list(sep_f1),
"cm": [list(c) for c in list(cm)]}
return r
def sentiment_score(golden_sentiment, gen_sentiment, dirname="."):
labels = ["Neutral", "Negative", "Positive"]
macro_f1 = metrics.f1_score(
golden_sentiment, gen_sentiment, average="macro")
sep_f1 = metrics.f1_score(
golden_sentiment, gen_sentiment, average=None, labels=labels)
cm = metrics.confusion_matrix(
golden_sentiment, gen_sentiment, normalize="true", labels=labels)
disp = metrics.ConfusionMatrixDisplay(
confusion_matrix=cm, display_labels=labels)
disp.plot()
plt.savefig(os.path.join(dirname, f"sentiment.png"))
r = {"label": labels,
"macro_f1": float(macro_f1),
"sep_f1": list(sep_f1),
"cm": [list(c) for c in list(cm)]}
return r
def f1_measure(preds, labels):
tp = 0
score = {"precision": 0, "recall": 0, "f1": 0, "turn_acc": 0}
for p in preds:
if p in labels:
tp += 1.0
if preds:
score["precision"] = tp/len(preds)
if labels:
score["recall"] = tp/len(labels)
if (score["precision"] + score["recall"]) > 0:
score["f1"] = 2*(score["precision"]*score["recall"]) / \
(score["precision"]+score["recall"])
if tp == len(preds) and tp == len(labels):
score["turn_acc"] = 1
return score
def main():
args = arg_parser()
eval = Evaluator(args.model_checkpoint,
args.dataset,
args.model_weight,
use_sentiment=args.use_sentiment,
emotion_mid=args.emotion_mid,
weight=args.weight,
sample=args.sample)
print("=== evaluation ===")
print("model checkpoint", args.model_checkpoint)
print("generated_file", args.generated_file)
print("input_file", args.input_file)
with torch.no_grad():
eval.evaluation(input_file=args.input_file,
generated_file=args.generated_file,
golden_emotion=args.golden_emotion,
golden_action=args.golden_action)
if __name__ == '__main__':
main()
# from fast_bleu import SelfBLEU
import argparse
import json
from datasets import Dataset, load_metric
from tqdm import tqdm
def arg_parser():
parser = argparse.ArgumentParser()
parser.add_argument("--file", type=str)
parser.add_argument("--fast-bleu", action="store_true")
parser.add_argument("--uss", action="store_true")
return parser.parse_args()
def read_file(file_name):
nlg_candidates = json.load(open(file_name))
return nlg_candidates
def get_sent(candidates, bleu_mode="torch", uss=False):
if bleu_mode == "torch":
if uss:
return [x["preds"] for x in candidates]
if "log" in candidates:
return [x["gen_utts"] for x in candidates["log"]]
else:
return [x["gen_utts"] for x in candidates["dialog"]]
else:
if uss:
return [x["preds"].split() for x in candidates]
if "log" in candidates:
return [x["gen_utts"].split() for x in candidates["log"]]
else:
return [x["gen_utts"].split() for x in candidates["dialog"]]
def SelfBLEU(sentences):
metric = load_metric("sacrebleu")
result = []
for i, sent in tqdm(enumerate(sentences), ascii=True):
r = metric.compute(predictions=[sent], references=[
sentences[:i]+sentences[i+1:]])
result.append(r["score"])
return sum(result)/len(result)
def calculate(candidates, bleu_mode="torch", uss=False):
sentences = get_sent(candidates, bleu_mode, uss)
if bleu_mode == "torch":
x = SelfBLEU(sentences)
else:
bleu = fast_bleu.SelfBLEU(sentences)
x = bleu.get_score()
# x = bleu.get_score()
# print(x)
print(sum(x[4])/len(x[4]))
if __name__ == "__main__":
args = arg_parser()
if args.fast_bleu:
import fast_bleu
calculate(read_file(args.file), "fast-bleu", uss=args.uss)
else:
calculate(read_file(args.file), uss=args.uss)
{
"Neutral": [
"Neutral"
],
"Negative": [
"Fearful",
"Dissatisfied",
"Apologetic",
"Abusive"
],
"Positive": [
"Excited",
"Satisfied"
]
}
\ No newline at end of file
{
"Neutral": 0,
"Negative": 1,
"Positive": 2
}
\ No newline at end of file