import torch
import json
import logging
import numpy as np
from datasets import Dataset
from transformers import (
AutoModelForMultipleChoice,
AutoTokenizer,
AutoModelForQuestionAnswering,
AutoConfig,
default_data_collator,
DataCollatorWithPadding,
HfArgumentParser,
TrainingArguments,
Trainer
)
from pathlib import Path
from itertools import chain
from dataclasses import dataclass, field
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from typing import Optional, Union
from transformers.utils import PaddingStrategy
logger = logging.getLogger(__name__)
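## Two-stage pipeline: (1) a multiple-choice model selects the relevant paragraph for each
## question, (2) an extractive QA model predicts the answer span inside that paragraph.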
@dataclass
class DataCollatorForMultipleChoice:
tokenizer: PreTrainedTokenizerBase
padding: Union[bool, str, PaddingStrategy] = True
max_length: Optional[int] = None
pad_to_multiple_of: Optional[int] = None
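    ## Minimal collator: assumes every feature in the batch was already padded to a common
    ## sequence length during pre-processing, so the fields can simply be stacked into tensors.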
def __call__(self, features):
        ## each feature carries 'input_ids', 'token_type_ids', 'attention_mask' and 'label'
d = {}
        for key in features[0].keys():  ## convert the list of feature dicts into a dict of batched tensors
if key != 'label':
d[key] = torch.tensor([feature[key] for feature in features])
else:
d['labels'] = torch.tensor([feature[key] for feature in features], dtype=torch.int64)
return d
## Stage 1: paragraph selection with the fine-tuned multiple-choice model (checkpoint 8000) saved under ./output
model_name = './output'
model_name_qa = 'bert-base-chinese'
dataset_path = './ntu-adl-hw2-fall-2022'
do_train, do_eval = False, False
device = 'cuda' if torch.cuda.is_available() else 'cpu'
training_args = TrainingArguments(output_dir=model_name, fp16=True)
config = AutoConfig.from_pretrained(
model_name
)
model = AutoModelForMultipleChoice.from_pretrained(
model_name,
config=config
)
tokenizer = AutoTokenizer.from_pretrained(
model_name
)
with open(Path(dataset_path) / 'context.json', 'r', encoding='utf-8') as reader:
context = json.load(reader)
context = [{'id': i, 'context': ins} for i, ins in enumerate(context)]
with open(Path(dataset_path) / 'train.json', 'r', encoding='utf-8') as reader:
train = json.load(reader)
with open(Path(dataset_path) / 'valid.json', 'r', encoding='utf-8') as reader:
dev = json.load(reader)
train_dataset, dev_dataset = Dataset.from_list(train), Dataset.from_list(dev)
context_dataset = Dataset.from_list(context)
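## Expected data layout (inferred from how the fields are used below):
##   context.json            : list of paragraph strings, indexed by paragraph id
##   train.json / valid.json : [{'id', 'question', 'paragraphs' (4 candidate paragraph ids),
##                               'relevant' (the gold paragraph id), 'answer'}, ...]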
def preprocess_function(examples):
## dataset: [id, question, paragraphs, relevant, answer]
real_paragraphs = [[context_dataset[choice]['context'] for choice in choices] for choices in examples['paragraphs']]
flattened_paragraphs = list(chain(*real_paragraphs)) ## 1-1 1-2 1-3 1-4 2-1 2-2 2-3 2-4
    questions = [[question] * 4 for question in examples['question']] ## repeat each question 4 times: 1 1 1 1 2 2 2 2 ...
flattened_questions = list(chain(*questions))
tokenized_examples = tokenizer(
flattened_questions,
flattened_paragraphs,
padding=True,
truncation='only_second',
# max_length=512,
add_special_tokens=True,
return_tensors='pt'
)
r = {
k: [v[i : i+4] for i in range(0, len(v), 4)] for k, v in tokenized_examples.items()
}
r['label'] = [choices.index(rel) for choices, rel in zip(examples['paragraphs'], examples['relevant'])]
return r
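## After map(), every example carries 'input_ids', 'token_type_ids' and 'attention_mask' of
## shape (4, seq_len) plus an integer 'label' marking the relevant paragraph.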
## Apply the pre-processing, then train / evaluate the paragraph-selection model.
if do_train:
with training_args.main_process_first(desc='pre-processing'):
train_dataset = train_dataset.map(
preprocess_function,
batched=True,
)
if do_eval:
with training_args.main_process_first(desc="validation pre-processing"):
dev_dataset = dev_dataset.map(
preprocess_function,
batched=True,
)
data_collator = DataCollatorForMultipleChoice(tokenizer=tokenizer)  ## batches the pre-tokenized multiple-choice features
def compute_metrics(eval_predictions):
predictions, label_ids = eval_predictions
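    ## predictions: (num_examples, num_choices) logits from the multiple-choice head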
preds = np.argmax(predictions, axis=1)
return {"accuracy": (preds == label_ids).astype(np.float32).mean().item()}
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=dev_dataset,
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
if do_train:
    checkpoint = None
    train_result = trainer.train(resume_from_checkpoint=checkpoint)
trainer.save_model()
metrics = train_result.metrics
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()
if do_eval:
logger.info("*** Evaluate ***")
metrics = trainer.evaluate()
max_eval_samples = len(dev_dataset)
metrics["eval_samples"] = min(max_eval_samples, len(dev_dataset))
trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)
kwargs = dict(
    finetuned_from=model_name,
    tasks="multiple-choice",
    dataset="ntu-adl-hw2-fall-2022",  ## dataset name taken from dataset_path
    language="zh",
)
trainer.create_model_card(**kwargs)
if do_train:
    ## PredictionOutput is (predictions, label_ids, metrics); label_ids are the gold
    ## relevant-paragraph indices from the mapped dataset, not the model's predictions.
    inference = trainer.predict(train_dataset)
    _, label_ids, _ = inference
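## Stage 2: extractive question answering with bert-base-chinese over the relevant paragraph.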
training_args = TrainingArguments(output_dir=model_name, fp16=True)
config_qa = AutoConfig.from_pretrained(
model_name_qa
)
model_qa = AutoModelForQuestionAnswering.from_pretrained(
model_name_qa,
config=config_qa
)
tokenizer_qa = AutoTokenizer.from_pretrained(
model_name_qa
)
train_dataset, dev_dataset = Dataset.from_list(train), Dataset.from_list(dev)
context_dataset = Dataset.from_list(context)
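## preprocessing_qa tokenizes each question together with its gold ('relevant') paragraph and
## labels the answer span as start/end token positions.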
## dataset columns: ['id', 'question', 'paragraphs', 'relevant', 'answer']
question_column_name = "question"
context_column_name = "context"
answer_column_name = "answer"
def preprocessing_qa(examples):
    ## index of the gold ('relevant') paragraph within each example's candidate list
    label_ids = [choices.index(rel) for choices, rel in zip(examples['paragraphs'], examples['relevant'])]
    ## text of the gold paragraph for every example in the batch being mapped
    real_paragraphs = [[context_dataset[choices[index]]['context']] for index, choices in zip(label_ids, examples['paragraphs'])]
    flattened_paragraphs = list(chain(*real_paragraphs))
    ## keep the raw answer columns so they can be returned next to the tokenized features
    answer_columns = {}
    for key in examples['answer'][0].keys():
        answer_columns[key] = [item[key] for item in examples['answer']]
tokenized_examples = tokenizer_qa(
examples['question'],
flattened_paragraphs,
padding='max_length',
truncation='only_second',
max_length=512,
stride=128,
return_overflowing_tokens=True,
return_offsets_mapping=True,
add_special_tokens=True,
# return_tensors='pt'
)
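    ## Long contexts are split into overlapping spans (stride=128); 'overflow_to_sample_mapping'
    ## maps each produced span back to the example it came from.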
sample_mapping = tokenized_examples.pop("overflow_to_sample_mapping")
# The offset mappings will give us a map from token to character position in the original context. This will
# help us compute the start_positions and end_positions.
offset_mapping = tokenized_examples.pop("offset_mapping")
# Let's label those examples!
tokenized_examples["start_positions"] = []
tokenized_examples["end_positions"] = []
for i, offsets in enumerate(offset_mapping):
# We will label impossible answers with the index of the CLS token.
input_ids = tokenized_examples["input_ids"][i]
cls_index = input_ids.index(tokenizer_qa.cls_token_id)
# Grab the sequence corresponding to that example (to know what is the context and what is the question).
sequence_ids = tokenized_examples.sequence_ids(i)
# One example can give several spans, this is the index of the example containing this span of text.
sample_index = sample_mapping[i]
answers = examples[answer_column_name][sample_index]
# If no answers are given, set the cls_index as answer.
if len(answers["start"]) == 0:
tokenized_examples["start_positions"].append(cls_index)
tokenized_examples["end_positions"].append(cls_index)
else:
# Start/end character index of the answer in the text.
start_char = answers["start"][0]
end_char = start_char + len(answers["text"][0])
# Start token index of the current span in the text.
token_start_index = 0
while sequence_ids[token_start_index] != 1:
token_start_index += 1
# End token index of the current span in the text.
token_end_index = len(input_ids) - 1
while sequence_ids[token_end_index] != 1:
token_end_index -= 1
# Detect if the answer is out of the span (in which case this feature is labeled with the CLS index).
if not (offsets[token_start_index][0] <= start_char and offsets[token_end_index][1] >= end_char):
tokenized_examples["start_positions"].append(cls_index)
tokenized_examples["end_positions"].append(cls_index)
else:
# Otherwise move the token_start_index and token_end_index to the two ends of the answer.
# Note: we could go after the last offset if the answer is the last word (edge case).
while token_start_index < len(offsets) and offsets[token_start_index][0] <= start_char:
token_start_index += 1
tokenized_examples["start_positions"].append(token_start_index - 1)
while offsets[token_end_index][1] >= end_char:
token_end_index -= 1
tokenized_examples["end_positions"].append(token_end_index + 1)
r = {k: v for k, v in tokenized_examples.items()}
    ## align the carried-over answer columns with the (possibly overflowed) span features
    for key in answer_columns:
        r[key] = [answer_columns[key][sample_idx] for sample_idx in sample_mapping]
return r
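## After map(), each span feature carries the tokenized inputs plus 'start_positions' and
## 'end_positions' (token indices of the answer, or the [CLS] index when the answer is not in the span).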
if not do_train:
with training_args.main_process_first(desc='pre-processing'):
train_dataset = train_dataset.map(
preprocessing_qa,
batched=True,
)
if do_eval:
    with training_args.main_process_first(desc="validation pre-processing"):
        dev_dataset = dev_dataset.map(
            preprocessing_qa,
            batched=True,
        )
data_collator = DataCollatorWithPadding(tokenizer_qa, pad_to_multiple_of=8 if training_args.fp16 else None)  ## dynamic padding for the QA features
## NOTE: this reuses the choice-accuracy metric; meaningful QA evaluation would post-process start/end logits into text spans.
def compute_metrics(eval_predictions):
predictions, label_ids = eval_predictions
preds = np.argmax(predictions, axis=1)
return {"accuracy": (preds == label_ids).astype(np.float32).mean().item()}
trainer_qa = Trainer(
model=model_qa,
args=training_args,
train_dataset=train_dataset,
eval_dataset=dev_dataset,
tokenizer=tokenizer_qa,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
if do_train:
    checkpoint = None
    train_result = trainer_qa.train(resume_from_checkpoint=checkpoint)
    trainer_qa.save_model()
    metrics = train_result.metrics
    trainer_qa.log_metrics("train", metrics)
    trainer_qa.save_metrics("train", metrics)
    trainer_qa.save_state()
if do_eval:
    logger.info("*** Evaluate ***")
    metrics = trainer_qa.evaluate()
    max_eval_samples = len(dev_dataset)
    metrics["eval_samples"] = min(max_eval_samples, len(dev_dataset))
    trainer_qa.log_metrics("eval", metrics)
    trainer_qa.save_metrics("eval", metrics)
kwargs = dict(
    finetuned_from=model_name_qa,
    tasks="question-answering",
    dataset="ntu-adl-hw2-fall-2022",  ## dataset name taken from dataset_path
    language="zh",
)
trainer_qa.create_model_card(**kwargs)
# paragraphs = [dev_dataset['paragraphs'][i][index] for i, index in enumerate(label_ids)]