# Copyright 2020 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import shutil import tempfile import unittest from unittest.mock import patch from transformers import ( DefaultFlowCallback, EarlyStoppingCallback, IntervalStrategy, PrinterCallback, ProgressCallback, Trainer, TrainerCallback, TrainerState, TrainingArguments, is_torch_available, ) from transformers.testing_utils import require_torch from transformers.trainer_callback import ExportableState if is_torch_available(): from transformers.trainer import DEFAULT_CALLBACKS, TRAINER_STATE_NAME from .test_trainer import RegressionDataset, RegressionModelConfig, RegressionPreTrainedModel class MyTestExportableCallback(TrainerCallback, ExportableState): def __init__(self, my_test_state="test"): self.my_test_state = my_test_state def state(self): return { "args": { "my_test_state": self.my_test_state, }, } class MyTestTrainerCallback(TrainerCallback): "A callback that registers the events that goes through." def __init__(self, my_test_state="test"): self.events = [] self.my_test_state = my_test_state def on_init_end(self, args, state, control, **kwargs): self.events.append("on_init_end") def on_train_begin(self, args, state, control, **kwargs): self.events.append("on_train_begin") def on_train_end(self, args, state, control, **kwargs): self.events.append("on_train_end") def on_epoch_begin(self, args, state, control, **kwargs): self.events.append("on_epoch_begin") def on_epoch_end(self, args, state, control, **kwargs): self.events.append("on_epoch_end") def on_step_begin(self, args, state, control, **kwargs): self.events.append("on_step_begin") def on_pre_optimizer_step(self, args, state, control, **kwargs): self.events.append("on_pre_optimizer_step") def on_optimizer_step(self, args, state, control, **kwargs): self.events.append("on_optimizer_step") def on_step_end(self, args, state, control, **kwargs): self.events.append("on_step_end") def on_evaluate(self, args, state, control, **kwargs): self.events.append("on_evaluate") def on_predict(self, args, state, control, **kwargs): self.events.append("on_predict") def on_save(self, args, state, control, **kwargs): self.events.append("on_save") def on_log(self, args, state, control, **kwargs): self.events.append("on_log") def on_prediction_step(self, args, state, control, **kwargs): self.events.append("on_prediction_step") @require_torch class TrainerCallbackTest(unittest.TestCase): def setUp(self): self.output_dir = tempfile.mkdtemp() def tearDown(self): shutil.rmtree(self.output_dir) def get_trainer(self, a=0, b=0, train_len=64, eval_len=64, callbacks=None, disable_tqdm=False, **kwargs): # disable_tqdm in TrainingArguments has a flaky default since it depends on the level of logging. We make sure # its set to False since the tests later on depend on its value. train_dataset = RegressionDataset(length=train_len) eval_dataset = RegressionDataset(length=eval_len) config = RegressionModelConfig(a=a, b=b) model = RegressionPreTrainedModel(config) args = TrainingArguments(self.output_dir, disable_tqdm=disable_tqdm, report_to=[], **kwargs) return Trainer( model, args, train_dataset=train_dataset, eval_dataset=eval_dataset, callbacks=callbacks, ) def check_callbacks_equality(self, cbs1, cbs2): self.assertEqual(len(cbs1), len(cbs2)) # Order doesn't matter cbs1 = sorted(cbs1, key=lambda cb: cb.__name__ if isinstance(cb, type) else cb.__class__.__name__) cbs2 = sorted(cbs2, key=lambda cb: cb.__name__ if isinstance(cb, type) else cb.__class__.__name__) for cb1, cb2 in zip(cbs1, cbs2): if isinstance(cb1, type) and isinstance(cb2, type): self.assertEqual(cb1, cb2) elif isinstance(cb1, type) and not isinstance(cb2, type): self.assertEqual(cb1, cb2.__class__) elif not isinstance(cb1, type) and isinstance(cb2, type): self.assertEqual(cb1.__class__, cb2) else: self.assertEqual(cb1, cb2) def get_expected_events(self, trainer): expected_events = ["on_init_end", "on_train_begin"] step = 0 train_dl_len = len(trainer.get_eval_dataloader()) evaluation_events = ["on_prediction_step"] * len(trainer.get_eval_dataloader()) + ["on_log", "on_evaluate"] for _ in range(trainer.state.num_train_epochs): expected_events.append("on_epoch_begin") for _ in range(train_dl_len): step += 1 expected_events += ["on_step_begin", "on_pre_optimizer_step", "on_optimizer_step", "on_step_end"] if step % trainer.args.logging_steps == 0: expected_events.append("on_log") if trainer.args.eval_strategy == IntervalStrategy.STEPS and step % trainer.args.eval_steps == 0: expected_events += evaluation_events.copy() if step % trainer.args.save_steps == 0 or step == trainer.state.max_steps: expected_events.append("on_save") expected_events.append("on_epoch_end") if trainer.args.eval_strategy == IntervalStrategy.EPOCH: expected_events += evaluation_events.copy() expected_events += ["on_log", "on_train_end"] return expected_events def test_init_callback(self): trainer = self.get_trainer() expected_callbacks = DEFAULT_CALLBACKS.copy() + [ProgressCallback] self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) # Callbacks passed at init are added to the default callbacks trainer = self.get_trainer(callbacks=[MyTestTrainerCallback]) expected_callbacks.append(MyTestTrainerCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) # TrainingArguments.disable_tqdm controls if use ProgressCallback or PrinterCallback trainer = self.get_trainer(disable_tqdm=True) expected_callbacks = DEFAULT_CALLBACKS.copy() + [PrinterCallback] self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) def test_add_remove_callback(self): expected_callbacks = DEFAULT_CALLBACKS.copy() + [ProgressCallback] trainer = self.get_trainer() # We can add, pop, or remove by class name trainer.remove_callback(DefaultFlowCallback) expected_callbacks.remove(DefaultFlowCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) trainer = self.get_trainer() cb = trainer.pop_callback(DefaultFlowCallback) self.assertEqual(cb.__class__, DefaultFlowCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) trainer.add_callback(DefaultFlowCallback) expected_callbacks.insert(0, DefaultFlowCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) # We can also add, pop, or remove by instance trainer = self.get_trainer() cb = trainer.callback_handler.callbacks[0] trainer.remove_callback(cb) expected_callbacks.remove(DefaultFlowCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) trainer = self.get_trainer() cb1 = trainer.callback_handler.callbacks[0] cb2 = trainer.pop_callback(cb1) self.assertEqual(cb1, cb2) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) trainer.add_callback(cb1) expected_callbacks.insert(0, DefaultFlowCallback) self.check_callbacks_equality(trainer.callback_handler.callbacks, expected_callbacks) def test_event_flow(self): import warnings # XXX: for now ignore scatter_gather warnings in this test since it's not relevant to what's being tested with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=UserWarning) trainer = self.get_trainer(callbacks=[MyTestTrainerCallback]) trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) # Independent log/save/eval trainer = self.get_trainer(callbacks=[MyTestTrainerCallback], logging_steps=5) trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) trainer = self.get_trainer(callbacks=[MyTestTrainerCallback], save_steps=5) trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) trainer = self.get_trainer(callbacks=[MyTestTrainerCallback], eval_steps=5, eval_strategy="steps") trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) trainer = self.get_trainer(callbacks=[MyTestTrainerCallback], eval_strategy="epoch") trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) # A bit of everything trainer = self.get_trainer( callbacks=[MyTestTrainerCallback], logging_steps=3, save_steps=10, eval_steps=5, eval_strategy="steps", ) trainer.train() events = trainer.callback_handler.callbacks[-2].events self.assertEqual(events, self.get_expected_events(trainer)) # warning should be emitted for duplicated callbacks with patch("transformers.trainer_callback.logger.warning") as warn_mock: trainer = self.get_trainer( callbacks=[MyTestTrainerCallback, MyTestTrainerCallback], ) assert str(MyTestTrainerCallback) in warn_mock.call_args[0][0] def test_stateful_callbacks(self): # Use something with non-defaults cb = EarlyStoppingCallback(early_stopping_patience=5, early_stopping_threshold=0.2) trainer = self.get_trainer( callbacks=[cb], load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, ) trainer.train() # Create a new trainer with defaults trainer = self.get_trainer( callbacks=[EarlyStoppingCallback()], load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, restore_callback_states_from_checkpoint=True, ) # Load it back in and verify values checkpoint = os.path.join(self.output_dir, "checkpoint-2") trainer.train(resume_from_checkpoint=checkpoint) cb = [ callback for callback in trainer.callback_handler.callbacks if isinstance(callback, EarlyStoppingCallback) ][0] assert cb.early_stopping_patience == 5 assert cb.early_stopping_threshold == 0.2 def test_stateful_mixed_callbacks(self): # Use two callbacks, one stateful one not # Use something with non-defaults cbs = [ MyTestTrainerCallback(my_test_state="another value"), EarlyStoppingCallback(early_stopping_patience=5, early_stopping_threshold=0.2), ] trainer = self.get_trainer( callbacks=cbs, load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, ) trainer.train() # Create a new trainer with defaults trainer = self.get_trainer( callbacks=[EarlyStoppingCallback(), MyTestTrainerCallback()], load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, restore_callback_states_from_checkpoint=True, ) # Load it back in and verify values checkpoint = os.path.join(self.output_dir, "checkpoint-2") trainer.train(resume_from_checkpoint=checkpoint) cbs = [ callback for callback in trainer.callback_handler.callbacks if isinstance(callback, (EarlyStoppingCallback, MyTestTrainerCallback)) ] assert len(cbs) == 2 my_test, early_stopping = cbs assert early_stopping.early_stopping_patience == 5 assert early_stopping.early_stopping_threshold == 0.2 assert my_test.my_test_state == "test" def test_stateful_duplicate_callbacks(self): # Use something with non-defaults cbs = [MyTestExportableCallback("first"), MyTestExportableCallback("second")] trainer = self.get_trainer( callbacks=cbs, load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, ) trainer.train() # Create a new trainer with defaults trainer = self.get_trainer( callbacks=[MyTestExportableCallback(), MyTestExportableCallback()], load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, restore_callback_states_from_checkpoint=True, ) # Load it back in and verify values checkpoint = os.path.join(self.output_dir, "checkpoint-2") trainer.train(resume_from_checkpoint=checkpoint) cbs = [ callback for callback in trainer.callback_handler.callbacks if isinstance(callback, MyTestExportableCallback) ] assert len(cbs) == 2 assert cbs[0].my_test_state == "first" assert cbs[1].my_test_state == "second" def test_missing_stateful_callback(self): cb = EarlyStoppingCallback() trainer = self.get_trainer( callbacks=[cb], load_best_model_at_end=True, save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, ) trainer.train() # Create a new trainer with defaults trainer = self.get_trainer( save_strategy="steps", eval_strategy="steps", save_steps=2, eval_steps=2, max_steps=2, restore_callback_states_from_checkpoint=True, ) # Load it back in and verify values checkpoint = os.path.join(self.output_dir, "checkpoint-2") # warning should be emitted for not-present callbacks with patch("transformers.trainer.logger.warning") as warn_mock: trainer.train(resume_from_checkpoint=checkpoint) assert "EarlyStoppingCallback" in warn_mock.call_args[0][0] def test_stateful_control(self): trainer = self.get_trainer( max_steps=2, save_strategy="steps", save_steps=2, ) trainer.train() # Load it back in and verify values trainer = self.get_trainer(max_steps=2, restore_callback_states_from_checkpoint=True) checkpoint = os.path.join(self.output_dir, "checkpoint-2") trainer.state = TrainerState.load_from_json(os.path.join(checkpoint, TRAINER_STATE_NAME)) trainer._load_callback_state() assert trainer.control.should_training_stop def test_no_duplicate_save_on_epoch_save_strategy(self): times_saved = 0 class OnEndCallback(TrainerCallback): def on_step_end(self, args: TrainingArguments, state: TrainerState, control, **kwargs): nonlocal times_saved if control.should_save: times_saved += 1 def on_epoch_end(self, args: TrainingArguments, state: TrainerState, control, **kwargs): nonlocal times_saved if control.should_save: times_saved += 1 trainer = self.get_trainer(max_steps=2, save_strategy="epoch", callbacks=[OnEndCallback]) trainer.train() assert times_saved == 1