import torch
import json
import logging
import numpy as np
from datasets import Dataset
from transformers import (
    AutoModelForMultipleChoice,
    AutoTokenizer,
    AutoModelForQuestionAnswering,
    AutoConfig,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer,
)
from pathlib import Path
from itertools import chain
from dataclasses import dataclass
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from typing import Optional, Union
from transformers.utils import PaddingStrategy

logger = logging.getLogger(__name__)


@dataclass
class DataCollatorForMultipleChoice:
    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, features):
        # features carry ['input_ids', 'token_type_ids', 'attention_mask', 'label'];
        # stack each field across examples into a single tensor per key.
        d = {}
        for key in features[0].keys():
            if key != 'label':
                d[key] = torch.tensor([feature[key] for feature in features])
            else:
                d['labels'] = torch.tensor([feature[key] for feature in features], dtype=torch.int64)
        return d


# multiple-choice model: checkpoint 8000 of the fine-tuned paragraph-selection model
model_name = '.\\output'
model_name_qa = 'bert-base-chinese'
dataset_path = './ntu-adl-hw2-fall-2022'
do_train, do_eval = (False, False)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
training_args = TrainingArguments(output_dir=model_name, fp16=True)

config = AutoConfig.from_pretrained(model_name)
model = AutoModelForMultipleChoice.from_pretrained(model_name, config=config)
tokenizer = AutoTokenizer.from_pretrained(model_name)

with open(Path(dataset_path) / 'context.json', 'r', encoding='utf-8') as reader:
    context = json.load(reader)
context = [{'id': i, 'context': ins} for i, ins in enumerate(context)]
with open(Path(dataset_path) / 'train.json', 'r', encoding='utf-8') as reader:
    train = json.load(reader)
with open(Path(dataset_path) / 'valid.json', 'r', encoding='utf-8') as reader:
    dev = json.load(reader)

train_dataset, dev_dataset = Dataset.from_list(train), Dataset.from_list(dev)
context_dataset = Dataset.from_list(context)


def preprocess_function(examples):
    # dataset columns: ['id', 'question', 'paragraphs', 'relevant', 'answer']
    # resolve the 4 candidate paragraph ids per question into their texts,
    # then flatten to q1-p1, q1-p2, q1-p3, q1-p4, q2-p1, ...
    real_paragraphs = [[context_dataset[choice]['context'] for choice in choices]
                       for choices in examples['paragraphs']]
    flattened_paragraphs = list(chain(*real_paragraphs))
    # repeat each question 4 times so it pairs with each of its candidate paragraphs
    questions = [[q] * 4 for q in examples['question']]
    flattened_questions = list(chain(*questions))
    tokenized_examples = tokenizer(
        flattened_questions,
        flattened_paragraphs,
        padding=True,
        truncation='only_second',
        # max_length=512,
        add_special_tokens=True,
        return_tensors='pt'
    )
    # regroup the flat (batch * 4, seq_len) rows back into groups of 4 choices per example
    r = {
        k: [v[i: i + 4] for i in range(0, len(v), 4)]
        for k, v in tokenized_examples.items()
    }
    # the label is the position of the relevant paragraph among the 4 candidates
    r['label'] = [choices.index(rel) for choices, rel in zip(examples['paragraphs'], examples['relevant'])]
    return r


# pre-processing for the multiple-choice (paragraph selection) stage
if do_train:
    with training_args.main_process_first(desc='pre-processing'):
        train_dataset = train_dataset.map(
            preprocess_function,
            batched=True,
        )
if do_eval:
    with training_args.main_process_first(desc="validation pre-processing"):
        dev_dataset = dev_dataset.map(
            preprocess_function,
            batched=True,
        )

data_collator = DataCollatorForMultipleChoice(tokenizer=tokenizer)
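
# Hedged sanity check (not part of the original pipeline): after preprocess_function, a
# collated batch should have 3-D tensors of shape (batch, 4, seq_len) plus a 1-D labels
# tensor. Uncomment once train_dataset has been mapped to verify the collator's output.
# sample_batch = data_collator([train_dataset[i] for i in range(2)])
# print({k: tuple(v.shape) for k, v in sample_batch.items()})
# expected: input_ids / token_type_ids / attention_mask -> (2, 4, seq_len), labels -> (2,)
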
def compute_metrics(eval_predictions):
    predictions, label_ids = eval_predictions
    preds = np.argmax(predictions, axis=1)
    return {"accuracy": (preds == label_ids).astype(np.float32).mean().item()}


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

if do_train:
    checkpoint = None
    train_result = trainer.train(resume_from_checkpoint=checkpoint)
    trainer.save_model()
    metrics = train_result.metrics
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

if do_eval:
    logger.info("*** Evaluate ***")
    metrics = trainer.evaluate()
    max_eval_samples = len(dev_dataset)
    metrics["eval_samples"] = min(max_eval_samples, len(dev_dataset))
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

# model card metadata (the SWAG/English tags from the HF example script do not fit this task)
kwargs = dict(
    finetuned_from=model_name,
    tasks="multiple-choice",
    dataset="ntu-adl-hw2-fall-2022",
    language="zh",
)
trainer.create_model_card(**kwargs)

# PredictionOutput is (predictions, label_ids, metrics); label_ids here are the gold choice
# indices, e.g. [1, 1, 3, ...]. For unlabeled data the chosen paragraph would come from
# predictions.argmax(-1) instead.
inference = trainer.predict(train_dataset)
_, label_ids, _ = inference

# question-answering (span extraction) stage
training_args = TrainingArguments(output_dir=model_name, fp16=True)
config_qa = AutoConfig.from_pretrained(model_name_qa)
model_qa = AutoModelForQuestionAnswering.from_pretrained(model_name_qa, config=config_qa)
tokenizer_qa = AutoTokenizer.from_pretrained(model_name_qa)

# reload the raw splits; columns: ['id', 'question', 'paragraphs', 'relevant', 'answer']
train_dataset, dev_dataset = Dataset.from_list(train), Dataset.from_list(dev)
context_dataset = Dataset.from_list(context)

question_column_name = "question"
context_column_name = "context"
answer_column_name = "answer"


def preprocessing_qa(examples):
    # index of the relevant paragraph among each question's candidates
    label_ids = [choices.index(rel) for choices, rel in zip(examples['paragraphs'], examples['relevant'])]
    # resolve each question's relevant paragraph id into its text
    relevant_paragraphs = [context_dataset[choices[index]]['context']
                           for index, choices in zip(label_ids, examples['paragraphs'])]
    # keep the answer columns so they can be carried through the mapped dataset
    answer_columns = {}
    for key in examples[answer_column_name][0].keys():
        answer_columns[key] = [item[key] for item in examples[answer_column_name]]
    tokenized_examples = tokenizer_qa(
        examples['question'],
        relevant_paragraphs,
        padding='max_length',
        truncation='only_second',
        max_length=512,
        stride=128,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        add_special_tokens=True,
    )
    # Long paragraphs overflow into several features; this maps each feature back to its example.
    sample_mapping = tokenized_examples.pop("overflow_to_sample_mapping")
    # The offset mappings give a map from token to character position in the original context,
    # which is what lets us compute start_positions and end_positions.
    offset_mapping = tokenized_examples.pop("offset_mapping")

    tokenized_examples["start_positions"] = []
    tokenized_examples["end_positions"] = []

    for i, offsets in enumerate(offset_mapping):
        # Impossible answers are labeled with the index of the CLS token.
        input_ids = tokenized_examples["input_ids"][i]
        cls_index = input_ids.index(tokenizer_qa.cls_token_id)
        # Sequence ids tell us which tokens belong to the question and which to the context.
        sequence_ids = tokenized_examples.sequence_ids(i)
        # One example can give several spans; this is the index of the example containing this span.
        sample_index = sample_mapping[i]
        answers = examples[answer_column_name][sample_index]
        # If no answer is given, use the cls_index as the answer position.
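        # Otherwise the gold character span is converted into token indices below:
        # offsets[j] is the (start_char, end_char) of token j in the paragraph, and
        # sequence_ids(i) marks context tokens with 1, so the loops walk inward from
        # both ends of the context until the answer span is bracketed by token boundaries.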
if len(answers["start"]) == 0: tokenized_examples["start_positions"].append(cls_index) tokenized_examples["end_positions"].append(cls_index) else: # Start/end character index of the answer in the text. start_char = answers["start"][0] end_char = start_char + len(answers["text"][0]) # Start token index of the current span in the text. token_start_index = 0 while sequence_ids[token_start_index] != 1: token_start_index += 1 # End token index of the current span in the text. token_end_index = len(input_ids) - 1 while sequence_ids[token_end_index] != 1: token_end_index -= 1 # Detect if the answer is out of the span (in which case this feature is labeled with the CLS index). if not (offsets[token_start_index][0] <= start_char and offsets[token_end_index][1] >= end_char): tokenized_examples["start_positions"].append(cls_index) tokenized_examples["end_positions"].append(cls_index) else: # Otherwise move the token_start_index and token_end_index to the two ends of the answer. # Note: we could go after the last offset if the answer is the last word (edge case). while token_start_index < len(offsets) and offsets[token_start_index][0] <= start_char: token_start_index += 1 tokenized_examples["start_positions"].append(token_start_index - 1) while offsets[token_end_index][1] >= end_char: token_end_index -= 1 tokenized_examples["end_positions"].append(token_end_index + 1) r = {k: v for k, v in tokenized_examples.items()} for key in answers: r[key] = answers[key] return r if do_train != True: with training_args.main_process_first(desc='pre-processing'): train_dataset = train_dataset.map( preprocessing_qa, batched=True, ) if do_eval == True: with training_args.main_process_first(desc="validation pre-processing"): dev_dataset = dev_dataset.map( preprocessin_qa, batched=True, ) data_collator = (DataCollatorWithPadding(tokenizer_qa, pad_to_multiple_of=8 if training_args.fp16 else None)) ## THIS IS A PIPELINE!! def compute_metrics(eval_predictions): predictions, label_ids = eval_predictions preds = np.argmax(predictions, axis=1) return {"accuracy": (preds == label_ids).astype(np.float32).mean().item()} trainer_qa = Trainer( model=model_qa, args=training_args, train_dataset=train_dataset, eval_dataset=dev_dataset, tokenizer=tokenizer_qa, data_collator=data_collator, compute_metrics=compute_metrics, ) if do_train: checkpoint = None train_result = trainer.train() trainer.save_model() metrics = train_result.metrics trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) trainer.save_state() if do_eval: logger.info("*** Evaluate ***") metrics = trainer.evaluate() max_eval_samples = len(dev_dataset) metrics["eval_samples"] = min(max_eval_samples, len(dev_dataset)) trainer.log_metrics("eval", metrics) trainer.save_metrics("eval", metrics) kwargs = dict( finetuned_from=model_name, tasks="multiple-choice", dataset_tags="swag", dataset_args="regular", dataset="SWAG", language="en", ) trainer.create_model_card(**kwargs) # paragraphs = [dev_dataset['paragraphs'][i][index] for i, index in enumerate(label_ids)]