Skip to content
Snippets Groups Projects
Unverified Commit 405e87c5 authored by hualai-liujiexi's avatar hualai-liujiexi Committed by GitHub
Browse files

Add laug and sc-gpt (#201)

* add laug and sc-gpt

* Update README.md

Co-authored-by: Ljx <hualai-liujiexi>
parent 76406501
Branches
No related tags found
No related merge requests found
Showing
with 1033 additions and 1 deletion
...@@ -43,7 +43,9 @@ RUN pip install quadprog ...@@ -43,7 +43,9 @@ RUN pip install quadprog
RUN pip install pyyaml RUN pip install pyyaml
RUN pip install fuzzywuzzy RUN pip install fuzzywuzzy
RUN pip install python-Levenshtein RUN pip install python-Levenshtein
RUN pip install gtts
RUN pip install DeepSpeech
RUN pip install pydub
RUN [ "python", "-c", "import nltk; nltk.download('stopwords')" ] RUN [ "python", "-c", "import nltk; nltk.download('stopwords')" ]
......
# LAUG
**LAUG**[[repo]](https://github.com/thu-coai/LAUG/) is an open-source toolkit for Language understanding AUGmentation. It is an automatic method to approximate the natural perturbations to existing data. Augmented data could be used to conduct black-box robustness testing or enhancing training. [[paper]](https://arxiv.org/abs/2012.15262)
Here are the 4 augmentation methods described in our paper.
- Word Perturbation, at `Word_Perturbation/` dir.
- Text Paraphrasing, at `Text_Paraphrasing/`dir.
- Speech Recognition, at `Speech_Recognition/`dir.
- Speech Disfluency, at `Speech_Disfluency/`dir.
Please see our paper and README.md in each augmentation method for detailed information.
See `demo.py` for the usage of these augmentation methods.
> python demo.py
Note that our augmentation methods contain several neural models, so pre-trained parameters need to be downloaded before use. Parameters pre-trained by us are available at [Link](http://115.182.62.174:9876/). For parameters released by others, please follow the instructions of each method.
# -*- coding: utf-8 -*-
# Arranged from pytorch official tutorials
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
import json
# Fix the RNG seed so randomly-initialized parameters are reproducible.
torch.manual_seed(1)
#####################################################################
# Helper functions to make the code more readable.
def argmax(vec):
    """Return the column index of the largest entry of `vec` (1 x N) as a Python int."""
    return torch.max(vec, 1)[1].item()
# Compute log sum exp in a numerically stable way for the forward algorithm
# Compute log(sum(exp(vec))) for a 1 x N score row, numerically stably:
# the max is factored out before exponentiating so nothing overflows.
def log_sum_exp(vec):
    best = vec[0, argmax(vec)]
    shifted = vec - best.view(1, -1).expand(1, vec.size()[1])
    return best + torch.log(torch.exp(shifted).sum())
#####################################################################
# Create model
class BiLSTM_CRF(nn.Module):
    """BiLSTM encoder + CRF decoder for token tagging (interruption-point
    prediction), adapted from the PyTorch official BiLSTM-CRF tutorial.

    NOTE(review): relies on module-level START_TAG / STOP_TAG existing before
    any instance is constructed (they are defined later in this file).
    """
    def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim,emb_weights):
        super(BiLSTM_CRF, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        #self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
        # Frozen pre-trained embeddings (GloVe) instead of learned ones.
        self.word_embeds=nn.Embedding.from_pretrained(emb_weights)
        # Single-layer bidirectional LSTM; each direction gets hidden_dim // 2
        # units so the concatenated output is hidden_dim wide.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True)
        # Maps the output of the LSTM into tag space.
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)
        # Matrix of transition parameters. Entry i,j is the score of
        # transitioning *to* i *from* j.
        self.transitions = nn.Parameter(
            torch.randn(self.tagset_size, self.tagset_size))
        # These two statements enforce the constraint that we never transfer
        # to the start tag and we never transfer from the stop tag
        self.transitions.data[tag_to_ix[START_TAG], :] = -10000
        self.transitions.data[:, tag_to_ix[STOP_TAG]] = -10000
        self.hidden = self.init_hidden()
    def init_hidden(self):
        # Fresh random (h0, c0) for the (2 directions, batch size 1) LSTM.
        return (torch.randn(2, 1, self.hidden_dim // 2),
                torch.randn(2, 1, self.hidden_dim // 2))
    def _forward_alg(self, feats):
        """CRF forward algorithm: log partition function over all tag paths."""
        # Do the forward algorithm to compute the partition function
        init_alphas = torch.full((1, self.tagset_size), -10000.)
        # START_TAG has all of the score.
        init_alphas[0][self.tag_to_ix[START_TAG]] = 0.
        # Wrap in a variable so that we will get automatic backprop
        forward_var = init_alphas
        # Iterate through the sentence
        for feat in feats:
            alphas_t = []  # The forward tensors at this timestep
            for next_tag in range(self.tagset_size):
                # broadcast the emission score: it is the same regardless of
                # the previous tag
                emit_score = feat[next_tag].view(
                    1, -1).expand(1, self.tagset_size)
                # the ith entry of trans_score is the score of transitioning to
                # next_tag from i
                trans_score = self.transitions[next_tag].view(1, -1)
                # The ith entry of next_tag_var is the value for the
                # edge (i -> next_tag) before we do log-sum-exp
                next_tag_var = forward_var + trans_score + emit_score
                # The forward variable for this tag is log-sum-exp of all the
                # scores.
                alphas_t.append(log_sum_exp(next_tag_var).view(1))
            forward_var = torch.cat(alphas_t).view(1, -1)
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        alpha = log_sum_exp(terminal_var)
        return alpha
    def _get_lstm_features(self, sentence):
        """Run the BiLSTM over a 1-D tensor of word indices; returns (len, tagset) emission scores."""
        self.hidden = self.init_hidden()
        embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
        lstm_out, self.hidden = self.lstm(embeds, self.hidden)
        lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
        lstm_feats = self.hidden2tag(lstm_out)
        return lstm_feats
    def _score_sentence(self, feats, tags):
        # Gives the score of a provided tag sequence
        score = torch.zeros(1)
        # Prepend START_TAG so transitions line up with feats.
        tags = torch.cat([torch.tensor([self.tag_to_ix[START_TAG]], dtype=torch.long), tags])
        for i, feat in enumerate(feats):
            score = score + \
                self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
        return score
    def _viterbi_decode(self, feats):
        """Max-scoring tag path for the given emissions; returns (score, path)."""
        backpointers = []
        # Initialize the viterbi variables in log space
        init_vvars = torch.full((1, self.tagset_size), -10000.)
        init_vvars[0][self.tag_to_ix[START_TAG]] = 0
        # forward_var at step i holds the viterbi variables for step i-1
        forward_var = init_vvars
        for feat in feats:
            bptrs_t = []  # holds the backpointers for this step
            viterbivars_t = []  # holds the viterbi variables for this step
            for next_tag in range(self.tagset_size):
                # next_tag_var[i] holds the viterbi variable for tag i at the
                # previous step, plus the score of transitioning
                # from tag i to next_tag.
                # We don't include the emission scores here because the max
                # does not depend on them (we add them in below)
                next_tag_var = forward_var + self.transitions[next_tag]
                best_tag_id = argmax(next_tag_var)
                bptrs_t.append(best_tag_id)
                viterbivars_t.append(next_tag_var[0][best_tag_id].view(1))
            # Now add in the emission scores, and assign forward_var to the set
            # of viterbi variables we just computed
            forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1)
            backpointers.append(bptrs_t)
        # Transition to STOP_TAG
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(terminal_var)
        path_score = terminal_var[0][best_tag_id]
        # Follow the back pointers to decode the best path.
        best_path = [best_tag_id]
        for bptrs_t in reversed(backpointers):
            best_tag_id = bptrs_t[best_tag_id]
            best_path.append(best_tag_id)
        # Pop off the start tag (we dont want to return that to the caller)
        start = best_path.pop()
        assert start == self.tag_to_ix[START_TAG]  # Sanity check
        best_path.reverse()
        return path_score, best_path
    def neg_log_likelihood(self, sentence, tags):
        """CRF training loss: log Z minus the gold path score."""
        feats = self._get_lstm_features(sentence)
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score
    def forward(self, sentence):  # dont confuse this with _forward_alg above.
        # Get the emission scores from the BiLSTM
        lstm_feats = self._get_lstm_features(sentence)
        # Find the best path, given the features.
        score, tag_seq = self._viterbi_decode(lstm_feats)
        return score, tag_seq
#####################################################################
# Run training
# Special tag names the CRF uses to anchor transitions at sequence boundaries.
START_TAG = "<START>"
STOP_TAG = "<STOP>"
# GloVe 100-d embeddings feed a 100-unit (50 per direction) BiLSTM.
EMBEDDING_DIM = 100
HIDDEN_DIM = 100
## Speech_Disfluency
The interruption points are predicted by a Bi-LSTM+CRF model.
The filler words, restart terms, and edit terms, together with their occurrence frequencies, are all sampled from their distributions in SwitchBoard.
## Bi-LSTM+CRF model
Bi-LSTM+CRF model is trained on SwitchBoard data.
Please download the pre-trained parameters and disfluency resources at [Link](http://115.182.62.174:9876/).
The model requires the glove.6B.100d word vectors; please modify line 22 in inference.py accordingly.
# -*- coding: utf-8 -*-
import json
import random
from fuzzywuzzy import fuzz
from convlab2.laug.Speech_Disfluency.inference import IP_model
import os
# Absolute directory of this module; used to locate the bundled resource files.
current_path=os.path.dirname(os.path.abspath(__file__))
def random_01(possibility):
    """Bernoulli draw: return 1 with probability `possibility`, else 0."""
    return 1 if random.random() < possibility else 0
def random_pick_from_list(random_list):
    """Return a uniformly random element of `random_list`."""
    # Scale one [0, 1) draw up to the list length and floor it to an index.
    idx = int(len(random_list) * random.random())
    return random_list[idx]
def process_distribution_dict(distribution_dict):
    """Convert a {item: weight} mapping into a cumulative distribution list.

    Returns [(item, cumulative_weight), ...] in dict iteration order; the last
    tuple's second element is the total weight. Used for weighted sampling.
    """
    processed_distribution = []
    total = 0  # fixed: the original used a local named `sum`, shadowing the builtin
    for key, weight in distribution_dict.items():
        total += weight
        processed_distribution.append((key, total))
    return processed_distribution
def random_pick_from_distribution(distribution_dict):
    """Sample one key with probability proportional to its weight."""
    cumulative = process_distribution_dict(distribution_dict)
    # Draw a point in [0, total_weight) and return the first bucket covering it.
    threshold = random.random() * cumulative[-1][1]
    for item, bound in cumulative:
        if threshold <= bound:
            return item
def preprocess(sentence):
    """Lowercase and whitespace-tokenize a sentence into a word list."""
    # str.split() with no argument already ignores leading/trailing whitespace.
    return sentence.lower().split()
class Speech_Disfluency:
    """Inject speech disfluencies (fillers, repeats, repairs, restarts) into an
    utterance while keeping its slot-value spans recoverable.

    Interruption points come from the module-level `IP_model` (BiLSTM-CRF);
    filler/edit/restart terms and the knowledge base are read from
    resources/resources_<dataset>.json next to this module.
    """
    def __init__(self,dataset='multiwoz',edit_frequency=0.3):
        # edit_frequency: total probability mass of inserting a repair,
        # divided evenly over the utterance's spans in add_repairs().
        self.resources=json.load(open(os.path.join(current_path,'resources/resources_'+dataset+'.json'),'r'))
        self.edit_frequency=edit_frequency
    def protect_slots(self,word_list,spans,IP_tags):
        """Clear IP tags inside slot values so fillers never split a value.

        The first token of each value is forced to tag 1 (filler allowed
        before it); interior tokens are forced to 0.
        """
        sentence=' '.join(word_list)+' '
        for span in spans:
            value=span[2]
            # Token index of the value: number of spaces before its match.
            # NOTE(review): the sentence has no leading space, so a value at
            # position 0 will not match ' '+value+' ' -- confirm intended.
            start=sentence.count(' ',0,sentence.find(' '+value+' '))
            lenth=len(value.split())
            for i in range(start+1,start+lenth):
                IP_tags[i]=0
            IP_tags[start]=1
            # NOTE(review): dead branch -- IP_tags[start] was just set to 1,
            # so this condition can never be true.
            if IP_tags[start]==2:
                IP_tags[start]=1
        return IP_tags
    def add_repairs(self,word_list,spans):
        """With probability edit_frequency/len(spans) per span, prepend a
        wrong entity + an edit term before the span (a self-repair)."""
        sentence=' '+' '.join(word_list)+' '
        if len(spans)==0:
            return word_list
        else:
            edit_possibility=self.edit_frequency/len(spans)
            for span in spans:
                if random_01(edit_possibility)==0:
                    continue
                value=span[2]
                start=sentence.count(' ',0,sentence.find(' '+value+' '))-1
                # Fuzzy-match the value against the knowledge-base entities.
                max_ratio,max_entity=0,''
                for e in self.resources["knowledge_base"]["entity"]:
                    ratio=fuzz.ratio(e,value)
                    if ratio>max_ratio:
                        max_ratio=ratio
                        max_entity=e
                if max_entity!='' and max_ratio>60:
                    candidate=[]
                    if max_entity in self.resources["knowledge_base"]["entity"]:
                        # Candidate replacements: other entities of one of the
                        # matched entity's categories ([0:] takes a copy).
                        candidate=self.resources["knowledge_base"]["category"][random_pick_from_list(self.resources["knowledge_base"]["entity"][max_entity])][0:]
                        # NOTE(review): `span` is the whole [act, slot, value,
                        # start, end] list; comparing it against entity strings
                        # looks like it was meant to be `value` -- confirm.
                        if span in candidate:
                            candidate.remove(span)
                        if len(candidate)!=0:
                            word_list[start]=random_pick_from_list(candidate)+' '+random_pick_from_list(self.resources["edit_terms"])+' '+word_list[start]
            return word_list
    def add_repeats(self,word_list,IP_tags):
        """Duplicate each token tagged 2, joined by ' ' or ' , '."""
        for i in range(len(IP_tags)):
            if IP_tags[i]==2:
                word_list[i]=word_list[i]+random_pick_from_list([' ',' , '])+word_list[i]
        return word_list
    def add_fillers(self,word_list,IP_tags):
        """Prepend a sampled filler term before each token tagged 1."""
        for i in range(len(IP_tags)):
            if IP_tags[i]==1:
                word_list[i]=random_pick_from_distribution(self.resources["filler_terms"])+' '+word_list[i]
        return word_list
    def add_restart(self,word_list):
        """Prepend a sampled restart term at the start of the utterance."""
        word_list[0]=random_pick_from_distribution(self.resources["restart_terms"])+' '+word_list[0]
        return word_list
    def find_spans(self,disfluent_sentence,spans):
        """Recompute each span's (start, end) token indices in the disfluent
        sentence, mutating `spans` in place; `checked` is 0 if any span's
        tokens no longer match its value exactly."""
        checked=1
        sentence=' '+disfluent_sentence+' '
        for i in range(len(spans)):
            value=spans[i][2]
            start=sentence.count(' ',0,sentence.find(' '+value+' '))
            lenth=len(value.split())
            spans[i][3]=start
            spans[i][4]=start+lenth-1
            if ' '.join(sentence.split()[spans[i][3]:spans[i][4]+1])!=spans[i][2]:
                checked=0
        return spans,checked
    def aug(self,sentence,spans):
        """Main entry: return (disfluent sentence, spans re-located in it)."""
        word_list=preprocess(sentence)
        IP_tags=IP_model(word_list)
        IP_tags=self.protect_slots(word_list,spans,IP_tags)
        word_list=self.add_repairs(word_list,spans)
        word_list=self.add_repeats(word_list,IP_tags)
        word_list=self.add_fillers(word_list,IP_tags)
        word_list=self.add_restart(word_list)
        disfluent_sentence=' '.join(word_list)
        new_spans,checked=self.find_spans(disfluent_sentence,spans)
        return disfluent_sentence,new_spans
# input sentence and span_info ; output the disfluent sentence and new_span_info
if __name__=="__main__":
    # Smoke test: inject disfluencies into one MultiWOZ-style utterance.
    text = "I want a train to Cambridge"
    span_info = [["Train-Inform","Dest","Cambridge",5,5]]
    SR = Speech_Disfluency()
    new_text,new_span_info = SR.aug(text,span_info)
    print(new_text)
    print(new_span_info)
from .LSTMCRF import BiLSTM_CRF
import json
import numpy as np
import torch
import os
START_TAG = "<START>"
STOP_TAG = "<STOP>"
EMBEDDING_DIM = 100
HIDDEN_DIM = 100
# Make up some training data
def prepare_sequence(seq, to_ix):
    """Map tokens to vocabulary indices (0 = <unk>) as a LongTensor."""
    indices = [to_ix.get(token, 0) for token in seq]
    return torch.tensor(indices, dtype=torch.long)
# Put your dir to glove here
glove_file='[dir_to]/glove.6B.100d.txt'

# Cap on the number of GloVe rows loaded (the original named this `max`,
# shadowing the builtin for the rest of the module).
MAX_VOCAB = 20000

# Row 0 of the embedding matrix is an all-zero 100-d vector for '<unk>'.
word_to_ix = {'<unk>': 0}
weights = [torch.from_numpy(np.array([0.] * 100))]
# `with` guarantees the handle is closed (the original leaked it), and
# iterating the file streams lines instead of materializing readlines().
with open(glove_file, 'r') as ifs:
    for i, line in enumerate(ifs):
        if i >= MAX_VOCAB:
            break
        line_list = line.split()
        word = line_list[0]
        embed = torch.from_numpy(np.array([float(num) for num in line_list[1:]]))
        word_to_ix[word] = i + 1
        weights.append(embed)
weights = torch.stack(weights, 0).float()

tag_to_ix = {"O": 0, "F": 1, "R": 2, START_TAG: 3, STOP_TAG: 4}
# Build the tagger on the frozen GloVe table and load trained parameters
# from model/LSTMCRF.bin located next to this file.
model = BiLSTM_CRF(len(word_to_ix), tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM, weights)
model_path = os.path.dirname(os.path.abspath(__file__))
model.load_state_dict(torch.load(os.path.join(model_path, 'model/LSTMCRF.bin')))
def IP_model(word_list):
    """Predict an interruption-point tag per token with the module-level
    BiLSTM-CRF; returns the Viterbi tag list (index 1 of the model's
    (score, tags) output). No gradients are tracked."""
    with torch.no_grad():
        precheck_sent = prepare_sequence(word_list, word_to_ix)
        return model(precheck_sent)[1]
if __name__=="__main__":
    # Quick manual check on one tokenized SwitchBoard-style sentence.
    sent="okay , i like to do weight training and cycling ."
    print(IP_model(sent.split()))
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
import json
from LSTMCRF2 import BiLSTM_CRF
import numpy as np
from progressbar import progressbar
def prepare_sequence(seq, to_ix):
    """Convert a token sequence into a LongTensor of vocab indices.

    Out-of-vocabulary tokens map to index 0 (the <unk> row).
    """
    idxs = []
    for token in seq:
        idxs.append(to_ix[token] if token in to_ix else 0)
    return torch.tensor(idxs, dtype=torch.long)
START_TAG = "<START>"
STOP_TAG = "<STOP>"
EMBEDDING_DIM = 100
HIDDEN_DIM = 100
# Make up some training data
# Load SwitchBoard training pairs: each item is (token list, per-token tag list).
data=json.load(open('SWBD/data.json','r'))
training_data=[]
for d in data:
    training_data.append((d['text'],d['tags']))
print(len(training_data))
# NOTE(review): left empty -- set this to a local glove.6B.100d.txt path
# before running, otherwise open() below fails.
glove_file=''
word_to_ix={}
max=20000  # NOTE(review): shadows the builtin `max` for the rest of the script
ifs=open(glove_file, 'r')
word_to_ix['<unk>'] = 0
weights=[]
weights.append(torch.from_numpy(np.array([0.]*100)))  # zero vector for <unk> at row 0
for i,line in enumerate(ifs.readlines()):
    if i>=max:
        break
    line_list = line.split()
    word = line_list[0]
    embed = line_list[1:]
    embed = torch.from_numpy(np.array([float(num) for num in embed]))
    word_to_ix[word] = i+1
    weights.append(embed)
weights = torch.stack(weights, 0).float()
tag_to_ix = {"O": 0, "F": 1, "R": 2, START_TAG: 3, STOP_TAG: 4}
model = BiLSTM_CRF( len(word_to_ix), tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM,weights)
model  # no-op expression (notebook-session leftover)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Sanity check before training: print the untrained model's prediction.
with torch.no_grad():
    precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
    precheck_tags = torch.tensor([tag_to_ix[t] for t in training_data[0][1]], dtype=torch.long)
    print(model(precheck_sent))
ep=0
for epoch in range(30):
    n,losses=0,0.
    ep+=1
    for sentence, tags in progressbar(training_data):
        model.zero_grad()
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = torch.tensor([tag_to_ix[t] for t in tags], dtype=torch.long)
        loss = model.neg_log_likelihood(sentence_in, targets)
        # NOTE(review): accumulating the loss tensor (rather than loss.item())
        # keeps every computation graph alive until the epoch ends.
        losses+=loss
        n+=1
        loss.backward()
        optimizer.step()
    # Checkpoint after every epoch, then report mean training loss and a few
    # qualitative predictions.
    torch.save(model.state_dict(), 'model/LSTMCRF_'+str(ep)+'.bin')
    print('loss:'+str(losses/n))
    with torch.no_grad():
        precheck_sent = prepare_sequence("okay , i like to do , weight training and cycling .".split(), word_to_ix)
        print(model(precheck_sent))
        precheck_sent = prepare_sequence(training_data[1][0], word_to_ix)
        print(model(precheck_sent))
        precheck_sent = prepare_sequence('i want to go to cambridge .'.split(), word_to_ix)
        print(model(precheck_sent))
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import os
import argparse
import numpy as np
import shlex
import subprocess
import sys
import wave
import json
from deepspeech import Model, version
from timeit import default_timer as timer
try:
    # Fixed upstream typo: the module is `shlex` (its quote() exists since
    # Python 3.3), not "shhlex" -- the typo forced the deprecated `pipes`
    # fallback on every run.
    from shlex import quote
except ImportError:
    from pipes import quote
def convert_samplerate(audio_path, desired_sample_rate):
    """Resample an audio file via SoX to 16-bit mono PCM at the desired rate.

    Shells out to `sox` and reads the raw samples from its stdout.
    Returns (desired_sample_rate, int16 numpy array of samples).
    Raises RuntimeError if SoX fails, OSError if it is not installed.
    """
    sox_cmd = 'sox {} --type raw --bits 16 --channels 1 --rate {} --encoding signed-integer --endian little --compression 0.0 --no-dither - '.format(quote(audio_path), desired_sample_rate)
    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError('SoX returned non-zero status: {}'.format(e.stderr))
    except OSError as e:
        raise OSError(e.errno, 'SoX not found, use {}hz files or install it: {}'.format(desired_sample_rate, e.strerror))
    return desired_sample_rate, np.frombuffer(output, np.int16)
def metadata_to_string(metadata):
    """Concatenate the text of every token in a candidate transcript."""
    parts = [token.text for token in metadata.tokens]
    return ''.join(parts)
def words_from_candidate_transcript(metadata):
    """Group DeepSpeech's per-character tokens into word records.

    Returns a list of dicts {"word", "start_time", "duration"} with times in
    seconds rounded to 4 decimal places.
    """
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time
            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time
            # Clamp negative durations (can happen for the final character).
            if word_duration < 0:
                word_duration = 0
            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)
            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0
    return word_list
def metadata_json_output(metadata):
    """Serialize all candidate transcripts (confidence + word timings) to a
    pretty-printed JSON string."""
    json_result = dict()
    json_result["transcripts"] = [{
        "confidence": transcript.confidence,
        "words": words_from_candidate_transcript(transcript),
    } for transcript in metadata.transcripts]
    return json.dumps(json_result, indent=2)
class VersionAction(argparse.Action):
    """argparse action for --version: print the DeepSpeech version and exit."""
    def __init__(self, *args, **kwargs):
        # nargs=0: the flag takes no value.
        super(VersionAction, self).__init__(nargs=0, *args, **kwargs)
    def __call__(self, *args, **kwargs):
        print('DeepSpeech ', version())
        exit(0)
class wav2text():
    """Thin wrapper around a DeepSpeech model for WAV-file transcription.

    Reads model / scorer / decoder settings from the module-level `args`
    namespace parsed at import time below.
    """
    def __init__(self,):
        print('Loading model from file {}'.format(args.model), file=sys.stderr)
        model_load_start = timer()
        # sphinx-doc: python_ref_model_start
        # Model files are resolved relative to this module's directory.
        model_path=os.path.dirname(os.path.abspath(__file__))
        ds = Model(os.path.join(model_path,args.model))
        # sphinx-doc: python_ref_model_stop
        model_load_end = timer() - model_load_start
        print('Loaded model in {:.3}s.'.format(model_load_end), file=sys.stderr)
        if args.beam_width:
            ds.setBeamWidth(args.beam_width)
        self.desired_sample_rate = ds.sampleRate()
        if args.scorer:
            print('Loading scorer from files {}'.format(args.scorer), file=sys.stderr)
            scorer_load_start = timer()
            ds.enableExternalScorer(os.path.join(model_path,args.scorer))
            scorer_load_end = timer() - scorer_load_start
            print('Loaded scorer in {:.3}s.'.format(scorer_load_end), file=sys.stderr)
            if args.lm_alpha and args.lm_beta:
                ds.setScorerAlphaBeta(args.lm_alpha, args.lm_beta)
        if args.hot_words:
            print('Adding hot-words', file=sys.stderr)
            for word_boost in args.hot_words.split(','):
                word,boost = word_boost.split(':')
                ds.addHotWord(word,float(boost))
        self.ds=ds
    def run(self,audio):
        """Transcribe the WAV file at path `audio`; returns the text."""
        fin = wave.open(audio, 'rb')
        fs_orig = fin.getframerate()
        if fs_orig != self.desired_sample_rate:
            # BUGFIX: the original referenced an undefined global
            # `desired_sample_rate` and the unrelated `args.audio` here,
            # raising NameError whenever resampling was needed.
            print('Warning: original sample rate ({}) is different than {}hz. Resampling might produce erratic speech recognition.'.format(fs_orig, self.desired_sample_rate), file=sys.stderr)
            fs_new, audio = convert_samplerate(audio, self.desired_sample_rate)
        else:
            audio = np.frombuffer(fin.readframes(fin.getnframes()), np.int16)
        audio_length = fin.getnframes() * (1/fs_orig)
        fin.close()
        inference_start = timer()
        # sphinx-doc: python_ref_inference_start
        text=self.ds.stt(audio)
        #print(text)
        # sphinx-doc: python_ref_inference_stop
        inference_end = timer() - inference_start
        #print('Inference took %0.3fs for %0.3fs audio file.' % (inference_end, audio_length), file=sys.stderr)
        return text
# Command-line options for the DeepSpeech wrapper. NOTE(review): parse_args()
# runs unconditionally at import time, so importing this module consumes
# sys.argv -- importing it from a script with unrelated CLI flags may error.
parser = argparse.ArgumentParser(description='Running DeepSpeech inference.')
parser.add_argument('--model', required=False,default='deepspeech-0.9.3-models.pbmm',
                    help='Path to the model (protocol buffer binary file)')
parser.add_argument('--scorer', required=False,default='deepspeech-0.9.3-models.scorer',
                    help='Path to the external scorer file')
parser.add_argument('--audio', required=False,
                    help='Path to the audio file to run (WAV format)')
parser.add_argument('--beam_width', type=int,
                    help='Beam width for the CTC decoder')
parser.add_argument('--lm_alpha', type=float,
                    help='Language model weight (lm_alpha). If not specified, use default from the scorer package.')
parser.add_argument('--lm_beta', type=float,
                    help='Word insertion bonus (lm_beta). If not specified, use default from the scorer package.')
parser.add_argument('--version', action=VersionAction,
                    help='Print version and exits')
parser.add_argument('--extended', required=False, action='store_true',
                    help='Output string from extended metadata')
parser.add_argument('--json', required=False, action='store_true',
                    help='Output json from metadata with timestamp of each word')
parser.add_argument('--candidate_transcripts', type=int, default=3,
                    help='Number of candidate transcripts to include in JSON output')
parser.add_argument('--hot_words', type=str,
                    help='Hot-words and their boosts.')
args = parser.parse_args()
# Speech Recognition
A TTS+ASR pipeline to simulate speech characteristics and recognition error.
## TTS
We use gTTS as the TTS module.
Please install ffmpeg before use:
```bash
conda install ffmpeg
```
## ASR
We use DeepSpeech as the ASR module. Note that we used DeepSpeech2 to conduct the experiments in our paper, but in this released toolkit we chose DeepSpeech instead for higher efficiency.
Please download [released models](https://github.com/mozilla/DeepSpeech/releases/tag/v0.9.3) before use.
Please download deepspeech-0.9.3-models.pbmm and deepspeech-0.9.3-models.scorer and place them under the `Speech_Recognition/` dir.
#coding: UTF-8
from convlab2.laug.Speech_Recognition.ASR import wav2text
from convlab2.laug.Speech_Recognition.TTS import text2wav
from convlab2.laug.Speech_Recognition.multiwoz.span_detection import span_detect
import os
import time
class Speech_Recognition:
    """TTS+ASR pipeline: synthesize the text with gTTS, transcribe it back
    with DeepSpeech, and re-locate the slot spans in the transcription."""
    def __init__(self,dataset='multiwoz',temp_file='temp',tld='com'):
        # NOTE(review): `dataset` is accepted for API symmetry with the other
        # augmenters but is currently unused.
        # temp_file: basename for the intermediate .mp3/.wav files.
        # tld: Google TTS top-level domain (selects the voice endpoint).
        self.wav2text = wav2text()
        self.temp_file = temp_file
        self.tld = tld
    def aug(self,text,span_info):
        """Return (ASR transcription of the synthesized text, re-detected spans)."""
        ok=0
        # Retry TTS until it succeeds; gTTS raises ValueError transiently.
        while ok==0:
            try:
                text2wav(text,tld=self.tld,filename=self.temp_file)
            except ValueError:
                ok=0
                print("gTTS error occur!")
            else:
                ok=1
        new_text = self.wav2text.run(self.temp_file+".wav")
        new_span_info=[]
        for span in span_info:
            new_span_info.append(span_detect(text,new_text,span))
        return new_text,new_span_info
if __name__=="__main__":
    # Smoke test: round-trip one utterance through TTS + ASR.
    text = "I want a train to Cambridge"
    span_info = [["Train-Inform","Dest","Cambridge",5,5]]
    SR = Speech_Recognition()
    new_text,new_span_info = SR.aug(text,span_info)
    print(new_text)
    print(new_span_info)
#coding: UTF-8
from gtts import gTTS
from pydub.audio_segment import AudioSegment
import os
def text2wav(text,language='en',filename='temp',tld='cn'):
    """Synthesize `text` with Google TTS to <filename>.mp3, then convert it
    to a 16 kHz WAV (<filename>.wav) as expected by the DeepSpeech ASR step.

    NOTE(review): default tld='cn' here, while Speech_Recognition passes
    tld='com' -- confirm which endpoint is intended as the default.
    """
    gTTS(text=text, tld=tld,lang=language).save(filename+".mp3")
    AudioSegment.from_mp3(filename+".mp3").set_frame_rate(16000).export(filename+".wav", format="wav")
# -*- coding: utf-8 -*-
\ No newline at end of file
import locale;
# English names for 0-19 (note: "zero " keeps a trailing space in the data).
NUMBER_CONSTANT = {0:"zero ", 1:"one", 2:"two", 3:"three", 4:"four", 5:"five", 6:"six", 7:"seven",
                   8:"eight", 9:"nine", 10:"ten", 11:"eleven", 12:"twelve", 13:"thirteen",
                   14:"fourteen", 15:"fifteen", 16:"sixteen", 17:"seventeen", 18:"eighteen", 19:"nineteen" };
# Tens words, keyed by the tens digit (2 -> "twenty" ... 9 -> "ninety").
IN_HUNDRED_CONSTANT = {2:"twenty", 3:"thirty", 4:"forty", 5:"fifty", 6:"sixty", 7:"seventy", 8:"eighty", 9:"ninety"}
# Scale words, keyed by thousands-group position (see translateNumberToEnglish).
BASE_CONSTANT = {0:" ", 1:"hundred", 2:"thousand", 3:"million", 4:"billion"};
#supported number range is 1-n billion;
def translateNumberToEnglish(number):
    """Spell out a non-negative number (int or numeric string) in English.

    NOTE(review): prints (returns None) for non-numeric input rather than
    raising; and since the locale-based comma grouping is commented out,
    strNumber never contains commas, so numberArray is always one group --
    numbers >= 100 therefore go through getUnderThreeNumberString only.
    """
    if str(number).isnumeric():
        # Leading zeros only occur when `number` arrives as a string.
        if str(number)[0] == '0' and len(str(number)) > 1:
            return translateNumberToEnglish(int(number[1:]));
        if int(number) < 20:
            return NUMBER_CONSTANT[int(number)];
        elif int(number) < 100:
            if str(number)[1] == '0':
                return IN_HUNDRED_CONSTANT[int(str(number)[0])];
            else:
                return IN_HUNDRED_CONSTANT[int(str(number)[0])] + " " + NUMBER_CONSTANT[int(str(number)[1])];
        else:
            #locale.setlocale(locale.LC_ALL, "English_United States.1252");
            #strNumber = locale.format("%d" , number, grouping=True);
            strNumber=str(number)
            numberArray = str(strNumber).split(",");
            stringResult = "";
            # groupCount tracks the scale word (thousand/million/...) for
            # each comma-separated group, highest group first.
            groupCount = len(numberArray) + 1;
            for groupNumber in numberArray:
                if groupCount > 1 and groupNumber[0:] != "000":
                    stringResult += str(getUnderThreeNumberString(str(groupNumber))) + " ";
                else:
                    break;
                groupCount -= 1;
                if groupCount > 1:
                    stringResult += BASE_CONSTANT[groupCount] + " ";
            endPoint = len(stringResult) - len(" hundred,");
            #return stringResult[0:endPoint];
            return stringResult;
    else:
        print("please input a number!");
#between 0-999
def getUnderThreeNumberString(number):
    """Spell out a 1-3 digit numeric *string* in English (helper for
    translateNumberToEnglish; mutually recursive with it)."""
    if str(number).isnumeric() and len(number) < 4:
        if len(number) < 3:
            return translateNumberToEnglish(int(number));
        elif len(number) == 3 and number[0:] == "000":
            return " ";
        elif len(number) == 3 and number[1:] == "00":
            # Exact hundreds: "three hundred".
            return NUMBER_CONSTANT[int(number[0])] + " " + BASE_CONSTANT[1];
        else:
            # Hundreds plus remainder: "three hundred and forty two".
            return NUMBER_CONSTANT[int(number[0])] + " " + BASE_CONSTANT[1] + " and " + translateNumberToEnglish((number[1:]));
def translateTimeToEnglish(t):
    """Spell out an "HH:MM" time string: "ten thirty", or "ten o'clock"
    when the minutes are "00"."""
    t=t.split(':')
    if t[1]!='00':
        return translateNumberToEnglish(t[0])+' '+translateNumberToEnglish(t[1])
    else:
        return translateNumberToEnglish(t[0])+' '+'o\'clock'
def span_typer(s):
    """Classify a span string: "number" (all digits), "time" (NN:NN), or "none"."""
    if s.isnumeric():
        return "number"
    if ':' in s:
        parts = s.split(':')
        if len(parts) == 2 and parts[0].isnumeric() and parts[1].isnumeric():
            return "time"
    return "none"
def replacer(s):
    """Normalize tokenization artifacts and spelling variants so a reference
    span can be matched against ASR output ("do n't" -> "don't", etc.)."""
    substitutions = (
        (" n't", "n't"),
        (" 'll", "'ll"),
        ('centre', 'center'),
        ('-star', ' star'),
        ('guesthouse', 'guest house'),
    )
    for old, new in substitutions:
        s = s.replace(old, new)
    return s
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 11 17:49:53 2020
@author: truthless
"""
import spacy
from fuzzywuzzy import fuzz
# Digit strings to English words (0-12), plus the inverse map used to
# normalize number words back to digits before fuzzy span matching.
digit2word = {
    '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four', '5': 'five',
    '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine', '10': 'ten', '11': 'eleven',
    '12': 'twelve'
}
word2digit = {v:k for k,v in digit2word.items()}
#nlp = spacy.load('en_core_web_sm')
# NOTE(review): `nlp` is commented out above, but preprocess() below still
# calls it -- the load must be restored (or `nlp` defined) before use.
threshold = 55  # minimum fuzz.ratio score for a fuzzy span match
def digit_normalize(utt_list):
    """Replace number words ('zero'..'twelve') with digit strings.

    Mutates `utt_list` in place and also returns it.
    """
    for position, token in enumerate(utt_list):
        if token in word2digit:
            utt_list[position] = word2digit[token]
    return utt_list
def phrase_idx_utt(value_list, utt_list):
    """Fuzzy-locate a token phrase inside an utterance's token list.

    Tries windows of length len(value_list) and +/-1 token, scores each with
    fuzz.ratio against the joined phrase, and returns the (start, end)
    indices of the best window scoring above `threshold`, or None.
    Ties keep the earliest/first-generated candidate (stable sort).
    """
    utt_list = digit_normalize(utt_list)
    candidates = []
    l = len(value_list)
    for i in [l, l-1, l+1]:
        if i == 0:
            continue
        for j in range(len(utt_list)-i+1):
            score = fuzz.ratio(' '.join(utt_list[j:j+i]), ' '.join(value_list))
            if score > threshold:
                candidates.append((score, j, j+i-1))
    return sorted(candidates, key=lambda x:x[0], reverse=True)[0][1:] if candidates else None
def preprocess(utt, da):
    '''
    Tokenize an utterance and produce BIO tag sequences per dialog act.

    utt: str
    da: dict {'domain-intent': [slot, value]}
    Returns (tokens, labels) where labels maps each 'domain-intent' key to
    {'tags': BIO list aligned with tokens, 'slots': requested slot names}.

    NOTE(review): requires the module-level `nlp` spaCy pipeline, whose load
    is currently commented out above -- this raises NameError as-is.
    '''
    with nlp.disable_pipes('tagger', 'parser'):
        tokens = [token.text for token in nlp(utt)]
    labels = dict()
    for key, pair in da.items():
        tags = ["O"] * len(tokens)
        slots = []
        labels[key] = {'tags':tags, 'slots':slots}
        for slot, value in pair:
            intent = key.split('-')[1].lower()
            if intent in ["request"]:
                # Requested slots carry no value span; just record the slot.
                slots.append(slot)
            elif intent in ['inform']:
                value_tokens = [token.text for token in nlp(value)]
                span = phrase_idx_utt(value_tokens, tokens)
                if span is not None:
                    if slot.lower() in ['name', 'dest', 'depart']:
                        # For open-vocabulary slots, substitute the canonical
                        # value tokens into the utterance before tagging.
                        tokens[span[0]:span[1]+1] = value_tokens
                        tags[span[0]:span[1]+1] = ["O"] * len(value_tokens)
                        tags[span[0]] = "B-" + slot
                        for i in range(span[0]+1, span[0]+len(value_tokens)):
                            tags[i] = "I-" + slot
                    else:
                        #tags[span[0]] = "B-" + da[1] + '-' + da[0] + "+" + da[2]
                        tags[span[0]] = "B-" + slot
                        for i in range(span[0]+1, span[1]+1):
                            #tags[i] = "I-" + da[1] + '-' + da[0] + "+" + da[2]
                            tags[i] = "I-" + slot
    return tokens, labels
from .detection_utils import translateNumberToEnglish,translateTimeToEnglish,span_typer,replacer
import json
from .paraphrase_span_detection import phrase_idx_utt
def span_detect(original_text,new_text,span_list):
    #input:original_text,new_text,one span_info [slot,slot,span,start,end]
    #output:is_span_found? , is_span_changed? , new span_info [slot,slot,new span,new start,new end]
    # Strategy, in order of preference:
    #   1. exact match of the normalized span in new_text;
    #   2. exact match of its spoken form (numbers/times spelled out);
    #   3. fuzzy window match via phrase_idx_utt;
    #   4. fall back to locating the words just before/after the original
    #      span and taking whatever lies between them in new_text.
    # If nothing is found, the original value is kept with indices (0, 0).
    span=span_list[2].lower()
    span=replacer(span)
    span_type=span_typer(span)
    new_words=new_text.split()
    # span2: the spoken-form variant of the span (e.g. "10:30" -> "ten thirty").
    if span_type=="time":
        span2=translateTimeToEnglish(span)
    if span_type=="number":
        span2=translateNumberToEnglish(span)
    if span_type=="none":
        span2=span
    span_changed,span_found=0,0
    if new_text.find(span)>=0:
        # Case 1: span survived ASR verbatim; token index = spaces before it.
        span_changed,span_found=0,1
        span_start=new_text.count(' ',0,new_text.find(span))
        span_end=span_start+len(span.split())-1
        new_span_list=[span_list[0],span_list[1],' '.join(new_words[span_start:span_end+1]),span_start,span_end]
    elif new_text.find(span2)>=0:
        # Case 2: the spelled-out form appears instead.
        span_changed,span_found=1,1
        span=span2
        span_start=new_text.count(' ',0,new_text.find(span))
        span_end=span_start+len(span.split())-1
        new_span_list=[span_list[0],span_list[1],' '.join(new_words[span_start:span_end+1]),span_start,span_end]
    else:
        span=span2
        span_words=span.split()
        # Case 3: fuzzy search for a similar token window.
        result=phrase_idx_utt(span_words,new_words)
        if result is not None:
            max_start,max_end=result
            span_changed,span_found=1,1
            new_span_list=[span_list[0],span_list[1],' '.join(new_words[max_start:max_end+1]),max_start,max_end]
        else:
            # Case 4: anchor on the context words around the original span.
            origin_split=original_text.split()
            new_split=new_words
            ok=0
            origin_start=span_list[3]-1
            if origin_start>=0:
                # Skip a punctuation token immediately before the span.
                if origin_start-1>=0 and origin_split[origin_start] in ['.',',','?']:
                    origin_start-=1
                start_word=origin_split[origin_start]
                # First occurrence of the left-context word in new_text;
                # the candidate span starts right after it.
                for start in range(len(new_split)):
                    if new_split[start]==start_word:
                        break
                start+=1
            else:
                start=0
            if span_list[4]+1<len(origin_split) and start<len(new_split):
                end_word=origin_split[span_list[4]+1]
                if end_word not in ['.',',','?']:
                    if span_list[4]+1<len(origin_split):
                        end_word=origin_split[span_list[4]+1]
                    # Candidate span ends just before the right-context word.
                    for end in range(start,len(new_split)):
                        if new_split[end]==end_word:
                            ok=1
                            break
                    end-=1
                else:
                    # Right context is punctuation: use the word after it.
                    if span_list[4]+2<len(origin_split):
                        end_word=origin_split[span_list[4]+2]
                        for end in range(start,len(new_split)):
                            if new_split[end]==end_word:
                                ok=1
                                break
                        end-=1
                    else:
                        ok=1
                        end=len(new_split)-1
            else:
                ok=1
                end=len(new_split)-1
            if start<=end and ok==1:
                span_changed,span_found=1,1
                new_span_list=[span_list[0],span_list[1],' '.join(new_words[start:end+1]),start,end]
    if span_found==0:
        # Nothing matched: keep the original value with dummy indices.
        new_span_list=[span_list[0],span_list[1],span_list[2],0,0]
    return new_span_list
# Text Paraphrasing
We applied SC-GPT to paraphrase the sentences. Code of SC-GPT is under `LAUG/nlg/` dir.
# -*- coding: utf-8 -*-
from convlab2.nlg.scgpt.multiwoz.scgpt import SCGPT
from convlab2.laug.Text_Paraphrasing.utils import span2tuple,paraphrase_span_detection
class Text_Paraphrasing:
    """Paraphrase an utterance with SC-GPT, then re-locate its slot spans
    inside the generated text."""
    def __init__(self,dataset='multiwoz'):
        # 'multiwoz' uses the default SC-GPT weights; 'frames' downloads the
        # frames-specific checkpoint.
        if dataset=='multiwoz':
            self.model=SCGPT()
        if dataset=='frames':
            self.model=SCGPT(model_file='https://convlab.blob.core.windows.net/convlab-2/nlg-gpt-frames.zip')
        self.model.init_session()
    def aug(self,text,span_info):
        """text: str; span_info: list of [domain-intent, slot, value, start, end].
        Returns (paraphrased text, spans re-detected in the new text)."""
        t=span2tuple(span_info)
        new_text = self.model.generate(t)
        new_span_info = paraphrase_span_detection(new_text,span_info)
        return new_text, new_span_info
if __name__=="__main__":
    # Demo: paraphrase one utterance and re-detect its span.
    text = "I want a train to Cambridge"
    # BUGFIX: the dialog-act name was misspelled "Train-Infrom".
    span_info = [["Train-Inform","Dest","Cambridge",5,5]]
    TP = Text_Paraphrasing()
    new_text,new_span_info = TP.aug(text,span_info)
    print(new_text)
    print(new_span_info)
# -*- coding: utf-8 -*-
\ No newline at end of file
# -*- coding: utf-8 -*-
from convlab2.util.multiwoz.paraphrase_span_detection import phrase_idx_utt
def paraphrase_span_detection(new_text,span_info):
    """Re-locate each span inside the paraphrased sentence.

    Spans that cannot be fuzzily matched in `new_text` are dropped.
    """
    tokens = new_text.split()
    relocated = []
    for span in span_info:
        match = phrase_idx_utt(span[2].split(), tokens)
        if match is not None:
            begin, end = match
            relocated.append([span[0], span[1], ' '.join(tokens[begin:end + 1]), begin, end])
    return relocated
def span2tuple(span_info):
    """Convert span rows into (intent, domain, slot, value) tuples for SC-GPT."""
    result = []
    for span in span_info:
        act_parts = span[0].split('-')  # "Domain-Intent"
        result.append((act_parts[1], act_parts[0], span[1], span[2]))
    return result
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment