ERC - CoMPM 모델 논문 구현
url: https://aclanthology.org/2022.naacl-main.416.pdf
Context Modeling with Speaker’s Pre-trained Memory
핵심: 대화 속 감정 인식 task에서 Common sense knowledge와 같은 외부 데이터 셋을 활용하는 것이 제한될 경우(한글과 같이 관련 데이터가 부족할 경우) CSK를 이용하지 않고도 PM이라는 기법을 통해 CSK를 사용한 경우에 못지 않은 성능을 도출할 수 있다고 주장한다.
Dataset
활용 데이터셋: MELD Dataset
- Friends 드라마 대본이며 Dialogue_ID로 대화(상황)가 구분되어 있다.
- 하나의 Dialogue 내에서 마지막 발화에 대한 감성을 학습하는 방식을 취하되 이전 발화를 참조하도록 한다.
Preprocess
Dialogue별로 speaker, utterance, emotion 정보를 활용한다.
- Speaker에 대한 label encoding을 진행하되 Dialogue_ID 단위로 적용
- Dialogue별로 speaker, utterance, emotion을 하나의 리스트로 묶어 session으로 정의
- 하나의 Dialogue 내 각 세션이 단계적으로 누적되는 형태로 재구성
- session_dataset = [ [발화1], [발화1, 발화2], [발화1, 발화2, 발화3],..., [발화1, 발화2, 발화3, 발화4, 발화5] ]
코드 참조
class CustomDataset(Dataset):
def __init__(self, data_path):
self.session_dataset = self.preprocess(data_path)
self.labels = ['anger', 'disgust', 'fear', 'joy', 'neutral', 'sadness', 'surprise']
self.tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
def __len__(self):
return len(self.session_dataset)
def __getitem__(self, idx):
return self.session_dataset[idx]
def preprocess(self, data_path):
"""preprocessing csv files
"""
'''
1. Speaker에 대한 label encoding을 진행하되 Dialogue_ID 단위로 적용
2. Dialogue별로 speaker, utterance, emotion을 하나의 리스트로 묶음
3. 하나의 Dialogue 내 발화가 단계적으로 누적되도록 데이터 구성 (아래 참조)
result:[
[
[0,
'also I was the point person on my company’s transition from the KL-5 to GR-6 system.',
'neutral']],
[
[0,
'also I was the point person on my company’s transition from the KL-5 to GR-6 system.',
'neutral'],
[1, 'You must’ve had your hands full.', 'neutral']],
[
[0,
'also I was the point person on my company’s transition from the KL-5 to GR-6 system.',
'neutral'],
[1, 'You must’ve had your hands full.', 'neutral'],
[0, 'That I did. That I did.', 'neutral']],
[
[0,
'also I was the point person on my company’s transition from the KL-5 to GR-6 system.',
'neutral'],
[1, 'You must’ve had your hands full.', 'neutral'],
[0, 'That I did. That I did.', 'neutral'],
[1, 'So let’s talk a little bit about your duties.', 'neutral']],
[...
]
'''
datasets = []
# Load dataset
raw_data = pd.read_csv(data_path)
sep_data = raw_data[['Speaker', 'Utterance', 'Emotion', 'Dialogue_ID']].copy()
# Dialogue_ID별 화자 넘버링
start = -1
speaker_to_num = []
for idx in sep_data.index:
# initialize
if sep_data.loc[idx, 'Dialogue_ID'] != start:
start += 1
speaker_num = 0
speaker_list = []
speaker = sep_data.loc[idx, 'Speaker']
if speaker not in speaker_list:
speaker_to_num.append(speaker_num)
speaker_num += 1
speaker_list.append(speaker)
else:
speaker_to_num.append(speaker_list.index(speaker))
sep_data['Speaker'] = speaker_to_num
# Dialogue기준으로 화자, 발화, 감정 데이터를 groupby
grouped_by_dialogue = sep_data.groupby(['Dialogue_ID'], as_index=True)
def func(x):
"""화자, 발화, 감정을 하나의 리스트에 담아 session으로 정의한다."""
result = x.apply(lambda x: [x['Speaker'], x['Utterance'], x['Emotion']], axis=1)
return result
dialogue_session = grouped_by_dialogue.apply(func) # dialogue와 session번호로 구분된 multi-index
'''dialogue_session result (실제 결과에서는 Speaker의 이름 대신 label 번호가 적용된다.)
Dialogue_ID
0 0 [Chandler, also I was the point person on my c...
1 [The Interviewer, You must’ve had your hands f...
2 [Chandler, That I did. That I did., neutral]
3 [The Interviewer, So let’s talk a little bit a...
4 [Chandler, My duties? All right., surprise]
...
1038 9984 [Chandler, You or me?, neutral]
9985 [Ross, I got it. Uh, Joey, women don't have Ad...
9986 [Joey, You guys are messing with me, right?, s...
9987 [All, Yeah., neutral]
9988 [Joey, That was a good one. For a second there...
'''
session_datasets = []
for idx in raw_data['Dialogue_ID'].unique():
session_datasets.append(dialogue_session[idx].tolist())
# 세션 내 대화를 연이어 붙이는 작업 진행
for sess in session_datasets:
length = len(sess)
for i in range(1, length+1):
datasets.append(sess[:i])
return datasets
def collate_fn(self, sessions):
'''
PM 만들기
: PM 데이터는 하나의 세션에서 예측하고자 하는 마지막 utt의 감정과 동일한 화자의 이전 utt를 따로 뽑아둔다.
아래 예시는 6번째 세션에서 최종 화자의 이전 발화를 담은 케이스를 의미한다.
0 = cls token, 2 = sep token
[
[],
[],
[],
[],
[],
[tensor([[ 0, 1793, 328, 1793, 6, 52, 115, 213, 7, 5,
827, 6, 593, 84, 2349, 8, 847, 106, 160, 23,
5, 1300, 4, 2, 370, 17, 27, 241, 10, 16333,
328, 2]])],
[],...
]
CoM 만들기
: 각 세션에서 뽑은 모든 utt를 보관
tensor([[ 0, 1793, 328, 1793, 6, 52, 115, 213, 7, 5,
827, 6, 593, 84, 2349, 8, 847, 106, 160, 23,
5, 1300, 4, 2, 370, 17, 27, 241, 10, 16333,
328, 2]])
'''
max_seq_len = self.tokenizer.model_max_length
batch_input = []
batch_labels = []
batch_PM_input = []
for session in sessions:
input_sep = self.tokenizer.cls_token # tokenizer.cls_token:str = '<s>'
curr_speaker, curr_utt, curr_emotion = session[-1] # current는 세션 내 마지막 화자, 발화, 감정 정보를 의미한다.
PM_input = []
for i, line in enumerate(session):
speaker, utt, emotion = line
input_sep += " " + utt + self.tokenizer.sep_token # tokenizer.sep_token:str = '</s>'
if i < len(session)-1 and speaker == curr_speaker: # curr화자의 이전 발화가 존재할 경우를 의미
# PM에 대한 Roberta tokenize (직접 cls token과 sep token을 붙여주므로 add_special_tokens를 False로 설정)
PM_input.append(self.tokenizer.encode(utt, add_special_tokens=False, return_tensors='pt'))
batch_PM_input.append(PM_input)
batch_input.append(input_sep)
batch_labels.append(self.labels.index(emotion)) # 각 세션의 마지막 화자의 감정만 label로 적용
# CoM Roberta tokenize (padding True로 설정하면 각 배치 내 가장 긴 utt에 대해 패딩을 진행한다.)
tokenized_utt = self.tokenizer(batch_input, add_special_tokens=False, return_tensors='pt', max_length=max_seq_len, truncation=True, padding=True)
batch_input_ids = tokenized_utt['input_ids']
batch_attention_mask = tokenized_utt['attention_mask']
return batch_input_ids, batch_attention_mask, batch_PM_input, torch.tensor(batch_labels)
Model Architecture
CoM(context module)
- PLM(Roberta)에 모든 대화의 발화(utterance)를 fine-tuning한다.
- $S_A$, $S_B$… 는 Speaker를 의미하며 각 발화를 화자에 따라 구분지음을 의미한다.
- 실제 Speaker의 정보가 들어가는 것은 아니며 special token으로 발화를 구분한다.
PM(pre-trained memory module)
- 대화에서 예측 대상인 최종 발화를 한 Speaker가 이전에 발화한 내용을 Tracking하여 PLM(Roberta)에 fine-tuning하여 External Knowledge로 활용한다.
- 전체 대화에서 최종 발화한 Speaker의 이전 발화가 여러 개일 경우가 발생하는데, 최종 발화와 가까운 발화일수록 참조의 비중을 높게 두도록 undirectional GRU 모델을 추가적으로 활용한다.
- GRU는 LSTM의 advanced 버전과 같은 성격을 띄고 있는 RNN 모델로서, 먼저 입력되는 발화가 나중에 입력되는 발화보다 참조를 덜하게 되는 장기의존성 문제를 활용한 것으로 추측된다. (논문에서는 GRU 사용에 대한 구체적인 이유 설명을 하지 않고 있음)
코드 참조
class ERCModel(nn.Module):
def __init__(self, num_class, device):
super().__init__() # 미 입력 시 에러 발생
self.com_model = RobertaModel.from_pretrained('roberta-base')
self.PM_model = RobertaModel.from_pretrained('roberta-base')
'''GRU 모델 세팅 (참조: <https://pytorch.org/docs/stable/generated/torch.nn.GRU.html>)
- GRU 모델의 input, ouput dimension은 roberta의 hidden size를 따른다.
- input size (seq_len, bs, h)를 입력으로 받고, output size (num_layers, bs, h)를 출력한다.
- bidirectional=True로 설정된다면 num_layers * 2를 해야하지만 논문에서 제안한 대로. 여기서는 False로 설정한다.
'''
self.hidden_dim = self.com_model.config.hidden_size # roberta의 hidden_dim (768)
self.h_0 = torch.zeros(size=(2, 1, self.hidden_dim)).to(device) # (num_layers * num_directions, batch, hidden_size)
self.speakerGRU = nn.GRU(self.hidden_dim, self.hidden_dim, 2, dropout=0.3) # (input, hidden, num_layer) (BERT_hidden_size, BERT_hidden_size, num_layer)
# classification
self.W = nn.Sequential(
nn.Linear(self.hidden_dim, num_class),
nn.Softmax(dim=-1)
)
def forward(self, data):
input_ids = data[0].to(device)
attention_mask = data[1].to(device)
PM_input = data[2] # PM은 리스트 내 여러 utt 텐서가 존재하는 형태이므로 to(device)를 개별적으로 적용해야 한다.
labels = data[3].to(device)
# CoM_model의 cls 토큰 확보
com_cls_output = self.com_model(input_ids, attention_mask)['last_hidden_state'][:, 0, :]
'''PM 데이터는 각 세션별로 존재하지 않는 경우부터 다량 존재하는 경우로 나뉜다.
PM 데이터가 없을 경우 0값의 행렬을, 있을 경우 각 utt를 roberta 모델에 학습시킨 뒤 GRU 모델로 추가 학습시킨다.
'''
pm_gru_final = []
for utts in PM_input:
if utts: # 현재 세션의 PM tensor가 존재한다면
pm_cls_output = []
for utt in utts:
cls_output = self.PM_model(utt.to(device))['last_hidden_state'][:, 0, :] # input_ids만 넣을 경우 attention mask는 자동 1로 채워진다.
pm_cls_output.append(cls_output)
pm_output = torch.cat(pm_cls_output, 0).unsqueeze(1) # (speaker_num, batch=1, hidden_dim)
pm_gru_output, _ = self.speakerGRU(pm_output, self.h_0)
pm_gru_final.append(pm_gru_output[-1,:,:]) # (1(bs), hidden_dim) 마지막 uttr token에 대해서만 가져간다.
else: # 존재하지 않을 경우
pm_gru_final.append(torch.zeros(1, self.hidden_dim).to(device)) # PM tensor가 비어있다면 0값 차원을 맞춰 보낸다.
pm_gru_final = torch.cat(pm_gru_final, 0)
# CoM과 PM을 element-wise sum한 뒤 label 차원으로 축소 후 softmax
final_output = self.W(com_cls_output + pm_gru_final)
return final_output, labels
Train
Training Setup
Model: Roberta-base or large(huggingface library), undirectional GRU(dropout=0.3)
Loss: CrossEntropyLoss
Optimizer: AdamW(lr=1e-05)
Learning scheduler: get_linear_schedule_with_warmup, clip_grad_norm_(max=10)
Evaluation: Weighted avg F1-score
*All experiments are conducted on one V100 GPU with 32GB memory.(논문 기준)
코드 참조
def train_one_epoch(model, dataloader, loss_fn, optimizer, scheduler, max_grad_norm, device):
# model 학습
model.train()
# train_loss
train_loss = 0
for data in tqdm(dataloader, leave=False):
pred, target = model(data)
loss = loss_fn(pred, target)
# 역전파 과정
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
train_loss += loss.item() # 최종 평균을 구하기 위해 전부 더한다.
train_loss /= len(dataloader) # epoch 당 평균 train_loss
return train_loss
def valid_one_epoch(model, dataloader, loss_fn, device):
# model 평가
model.eval()
# valid_loss
valid_loss = 0
pred_list, target_list = [], []
with torch.no_grad():
for data in tqdm(dataloader, leave=False):
pred, target = model(data)
loss = loss_fn(pred, target)
valid_loss += loss.item() # 최종 평균을 구하기 위해 전부 더한다.
# evaluation
pred_label = pred.argmax(1).item()
target_label = target.item()
pred_list.append(pred_label)
target_list.append(target_label)
valid_loss /= len(dataloader) # epoch 당 평균 valid_loss
valid_precision, valid_recall, valid_f1_score, _ = precision_recall_fscore_support(target_list, pred_list, average='weighted', zero_division=0)
'''Notes (precision_recall_fscore_support)
When true positive + false positive == 0, precision is undefined.
When true positive + false negative == 0, recall is undefined.
In such cases, by default the metric will be set to 0, as will f-score,
and UndefinedMetricWarning will be raised. This behavior can be modified with zero_division.
-> precision을 구할 때 pred_list에 일부 target label이 포함되지 않은 경우 (가령 sadness를 한 번도 예측하지 않은 경우)
precision 연산에서 sadness label에 대한 결과를 0으로 처리한다. 이는 recall과 f1 score도 동일하다.
zero_division 인자의 default값은 ”warn”으로 되어 있어 위 경우 UndefinedMetricWarning 메시지가 출력되고 0으로 처리하는데,
경고창을 보지 않기 위해 해당 인자를 0으로 설정한다.
'''
return valid_loss, valid_f1_score, pred_list, target_list
def save_model(model, path='./best_model'):
"""save model parameters
"""
if not os.path.exists(path):
os.makedirs(path)
torch.save(model.state_dict(), os.path.join(path, 'model.bin'))
Run
학습 순서는 아래와 같다.
logging과 하이퍼 파라미터, 데이터셋 준비를 편의상 같은 run 함수에 하드코딩 했으나 실제로는 따로 관리되어야 한다.
코드참조
# 데이터 경로 리스트
data_path = "./MELD/data/MELD/*.csv"
data_path_list = glob.glob(data_path)
data_path_list.sort(reverse=True)
'''
['./MELD/data/MELD/train_sent_emo.csv',
'./MELD/data/MELD/test_sent_emo.csv',
'./MELD/data/MELD/dev_sent_emo.csv']
'''
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
def run():
# ---------------------------------------
# NOTE: utils.py get_logger 함수로 따로 정의할 것
import logging
# 로그 생성
logger = logging.getLogger()
# 로그의 출력 기준 설정
logger.setLevel(logging.INFO)
# log 출력
stream_handler = logging.StreamHandler()
logger.addHandler(stream_handler)
# log를 파일에 출력
file_handler = logging.FileHandler('model.log')
logger.addHandler(file_handler)
# NOTE: 하이퍼 파라미터는 yaml파일로 관리할 것
epochs = 1
num_labels = 7
batch_size = 1
max_grad_norm = 10
lr = 1e-05
eps = 1e-08
shuffle = False
# ---------------------------------------
# Load dataset
train_dataset = CustomDataset(data_path_list[0])
valid_dataset = CustomDataset(data_path_list[2])
# test_dataset = CustomDataset(data_path_list[1])
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=train_dataset.collate_fn)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=valid_dataset.collate_fn)
# test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=test_dataset.collate_fn)
model = ERCModel(num_labels, device).to(device)
loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, eps=eps) # weight_decay=0.01
# scheduler
num_training_steps = len(train_dataset)*epochs
num_warmup_steps = len(train_dataset)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)
logger.info("=====Start Learning=====")
best_score = 0
train_loss_hs, valid_loss_hs = [], [] # for visualize
valid_f1_score_hs = [] # for visualize
# run train, validation
for epoch in tqdm(range(epochs)):
train_loss = train_one_epoch(model=model, dataloader=train_dataloader, loss_fn=loss_fn,
optimizer=optimizer, scheduler=scheduler, max_grad_norm=max_grad_norm, device=device)
valid_loss, valid_f1_score, pred_list, target_list = valid_one_epoch(model=model, dataloader=valid_dataloader,
loss_fn=loss_fn, device=device)
# for visualize
train_loss_hs.append(train_loss)
valid_loss_hs.append(valid_loss)
valid_f1_score_hs.append(valid_f1_score)
logger.info(f"Epoch:{epoch}, w-avg f1-score: {valid_f1_score:.6f}")
# update best score & save model
if valid_f1_score > best_score:
best_score = valid_f1_score
save_model(model)
return train_loss_hs, valid_loss_hs, valid_f1_score_hs, pred_list, target_list
train_loss_hs, valid_loss_hs, valid_f1_score_hs, pred_list, target_list = run()
'데이터사이언스 이론 공부' 카테고리의 다른 글
하이퍼 파라미터 튜닝 구현하기 - Random Forest(랜덤포레스트) (0) | 2023.03.15 |
---|---|
DNN(Deep Neural Network) 구현하기 with numpy(2) - Initialize (1) | 2023.01.19 |
DNN(Deep Neural Network) 구현하기 with numpy(1) - Preprocess (0) | 2023.01.16 |
GPT3 모델의 대한 간략한 정리 (0) | 2023.01.09 |
BERT의 파생모델 [DistilBERT] (0) | 2022.12.30 |