# coding=utf-8 # Copyright 2018 the HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import dataclasses import gc import os import tempfile import unittest import numpy as np from transformers import AutoTokenizer, EvaluationStrategy, PretrainedConfig, TrainingArguments, is_torch_available from transformers.file_utils import WEIGHTS_NAME from transformers.testing_utils import ( get_tests_dir, require_datasets, require_optuna, require_sentencepiece, require_tokenizers, require_torch, require_torch_gpu, require_torch_multi_gpu, slow, ) from transformers.utils.hp_naming import TrialShortNamer if is_torch_available(): import torch from torch.utils.data import IterableDataset from transformers import ( AutoModelForMaskedLM, AutoModelForSequenceClassification, DataCollatorForLanguageModeling, EarlyStoppingCallback, GlueDataset, GlueDataTrainingArguments, GPT2Config, GPT2LMHeadModel, LineByLineTextDataset, PreTrainedModel, TextDataset, Trainer, TrainerState, ) from transformers.trainer import _model_unwrap PATH_SAMPLE_TEXT = f"{get_tests_dir()}/fixtures/sample_text.txt" class RegressionDataset: def __init__(self, a=2, b=3, length=64, seed=42, label_names=None): np.random.seed(seed) self.label_names = ["labels"] if label_names is None else label_names self.length = length self.x = np.random.normal(size=(length,)).astype(np.float32) self.ys = [a * self.x + b + np.random.normal(scale=0.1, size=(length,)) for _ in self.label_names] self.ys = [y.astype(np.float32) for y in self.ys] def __len__(self): return self.length def __getitem__(self, i): result = {name: y[i] for name, y in zip(self.label_names, self.ys)} result["input_x"] = self.x[i] return result class RepeatDataset: def __init__(self, x, length=64): self.x = x self.length = length def __len__(self): return self.length def __getitem__(self, i): return {"input_ids": self.x, "labels": self.x} class DynamicShapesDataset: def __init__(self, length=64, seed=42, batch_size=8): self.length = length np.random.seed(seed) sizes = np.random.randint(1, 20, (length // batch_size,)) # For easy batching, we make every batch_size consecutive samples the same size. self.xs = [np.random.normal(size=(s,)) for s in sizes.repeat(batch_size)] self.ys = [np.random.normal(size=(s,)) for s in sizes.repeat(batch_size)] def __len__(self): return self.length def __getitem__(self, i): return {"input_x": self.xs[i], "labels": self.ys[i]} class AlmostAccuracy: def __init__(self, thresh=0.25): self.thresh = thresh def __call__(self, eval_pred): predictions, labels = eval_pred true = np.abs(predictions - labels) <= self.thresh return {"accuracy": true.astype(np.float32).mean().item()} class RegressionModelConfig(PretrainedConfig): def __init__(self, a=0, b=0, double_output=False, **kwargs): super().__init__(**kwargs) self.a = a self.b = b self.double_output = double_output if is_torch_available(): class SampleIterableDataset(IterableDataset): """ Criteria is not whether it is IterableDataset or not, criteria is whether __len__ is implemented """ def __init__(self, file_path, tokenizer): self.ds = TextDataset(file_path=file_path, tokenizer=tokenizer, block_size=64) def __iter__(self): for i in range(len(self.ds)): yield self.ds[i] class RegressionModel(torch.nn.Module): def __init__(self, a=0, b=0, double_output=False): super().__init__() self.a = torch.nn.Parameter(torch.tensor(a).float()) self.b = torch.nn.Parameter(torch.tensor(b).float()) self.double_output = double_output self.config = None def forward(self, input_x, labels=None, **kwargs): y = input_x * self.a + self.b if labels is None: return (y, y) if self.double_output else (y,) loss = torch.nn.functional.mse_loss(y, labels) return (loss, y, y) if self.double_output else (loss, y) class RegressionDictModel(torch.nn.Module): def __init__(self, a=0, b=0): super().__init__() self.a = torch.nn.Parameter(torch.tensor(a).float()) self.b = torch.nn.Parameter(torch.tensor(b).float()) self.config = None def forward(self, input_x, labels=None, **kwargs): y = input_x * self.a + self.b result = {"output": y} if labels is not None: result["loss"] = torch.nn.functional.mse_loss(y, labels) return result class RegressionPreTrainedModel(PreTrainedModel): config_class = RegressionModelConfig base_model_prefix = "regression" def __init__(self, config): super().__init__(config) self.a = torch.nn.Parameter(torch.tensor(config.a).float()) self.b = torch.nn.Parameter(torch.tensor(config.b).float()) self.double_output = config.double_output def forward(self, input_x, labels=None, **kwargs): y = input_x * self.a + self.b if labels is None: return (y, y) if self.double_output else (y,) loss = torch.nn.functional.mse_loss(y, labels) return (loss, y, y) if self.double_output else (loss, y) def get_regression_trainer(a=0, b=0, double_output=False, train_len=64, eval_len=64, pretrained=True, **kwargs): label_names = kwargs.get("label_names", None) train_dataset = RegressionDataset(length=train_len, label_names=label_names) eval_dataset = RegressionDataset(length=eval_len, label_names=label_names) if pretrained: config = RegressionModelConfig(a=a, b=b, double_output=double_output) model = RegressionPreTrainedModel(config) else: model = RegressionModel(a=a, b=b, double_output=double_output) compute_metrics = kwargs.pop("compute_metrics", None) data_collator = kwargs.pop("data_collator", None) optimizers = kwargs.pop("optimizers", (None, None)) output_dir = kwargs.pop("output_dir", "./regression") model_init = kwargs.pop("model_init", None) args = TrainingArguments(output_dir, **kwargs) return Trainer( model, args, data_collator=data_collator, train_dataset=train_dataset, eval_dataset=eval_dataset, compute_metrics=compute_metrics, optimizers=optimizers, model_init=model_init, ) @require_torch @require_sentencepiece @require_tokenizers class TrainerIntegrationTest(unittest.TestCase): def setUp(self): args = TrainingArguments(".") self.n_epochs = args.num_train_epochs self.batch_size = args.train_batch_size trainer = get_regression_trainer(learning_rate=0.1) trainer.train() self.default_trained_model = (trainer.model.a, trainer.model.b) trainer = get_regression_trainer(learning_rate=0.1, seed=314) trainer.train() self.alternate_trained_model = (trainer.model.a, trainer.model.b) def check_trained_model(self, model, alternate_seed=False): # Checks a training seeded with learning_rate = 0.1 (a, b) = self.alternate_trained_model if alternate_seed else self.default_trained_model self.assertTrue(torch.allclose(model.a, a)) self.assertTrue(torch.allclose(model.b, b)) def check_saved_checkpoints(self, output_dir, freq, total, is_pretrained=True): file_list = [WEIGHTS_NAME, "training_args.bin", "optimizer.pt", "scheduler.pt", "trainer_state.json"] if is_pretrained: file_list.append("config.json") for step in range(freq, total, freq): checkpoint = os.path.join(output_dir, f"checkpoint-{step}") self.assertTrue(os.path.isdir(checkpoint)) for filename in file_list: self.assertTrue(os.path.isfile(os.path.join(checkpoint, filename))) def check_best_model_has_been_loaded( self, output_dir, freq, total, trainer, metric, greater_is_better=False, is_pretrained=True ): checkpoint = os.path.join(output_dir, f"checkpoint-{(total // freq) * freq}") log_history = TrainerState.load_from_json(os.path.join(checkpoint, "trainer_state.json")).log_history values = [d[metric] for d in log_history] best_value = max(values) if greater_is_better else min(values) best_checkpoint = (values.index(best_value) + 1) * freq checkpoint = os.path.join(output_dir, f"checkpoint-{best_checkpoint}") if is_pretrained: best_model = RegressionPreTrainedModel.from_pretrained(checkpoint) best_model.to(trainer.args.device) else: best_model = RegressionModel() state_dict = torch.load(os.path.join(checkpoint, WEIGHTS_NAME)) best_model.load_state_dict(state_dict) best_model.to(trainer.args.device) self.assertTrue(torch.allclose(best_model.a, trainer.model.a)) self.assertTrue(torch.allclose(best_model.b, trainer.model.b)) metrics = trainer.evaluate() self.assertEqual(metrics[metric], best_value) def check_trainer_state_are_the_same(self, trainer_state, trainer_state1): # We'll pop things so operate on copies. state = trainer_state.copy() state1 = trainer_state1.copy() # Log history main contain different logs for the time metrics (after resuming a training). log_history = state.pop("log_history", None) log_history1 = state1.pop("log_history", None) self.assertEqual(state, state1) for log, log1 in zip(log_history, log_history1): _ = log.pop("train_runtime", None) _ = log1.pop("train_runtime", None) _ = log.pop("train_samples_per_second", None) _ = log1.pop("train_samples_per_second", None) self.assertEqual(log, log1) def test_trainer_works_with_dict(self): # Edge case because Apex with mode O2 will change our models to return dicts. This test checks it doesn't break # anything. train_dataset = RegressionDataset() eval_dataset = RegressionDataset() model = RegressionDictModel() args = TrainingArguments("./regression") trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset) trainer.train() _ = trainer.evaluate() _ = trainer.predict(eval_dataset) def test_evaluation_with_keys_to_drop(self): config = GPT2Config(vocab_size=100, n_positions=128, n_ctx=128, n_embd=32, n_layer=3, n_head=4) tiny_gpt2 = GPT2LMHeadModel(config) x = torch.randint(0, 100, (128,)) eval_dataset = RepeatDataset(x) args = TrainingArguments("./test") trainer = Trainer(tiny_gpt2, args, eval_dataset=eval_dataset) # By default the past_key_values are removed result = trainer.predict(eval_dataset) self.assertTrue(isinstance(result.predictions, np.ndarray)) # We can still get them by setting ignore_keys to [] result = trainer.predict(eval_dataset, ignore_keys=[]) self.assertTrue(isinstance(result.predictions, tuple)) self.assertEqual(len(result.predictions), 2) def test_training_arguments_are_left_untouched(self): trainer = get_regression_trainer() trainer.train() args = TrainingArguments("./regression") dict1, dict2 = args.to_dict(), trainer.args.to_dict() for key in dict1.keys(): # Logging dir can be slightly different as they default to something with the time. if key != "logging_dir": self.assertEqual(dict1[key], dict2[key]) def test_reproducible_training(self): # Checks that training worked, model trained and seed made a reproducible training. trainer = get_regression_trainer(learning_rate=0.1) trainer.train() self.check_trained_model(trainer.model) # Checks that a different seed gets different (reproducible) results. trainer = get_regression_trainer(learning_rate=0.1, seed=314) trainer.train() self.check_trained_model(trainer.model, alternate_seed=True) def test_number_of_steps_in_training(self): # Regular training has n_epochs * len(train_dl) steps trainer = get_regression_trainer(learning_rate=0.1) train_output = trainer.train() self.assertEqual(train_output.global_step, self.n_epochs * 64 / self.batch_size) # Check passing num_train_epochs works (and a float version too): trainer = get_regression_trainer(learning_rate=0.1, num_train_epochs=1.5) train_output = trainer.train() self.assertEqual(train_output.global_step, int(1.5 * 64 / self.batch_size)) # If we pass a max_steps, num_train_epochs is ignored trainer = get_regression_trainer(learning_rate=0.1, max_steps=10) train_output = trainer.train() self.assertEqual(train_output.global_step, 10) def test_train_and_eval_dataloaders(self): n_gpu = max(1, torch.cuda.device_count()) trainer = get_regression_trainer(learning_rate=0.1, per_device_train_batch_size=16) self.assertEqual(trainer.get_train_dataloader().batch_size, 16 * n_gpu) trainer = get_regression_trainer(learning_rate=0.1, per_device_eval_batch_size=16) self.assertEqual(trainer.get_eval_dataloader().batch_size, 16 * n_gpu) # Check drop_last works trainer = get_regression_trainer( train_len=66, eval_len=74, learning_rate=0.1, per_device_train_batch_size=16, per_device_eval_batch_size=32 ) self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu) + 1) self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu) + 1) trainer = get_regression_trainer( train_len=66, eval_len=74, learning_rate=0.1, per_device_train_batch_size=16, per_device_eval_batch_size=32, dataloader_drop_last=True, ) self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu)) self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu)) # Check passing a new dataset for evaluation works new_eval_dataset = RegressionDataset(length=128) self.assertEqual(len(trainer.get_eval_dataloader(new_eval_dataset)), 128 // (32 * n_gpu)) @require_torch_multi_gpu def test_data_is_not_parallelized_when_model_is_parallel(self): model = RegressionModel() # Make the Trainer believe it's a parallelized model model.is_parallelizable = True model.model_parallel = True args = TrainingArguments("./regression", per_device_train_batch_size=16, per_device_eval_batch_size=16) trainer = Trainer(model, args, train_dataset=RegressionDataset(), eval_dataset=RegressionDataset()) # Check the Trainer was fooled self.assertTrue(trainer.is_model_parallel) self.assertEqual(trainer.args.n_gpu, 1) # The batch size of the training and evaluation dataloaders should be 16, not 16 * n_gpu self.assertEqual(trainer.get_train_dataloader().batch_size, 16) self.assertEqual(len(trainer.get_train_dataloader()), 64 // 16) self.assertEqual(trainer.get_eval_dataloader().batch_size, 16) self.assertEqual(len(trainer.get_eval_dataloader()), 64 // 16) def test_evaluate(self): trainer = get_regression_trainer(a=1.5, b=2.5, compute_metrics=AlmostAccuracy()) results = trainer.evaluate() x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0] pred = 1.5 * x + 2.5 expected_loss = ((pred - y) ** 2).mean() self.assertAlmostEqual(results["eval_loss"], expected_loss) expected_acc = AlmostAccuracy()((pred, y))["accuracy"] self.assertAlmostEqual(results["eval_accuracy"], expected_acc) # With a number of elements not a round multiple of the batch size trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, compute_metrics=AlmostAccuracy()) results = trainer.evaluate() x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0] pred = 1.5 * x + 2.5 expected_loss = ((pred - y) ** 2).mean() self.assertAlmostEqual(results["eval_loss"], expected_loss) expected_acc = AlmostAccuracy()((pred, y))["accuracy"] self.assertAlmostEqual(results["eval_accuracy"], expected_acc) def test_predict(self): trainer = get_regression_trainer(a=1.5, b=2.5) preds = trainer.predict(trainer.eval_dataset).predictions x = trainer.eval_dataset.x self.assertTrue(np.allclose(preds, 1.5 * x + 2.5)) # With a number of elements not a round multiple of the batch size trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66) preds = trainer.predict(trainer.eval_dataset).predictions x = trainer.eval_dataset.x self.assertTrue(np.allclose(preds, 1.5 * x + 2.5)) # With more than one output of the model trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True) preds = trainer.predict(trainer.eval_dataset).predictions x = trainer.eval_dataset.x self.assertTrue(len(preds), 2) self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5)) self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5)) # With more than one output/label of the model trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True, label_names=["labels", "labels_2"]) outputs = trainer.predict(trainer.eval_dataset) preds = outputs.predictions labels = outputs.label_ids x = trainer.eval_dataset.x self.assertTrue(len(preds), 2) self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5)) self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5)) self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0])) self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1])) def test_dynamic_shapes(self): eval_dataset = DynamicShapesDataset(batch_size=self.batch_size) model = RegressionModel(a=2, b=1) args = TrainingArguments("./regression") trainer = Trainer(model, args, eval_dataset=eval_dataset) # Check evaluation can run to completion _ = trainer.evaluate() # Check predictions preds = trainer.predict(eval_dataset) for expected, seen in zip(eval_dataset.ys, preds.label_ids): self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]])) self.assertTrue(np.all(seen[expected.shape[0] :] == -100)) for expected, seen in zip(eval_dataset.xs, preds.predictions): self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]])) self.assertTrue(np.all(seen[expected.shape[0] :] == -100)) # Same tests with eval accumulation args = TrainingArguments("./regression", eval_accumulation_steps=2) trainer = Trainer(model, args, eval_dataset=eval_dataset) # Check evaluation can run to completion _ = trainer.evaluate() # Check predictions preds = trainer.predict(eval_dataset) for expected, seen in zip(eval_dataset.ys, preds.label_ids): self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]])) self.assertTrue(np.all(seen[expected.shape[0] :] == -100)) for expected, seen in zip(eval_dataset.xs, preds.predictions): self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]])) self.assertTrue(np.all(seen[expected.shape[0] :] == -100)) @require_datasets def test_trainer_with_datasets(self): import datasets np.random.seed(42) x = np.random.normal(size=(64,)).astype(np.float32) y = 2.0 * x + 3.0 + np.random.normal(scale=0.1, size=(64,)) train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y}) # Base training. Should have the same results as test_reproducible_training model = RegressionModel() args = TrainingArguments("./regression", learning_rate=0.1) trainer = Trainer(model, args, train_dataset=train_dataset) trainer.train() self.check_trained_model(trainer.model) # Can return tensors. train_dataset.set_format(type="torch", dtype=torch.float32) model = RegressionModel() trainer = Trainer(model, args, train_dataset=train_dataset) trainer.train() self.check_trained_model(trainer.model) # Adding one column not used by the model should have no impact z = np.random.normal(size=(64,)).astype(np.float32) train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y, "extra": z}) model = RegressionModel() trainer = Trainer(model, args, train_dataset=train_dataset) trainer.train() self.check_trained_model(trainer.model) def test_custom_optimizer(self): train_dataset = RegressionDataset() args = TrainingArguments("./regression") model = RegressionModel() optimizer = torch.optim.SGD(model.parameters(), lr=1.0) lr_scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda x: 1.0) trainer = Trainer(model, args, train_dataset=train_dataset, optimizers=(optimizer, lr_scheduler)) trainer.train() (a, b) = self.default_trained_model self.assertFalse(torch.allclose(trainer.model.a, a)) self.assertFalse(torch.allclose(trainer.model.b, b)) self.assertEqual(trainer.optimizer.state_dict()["param_groups"][0]["lr"], 1.0) def test_model_init(self): train_dataset = RegressionDataset() args = TrainingArguments("./regression", learning_rate=0.1) trainer = Trainer(args=args, train_dataset=train_dataset, model_init=lambda: RegressionModel()) trainer.train() self.check_trained_model(trainer.model) # Re-training should restart from scratch, thus lead the same results. trainer.train() self.check_trained_model(trainer.model) # Re-training should restart from scratch, thus lead the same results and new seed should be used. trainer.args.seed = 314 trainer.train() self.check_trained_model(trainer.model, alternate_seed=True) def test_save_checkpoints(self): with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5) trainer.train() self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size)) # With a regular model that is not a PreTrainedModel with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5, pretrained=False) trainer.train() self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), False) def test_gradient_accumulation(self): # Training with half the batch size but accumulation steps as 2 should give the same results. trainer = get_regression_trainer( gradient_accumulation_steps=2, per_device_train_batch_size=4, learning_rate=0.1 ) trainer.train() self.check_trained_model(trainer.model) def test_can_resume_training(self): if torch.cuda.device_count() > 2: # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model # won't be the same since the training dataloader is shuffled). return with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1) trainer.train() (a, b) = trainer.model.a.item(), trainer.model.b.item() state = dataclasses.asdict(trainer.state) checkpoint = os.path.join(tmpdir, "checkpoint-5") # Reinitialize trainer trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1) trainer.train(resume_from_checkpoint=checkpoint) (a1, b1) = trainer.model.a.item(), trainer.model.b.item() state1 = dataclasses.asdict(trainer.state) self.assertEqual(a, a1) self.assertEqual(b, b1) self.check_trainer_state_are_the_same(state, state1) # Now check with a later checkpoint that it also works when we span over one epoch checkpoint = os.path.join(tmpdir, "checkpoint-15") # Reinitialize trainer and load model trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1) trainer.train(resume_from_checkpoint=checkpoint) (a1, b1) = trainer.model.a.item(), trainer.model.b.item() state1 = dataclasses.asdict(trainer.state) self.assertEqual(a, a1) self.assertEqual(b, b1) self.check_trainer_state_are_the_same(state, state1) # With a regular model that is not a PreTrainedModel with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1, pretrained=False ) trainer.train() (a, b) = trainer.model.a.item(), trainer.model.b.item() state = dataclasses.asdict(trainer.state) checkpoint = os.path.join(tmpdir, "checkpoint-5") # Reinitialize trainer and load model trainer = get_regression_trainer( output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1, pretrained=False ) trainer.train(resume_from_checkpoint=checkpoint) (a1, b1) = trainer.model.a.item(), trainer.model.b.item() state1 = dataclasses.asdict(trainer.state) self.assertEqual(a, a1) self.assertEqual(b, b1) self.check_trainer_state_are_the_same(state, state1) # Now check with a later checkpoint that it also works when we span over one epoch checkpoint = os.path.join(tmpdir, "checkpoint-15") # Reinitialize trainer and load model trainer = get_regression_trainer( output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1, pretrained=False ) trainer.train(resume_from_checkpoint=checkpoint) (a1, b1) = trainer.model.a.item(), trainer.model.b.item() state1 = dataclasses.asdict(trainer.state) self.assertEqual(a, a1) self.assertEqual(b, b1) self.check_trainer_state_are_the_same(state, state1) def test_resume_training_with_gradient_accumulation(self): if torch.cuda.device_count() > 2: # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model # won't be the same since the training dataloader is shuffled). return with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( output_dir=tmpdir, train_len=128, gradient_accumulation_steps=2, per_device_train_batch_size=4, save_steps=5, learning_rate=0.1, ) trainer.train() (a, b) = trainer.model.a.item(), trainer.model.b.item() state = dataclasses.asdict(trainer.state) checkpoint = os.path.join(tmpdir, "checkpoint-5") # Reinitialize trainer trainer = get_regression_trainer( output_dir=tmpdir, train_len=128, gradient_accumulation_steps=2, per_device_train_batch_size=4, save_steps=5, learning_rate=0.1, ) trainer.train(resume_from_checkpoint=checkpoint) (a1, b1) = trainer.model.a.item(), trainer.model.b.item() state1 = dataclasses.asdict(trainer.state) self.assertEqual(a, a1) self.assertEqual(b, b1) self.check_trainer_state_are_the_same(state, state1) def test_load_best_model_at_end(self): total = int(self.n_epochs * 64 / self.batch_size) with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( a=1.5, b=2.5, output_dir=tmpdir, learning_rate=0.1, eval_steps=5, evaluation_strategy="steps", load_best_model_at_end=True, ) self.assertFalse(trainer.args.greater_is_better) trainer.train() self.check_saved_checkpoints(tmpdir, 5, total) self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss") with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( a=1.5, b=2.5, output_dir=tmpdir, learning_rate=0.1, eval_steps=5, evaluation_strategy="steps", load_best_model_at_end=True, metric_for_best_model="accuracy", compute_metrics=AlmostAccuracy(), ) self.assertTrue(trainer.args.greater_is_better) trainer.train() self.check_saved_checkpoints(tmpdir, 5, total) self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_accuracy", greater_is_better=True) # Save is done every eval regardless of the strategy with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( a=1.5, b=2.5, output_dir=tmpdir, learning_rate=0.1, evaluation_strategy="epoch", load_best_model_at_end=True, metric_for_best_model="accuracy", compute_metrics=AlmostAccuracy(), ) self.assertTrue(trainer.args.greater_is_better) trainer.train() self.check_saved_checkpoints(tmpdir, 64 // self.batch_size, total) self.check_best_model_has_been_loaded( tmpdir, 64 // self.batch_size, total, trainer, "eval_accuracy", greater_is_better=True ) # Test this works with a non PreTrainedModel with tempfile.TemporaryDirectory() as tmpdir: trainer = get_regression_trainer( output_dir=tmpdir, learning_rate=0.1, eval_steps=5, evaluation_strategy="steps", load_best_model_at_end=True, pretrained=False, ) self.assertFalse(trainer.args.greater_is_better) trainer.train() self.check_saved_checkpoints(tmpdir, 5, total, is_pretrained=False) self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss", is_pretrained=False) @slow def test_trainer_eval_mrpc(self): MODEL_ID = "bert-base-cased-finetuned-mrpc" tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID) data_args = GlueDataTrainingArguments( task_name="mrpc", data_dir=f"{get_tests_dir()}/fixtures/tests_samples/MRPC", overwrite_cache=True ) eval_dataset = GlueDataset(data_args, tokenizer=tokenizer, mode="dev") training_args = TrainingArguments(output_dir="./examples", no_cuda=True) trainer = Trainer(model=model, args=training_args, eval_dataset=eval_dataset) result = trainer.evaluate() self.assertLess(result["eval_loss"], 0.2) @slow def test_trainer_eval_lm(self): MODEL_ID = "distilroberta-base" tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) dataset = LineByLineTextDataset( tokenizer=tokenizer, file_path=PATH_SAMPLE_TEXT, block_size=tokenizer.max_len_single_sentence, ) self.assertEqual(len(dataset), 31) def test_trainer_iterable_dataset(self): # Simulate Language Modeling with an IterableDataset, with no __len__ method # Pick-up a tiny model, so it works on CPU # See Issue #5990: https://github.com/huggingface/transformers/issues/5990 MODEL_ID = "sshleifer/tiny-distilbert-base-cased" model = AutoModelForMaskedLM.from_pretrained(MODEL_ID) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) train_dataset = SampleIterableDataset(file_path=PATH_SAMPLE_TEXT, tokenizer=tokenizer) training_args = TrainingArguments(output_dir="./examples", no_cuda=True, max_steps=2) data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15) training_args = TrainingArguments(output_dir="./examples", no_cuda=True, max_steps=2) trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset, data_collator=data_collator) trainer.train() loader = trainer.get_train_dataloader() self.assertIsInstance(loader, torch.utils.data.DataLoader) self.assertIsInstance(loader.sampler, torch.utils.data.dataloader._InfiniteConstantSampler) # Exception if giving iterable dataset and no max_steps with self.assertRaises(ValueError): training_args = TrainingArguments(output_dir="./examples", no_cuda=True) _ = Trainer(model=model, args=training_args, train_dataset=train_dataset, data_collator=data_collator) # Exception if eval_dataset is iterable in __init__ with self.assertRaises(ValueError): training_args = TrainingArguments(output_dir="./examples", no_cuda=True, max_steps=2) _ = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=train_dataset, data_collator=data_collator, ) # Exception if predicting with iterable dataset with self.assertRaises(ValueError): training_args = TrainingArguments(output_dir="./examples", no_cuda=True) trainer = Trainer(model=model, args=training_args, data_collator=data_collator) trainer.predict(train_dataset) # Exception if evaluating with iterable dataset with self.assertRaises(ValueError): training_args = TrainingArguments(output_dir="./examples", no_cuda=True) trainer = Trainer(model=model, args=training_args, data_collator=data_collator) trainer.evaluate(train_dataset) def test_num_train_epochs_in_training(self): # len(train_dl) < gradient_accumulation_steps shouldn't give ``ZeroDivisionError`` when ``max_steps`` is given. # It should give 1 update step for each epoch. trainer = get_regression_trainer( max_steps=3, train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5 ) train_output = trainer.train() self.assertEqual(train_output.global_step, 3) # Even ``max_steps`` is not specified, we still expect 1 update step for each epoch if # len(train_dl) < gradient_accumulation_steps. trainer = get_regression_trainer(train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5) train_output = trainer.train() self.assertEqual(train_output.global_step, int(self.n_epochs)) def test_early_stopping_callback(self): # early stopping stops training before num_training_epochs with tempfile.TemporaryDirectory() as tmp_dir: trainer = get_regression_trainer( output_dir=tmp_dir, num_train_epochs=20, gradient_accumulation_steps=1, per_device_train_batch_size=16, load_best_model_at_end=True, evaluation_strategy=EvaluationStrategy.EPOCH, compute_metrics=AlmostAccuracy(), metric_for_best_model="accuracy", ) trainer.add_callback(EarlyStoppingCallback(1, 0.0001)) train_output = trainer.train() self.assertLess(train_output.global_step, 20 * 64 / 16) # Invalid inputs to trainer with early stopping callback result in assertion error with tempfile.TemporaryDirectory() as tmp_dir: trainer = get_regression_trainer( output_dir=tmp_dir, num_train_epochs=20, gradient_accumulation_steps=1, per_device_train_batch_size=16, evaluation_strategy=EvaluationStrategy.EPOCH, compute_metrics=AlmostAccuracy(), metric_for_best_model="accuracy", ) trainer.add_callback(EarlyStoppingCallback(1)) self.assertEqual(trainer.state.global_step, 0) try: trainer.train() except AssertionError: self.assertEqual(trainer.state.global_step, 0) def test_flos_extraction(self): trainer = get_regression_trainer(learning_rate=0.1) def assert_flos_extraction(trainer, wrapped_model_to_check): self.assertEqual(trainer.model, _model_unwrap(wrapped_model_to_check)) self.assertGreaterEqual(getattr(_model_unwrap(wrapped_model_to_check).config, "total_flos", 0), 0) # with plain model assert_flos_extraction(trainer, trainer.model) # with enforced DataParallel assert_flos_extraction(trainer, torch.nn.DataParallel(trainer.model)) trainer.train() self.assertTrue(isinstance(trainer.state.total_flos, float)) def check_mem_metrics(self, trainer, check_func): metrics = trainer.train().metrics check_func("init_mem_cpu_alloc_delta", metrics) check_func("train_mem_cpu_alloc_delta", metrics) if torch.cuda.device_count() > 0: check_func("init_mem_gpu_alloc_delta", metrics) check_func("train_mem_gpu_alloc_delta", metrics) metrics = trainer.evaluate() check_func("eval_mem_cpu_alloc_delta", metrics) if torch.cuda.device_count() > 0: check_func("eval_mem_gpu_alloc_delta", metrics) metrics = trainer.predict(RegressionDataset()).metrics check_func("test_mem_cpu_alloc_delta", metrics) if torch.cuda.device_count() > 0: check_func("test_mem_gpu_alloc_delta", metrics) def test_mem_metrics(self): # with mem metrics enabled trainer = get_regression_trainer() self.check_mem_metrics(trainer, self.assertIn) # with mem metrics disabled trainer = get_regression_trainer(skip_memory_metrics=True) self.check_mem_metrics(trainer, self.assertNotIn) @require_torch_gpu def test_fp16_full_eval(self): # this is a sensitive test so let's keep debugging printouts in place for quick diagnosis. # it's using pretty large safety margins, but small enough to detect broken functionality. debug = 0 bs = 8 # make the params somewhat big so that there will be enough RAM consumed to be able to # measure things. We should get about 64KB for a+b in fp32 a = torch.ones(1000, bs) + 0.001 b = torch.ones(1000, bs) - 0.001 # 1. with mem metrics enabled trainer = get_regression_trainer(a=a, b=b, eval_len=16) metrics = trainer.evaluate() del trainer gc.collect() fp32_init = metrics["init_mem_gpu_alloc_delta"] fp32_eval = metrics["eval_mem_gpu_alloc_delta"] if debug: print(f"fp32_init {fp32_init}") print(f"fp32_eval {fp32_eval}") # here we expect the model to be preloaded in trainer.__init__ and consume around 64K gpu ram. # perfect world: fp32_init == 64<<10 self.assertGreater(fp32_init, 59_000) # after eval should be no extra memory allocated - with a small margin (other than the peak # memory consumption for the forward calculation that gets recovered) # perfect world: fp32_eval == close to zero self.assertLess(fp32_eval, 5_000) # 2. with mem metrics disabled trainer = get_regression_trainer(a=a, b=b, eval_len=16, fp16_full_eval=True) metrics = trainer.evaluate() fp16_init = metrics["init_mem_gpu_alloc_delta"] fp16_eval = metrics["eval_mem_gpu_alloc_delta"] if debug: print(f"fp16_init {fp16_init}") print(f"fp16_eval {fp16_eval}") # here we expect the model to not be preloaded in trainer.__init__, so with a small margin it should be close to 0 # perfect world: fp16_init == close to zero self.assertLess(fp16_init, 5_000) # here we put the model on device in eval and only `half()` of it, i.e. about 32K,(again we ignore the peak margin which gets returned back) # perfect world: fp32_init == 32<<10 self.assertGreater(fp16_eval, 27_000) # 3. relative comparison fp32 vs full fp16 # should be about half of fp16_init # perfect world: fp32_init/2 == fp16_eval self.assertAlmostEqual(fp16_eval, fp32_init / 2, delta=5_000) @require_torch @require_optuna class TrainerHyperParameterIntegrationTest(unittest.TestCase): def setUp(self): args = TrainingArguments(".") self.n_epochs = args.num_train_epochs self.batch_size = args.train_batch_size def test_hyperparameter_search(self): class MyTrialShortNamer(TrialShortNamer): DEFAULTS = {"a": 0, "b": 0} def hp_space(trial): return {} def model_init(trial): if trial is not None: a = trial.suggest_int("a", -4, 4) b = trial.suggest_int("b", -4, 4) else: a = 0 b = 0 config = RegressionModelConfig(a=a, b=b, double_output=False) return RegressionPreTrainedModel(config) def hp_name(trial): return MyTrialShortNamer.shortname(trial.params) with tempfile.TemporaryDirectory() as tmp_dir: trainer = get_regression_trainer( output_dir=tmp_dir, learning_rate=0.1, logging_steps=1, evaluation_strategy=EvaluationStrategy.EPOCH, num_train_epochs=4, disable_tqdm=True, load_best_model_at_end=True, logging_dir="runs", run_name="test", model_init=model_init, ) trainer.hyperparameter_search(direction="minimize", hp_space=hp_space, hp_name=hp_name, n_trials=4)