# coding=utf-8 # Copyright 2019 HuggingFace Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import gc import inspect import os.path import random import tempfile import unittest import warnings from typing import Dict, List, Tuple from huggingface_hub import HfApi from requests.exceptions import HTTPError from transformers import is_torch_available, logging from transformers.file_utils import WEIGHTS_NAME, is_torch_fx_available from transformers.models.auto import get_values from transformers.testing_utils import ( ENDPOINT_STAGING, PASS, USER, CaptureLogger, is_staging_test, require_torch, require_torch_multi_gpu, slow, torch_device, ) if is_torch_available(): import numpy as np import torch from torch import nn from transformers import ( BERT_PRETRAINED_MODEL_ARCHIVE_LIST, MODEL_FOR_CAUSAL_LM_MAPPING, MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING, MODEL_FOR_MASKED_LM_MAPPING, MODEL_FOR_MULTIPLE_CHOICE_MAPPING, MODEL_FOR_NEXT_SENTENCE_PREDICTION_MAPPING, MODEL_FOR_QUESTION_ANSWERING_MAPPING, MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING, MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING, MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING, MODEL_MAPPING, AdaptiveEmbedding, BertConfig, BertModel, PretrainedConfig, PreTrainedModel, T5ForConditionalGeneration, ) if is_torch_fx_available(): from transformers.utils.fx import symbolic_trace def _config_zero_init(config): configs_no_init = copy.deepcopy(config) for key in configs_no_init.__dict__.keys(): if "_range" in key or "_std" in key or "initializer_factor" in key: setattr(configs_no_init, key, 1e-10) return configs_no_init TINY_T5 = "patrickvonplaten/t5-tiny-random" @require_torch class ModelTesterMixin: model_tester = None all_model_classes = () all_generative_model_classes = () fx_ready_model_classes = () test_torchscript = True test_pruning = True test_resize_embeddings = True test_head_masking = True test_missing_keys = True test_model_parallel = False is_encoder_decoder = False test_sequence_classification_problem_types = False def _prepare_for_class(self, inputs_dict, model_class, return_labels=False): inputs_dict = copy.deepcopy(inputs_dict) if model_class in get_values(MODEL_FOR_MULTIPLE_CHOICE_MAPPING): inputs_dict = { k: v.unsqueeze(1).expand(-1, self.model_tester.num_choices, -1).contiguous() if isinstance(v, torch.Tensor) and v.ndim > 1 else v for k, v in inputs_dict.items() } if return_labels: if model_class in get_values(MODEL_FOR_MULTIPLE_CHOICE_MAPPING): inputs_dict["labels"] = torch.ones(self.model_tester.batch_size, dtype=torch.long, device=torch_device) elif model_class in get_values(MODEL_FOR_QUESTION_ANSWERING_MAPPING): inputs_dict["start_positions"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) inputs_dict["end_positions"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) elif model_class in [ *get_values(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING), *get_values(MODEL_FOR_NEXT_SENTENCE_PREDICTION_MAPPING), *get_values(MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING), ]: inputs_dict["labels"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) elif model_class in [ *get_values(MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING), *get_values(MODEL_FOR_CAUSAL_LM_MAPPING), *get_values(MODEL_FOR_MASKED_LM_MAPPING), *get_values(MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING), ]: inputs_dict["labels"] = torch.zeros( (self.model_tester.batch_size, self.model_tester.seq_length), dtype=torch.long, device=torch_device ) return inputs_dict def test_save_load(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) out_2 = outputs[0].cpu().numpy() out_2[np.isnan(out_2)] = 0 with tempfile.TemporaryDirectory() as tmpdirname: model.save_pretrained(tmpdirname) model = model_class.from_pretrained(tmpdirname) model.to(torch_device) with torch.no_grad(): after_outputs = model(**self._prepare_for_class(inputs_dict, model_class)) # Make sure we don't have nans out_1 = after_outputs[0].cpu().numpy() out_1[np.isnan(out_1)] = 0 max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def test_save_load__keys_to_ignore_on_save(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) _keys_to_ignore_on_save = getattr(model, "_keys_to_ignore_on_save", None) if _keys_to_ignore_on_save is None: continue # check the keys are in the original state_dict for k in _keys_to_ignore_on_save: self.assertIn(k, model.state_dict()) # check that certain keys didn't get saved with the model with tempfile.TemporaryDirectory() as tmpdirname: model.save_pretrained(tmpdirname) output_model_file = os.path.join(tmpdirname, WEIGHTS_NAME) state_dict_saved = torch.load(output_model_file) for k in _keys_to_ignore_on_save: self.assertNotIn(k, state_dict_saved) # Test we can load the state dict in the model, necessary for the checkpointing API in Trainer. load_result = model.load_state_dict(state_dict_saved, strict=False) self.assertTrue( len(load_result.missing_keys) == 0 or set(load_result.missing_keys) == set(model._keys_to_ignore_on_save) ) self.assertTrue(len(load_result.unexpected_keys) == 0) def _mock_init_weights(self, module): if hasattr(module, "weight") and module.weight is not None: module.weight.data.fill_(3) if hasattr(module, "bias") and module.bias is not None: module.bias.data.fill_(3) def test_save_load_fast_init_from_base(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() base_class = MODEL_MAPPING[config.__class__] if isinstance(base_class, tuple): base_class = base_class[0] for model_class in self.all_model_classes: if model_class == base_class: continue # make a copy of model class to not break future tests # from https://stackoverflow.com/questions/9541025/how-to-copy-a-python-class class CopyClass(model_class): pass model_class_copy = CopyClass # make sure that all keys are expected for test model_class_copy._keys_to_ignore_on_load_missing = [] # make init deterministic, but make sure that # non-initialized weights throw errors nevertheless model_class_copy._init_weights = self._mock_init_weights model = base_class(config) state_dict = model.state_dict() # this will often delete a single weight of a multi-weight module # to test an edge case random_key_to_del = random.choice(list(state_dict.keys())) del state_dict[random_key_to_del] # check that certain keys didn't get saved with the model with tempfile.TemporaryDirectory() as tmpdirname: model.save_pretrained(tmpdirname) torch.save(state_dict, os.path.join(tmpdirname, "pytorch_model.bin")) model_fast_init = model_class_copy.from_pretrained(tmpdirname) model_slow_init = model_class_copy.from_pretrained(tmpdirname, _fast_init=False) for key in model_fast_init.state_dict().keys(): max_diff = (model_slow_init.state_dict()[key] - model_fast_init.state_dict()[key]).sum().item() self.assertLessEqual(max_diff, 1e-3, msg=f"{key} not identical") def test_save_load_fast_init_to_base(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() base_class = MODEL_MAPPING[config.__class__] if isinstance(base_class, tuple): base_class = base_class[0] for model_class in self.all_model_classes: if model_class == base_class: continue # make a copy of model class to not break future tests # from https://stackoverflow.com/questions/9541025/how-to-copy-a-python-class class CopyClass(base_class): pass base_class_copy = CopyClass # make sure that all keys are expected for test base_class_copy._keys_to_ignore_on_load_missing = [] # make init deterministic, but make sure that # non-initialized weights throw errors nevertheless base_class_copy._init_weights = self._mock_init_weights model = model_class(config) state_dict = model.state_dict() # this will often delete a single weight of a multi-weight module # to test an edge case random_key_to_del = random.choice(list(state_dict.keys())) del state_dict[random_key_to_del] # check that certain keys didn't get saved with the model with tempfile.TemporaryDirectory() as tmpdirname: model.config.save_pretrained(tmpdirname) torch.save(state_dict, os.path.join(tmpdirname, "pytorch_model.bin")) model_fast_init = base_class_copy.from_pretrained(tmpdirname) model_slow_init = base_class_copy.from_pretrained(tmpdirname, _fast_init=False) for key in model_fast_init.state_dict().keys(): max_diff = (model_slow_init.state_dict()[key] - model_fast_init.state_dict()[key]).sum().item() self.assertLessEqual(max_diff, 1e-3, msg=f"{key} not identical") def test_initialization(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() configs_no_init = _config_zero_init(config) for model_class in self.all_model_classes: model = model_class(config=configs_no_init) for name, param in model.named_parameters(): if param.requires_grad: self.assertIn( ((param.data.mean() * 1e9).round() / 1e9).item(), [0.0, 1.0], msg=f"Parameter {name} of model {model_class} seems not properly initialized", ) def test_determinism(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): first = model(**self._prepare_for_class(inputs_dict, model_class))[0] second = model(**self._prepare_for_class(inputs_dict, model_class))[0] out_1 = first.cpu().numpy() out_2 = second.cpu().numpy() out_1 = out_1[~np.isnan(out_1)] out_2 = out_2[~np.isnan(out_2)] max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def test_forward_signature(self): config, _ = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) signature = inspect.signature(model.forward) # signature.parameters is an OrderedDict => so arg_names order is deterministic arg_names = [*signature.parameters.keys()] if model.config.is_encoder_decoder: expected_arg_names = [ "input_ids", "attention_mask", "decoder_input_ids", "decoder_attention_mask", ] expected_arg_names.extend( ["head_mask", "decoder_head_mask", "cross_attn_head_mask", "encoder_outputs"] if "head_mask" and "decoder_head_mask" and "cross_attn_head_mask" in arg_names else ["encoder_outputs"] ) self.assertListEqual(arg_names[: len(expected_arg_names)], expected_arg_names) else: expected_arg_names = ["input_ids"] self.assertListEqual(arg_names[:1], expected_arg_names) def test_training(self): if not self.model_tester.is_training: return config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.return_dict = True for model_class in self.all_model_classes: if model_class in get_values(MODEL_MAPPING): continue model = model_class(config) model.to(torch_device) model.train() inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) loss = model(**inputs).loss loss.backward() def test_training_gradient_checkpointing(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() if not self.model_tester.is_training or not hasattr(config, "gradient_checkpointing"): return config.gradient_checkpointing = True config.use_cache = False config.return_dict = True for model_class in self.all_model_classes: if model_class in get_values(MODEL_MAPPING): continue model = model_class(config) model.to(torch_device) model.train() inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) loss = model(**inputs).loss loss.backward() def test_attention_outputs(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.return_dict = True seq_len = getattr(self.model_tester, "seq_length", None) decoder_seq_length = getattr(self.model_tester, "decoder_seq_length", seq_len) encoder_seq_length = getattr(self.model_tester, "encoder_seq_length", seq_len) decoder_key_length = getattr(self.model_tester, "decoder_key_length", decoder_seq_length) encoder_key_length = getattr(self.model_tester, "key_length", encoder_seq_length) chunk_length = getattr(self.model_tester, "chunk_length", None) if chunk_length is not None and hasattr(self.model_tester, "num_hashes"): encoder_seq_length = encoder_seq_length * self.model_tester.num_hashes for model_class in self.all_model_classes: inputs_dict["output_attentions"] = True inputs_dict["output_hidden_states"] = False config.return_dict = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(attentions), self.model_tester.num_hidden_layers) # check that output_attentions also work using config del inputs_dict["output_attentions"] config.output_attentions = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(attentions), self.model_tester.num_hidden_layers) if chunk_length is not None: self.assertListEqual( list(attentions[0].shape[-4:]), [self.model_tester.num_attention_heads, encoder_seq_length, chunk_length, encoder_key_length], ) else: self.assertListEqual( list(attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length], ) out_len = len(outputs) if self.is_encoder_decoder: correct_outlen = 5 # loss is at first position if "labels" in inputs_dict: correct_outlen += 1 # loss is added to beginning # Question Answering model returns start_logits and end_logits if model_class in get_values(MODEL_FOR_QUESTION_ANSWERING_MAPPING): correct_outlen += 1 # start_logits and end_logits instead of only 1 output if "past_key_values" in outputs: correct_outlen += 1 # past_key_values have been returned self.assertEqual(out_len, correct_outlen) # decoder attentions decoder_attentions = outputs.decoder_attentions self.assertIsInstance(decoder_attentions, (list, tuple)) self.assertEqual(len(decoder_attentions), self.model_tester.num_hidden_layers) self.assertListEqual( list(decoder_attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, decoder_seq_length, decoder_key_length], ) # cross attentions cross_attentions = outputs.cross_attentions self.assertIsInstance(cross_attentions, (list, tuple)) self.assertEqual(len(cross_attentions), self.model_tester.num_hidden_layers) self.assertListEqual( list(cross_attentions[0].shape[-3:]), [ self.model_tester.num_attention_heads, decoder_seq_length, encoder_key_length, ], ) # Check attention is always last and order is fine inputs_dict["output_attentions"] = True inputs_dict["output_hidden_states"] = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) if hasattr(self.model_tester, "num_hidden_states_types"): added_hidden_states = self.model_tester.num_hidden_states_types elif self.is_encoder_decoder: added_hidden_states = 2 else: added_hidden_states = 1 self.assertEqual(out_len + added_hidden_states, len(outputs)) self_attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(self_attentions), self.model_tester.num_hidden_layers) if chunk_length is not None: self.assertListEqual( list(self_attentions[0].shape[-4:]), [self.model_tester.num_attention_heads, encoder_seq_length, chunk_length, encoder_key_length], ) else: self.assertListEqual( list(self_attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length], ) @slow def test_torchscript(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() self._create_and_check_torchscript(config, inputs_dict) @slow def test_torchscript_output_attentions(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.output_attentions = True self._create_and_check_torchscript(config, inputs_dict) @slow def test_torchscript_output_hidden_state(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.output_hidden_states = True self._create_and_check_torchscript(config, inputs_dict) def _create_and_check_torchscript(self, config, inputs_dict): if not self.test_torchscript: return configs_no_init = _config_zero_init(config) # To be sure we have no Nan configs_no_init.torchscript = True for model_class in self.all_model_classes: model = model_class(config=configs_no_init) model.to(torch_device) model.eval() inputs = self._prepare_for_class(inputs_dict, model_class) try: if model.config.is_encoder_decoder: model.config.use_cache = False # FSTM still requires this hack -> FSTM should probably be refactored similar to BART afterward input_ids = inputs["input_ids"] attention_mask = inputs["attention_mask"] decoder_input_ids = inputs["decoder_input_ids"] decoder_attention_mask = inputs["decoder_attention_mask"] traced_model = torch.jit.trace( model, (input_ids, attention_mask, decoder_input_ids, decoder_attention_mask) ) else: input_ids = inputs["input_ids"] traced_model = torch.jit.trace(model, input_ids) except RuntimeError: self.fail("Couldn't trace module.") with tempfile.TemporaryDirectory() as tmp_dir_name: pt_file_name = os.path.join(tmp_dir_name, "traced_model.pt") try: torch.jit.save(traced_model, pt_file_name) except Exception: self.fail("Couldn't save module.") try: loaded_model = torch.jit.load(pt_file_name) except Exception: self.fail("Couldn't load module.") model.to(torch_device) model.eval() loaded_model.to(torch_device) loaded_model.eval() model_state_dict = model.state_dict() loaded_model_state_dict = loaded_model.state_dict() non_persistent_buffers = {} for key in loaded_model_state_dict.keys(): if key not in model_state_dict.keys(): non_persistent_buffers[key] = loaded_model_state_dict[key] loaded_model_state_dict = { key: value for key, value in loaded_model_state_dict.items() if key not in non_persistent_buffers } self.assertEqual(set(model_state_dict.keys()), set(loaded_model_state_dict.keys())) model_buffers = list(model.buffers()) for non_persistent_buffer in non_persistent_buffers.values(): found_buffer = False for i, model_buffer in enumerate(model_buffers): if torch.equal(non_persistent_buffer, model_buffer): found_buffer = True break self.assertTrue(found_buffer) model_buffers.pop(i) models_equal = True for layer_name, p1 in model_state_dict.items(): if layer_name in loaded_model_state_dict: p2 = loaded_model_state_dict[layer_name] if p1.data.ne(p2.data).sum() > 0: models_equal = False self.assertTrue(models_equal) def test_torch_fx(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() self._create_and_check_torch_fx_tracing(config, inputs_dict) def test_torch_fx_output_loss(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() self._create_and_check_torch_fx_tracing(config, inputs_dict, output_loss=True) def _create_and_check_torch_fx_tracing(self, config, inputs_dict, output_loss=False): if not is_torch_fx_available(): return configs_no_init = _config_zero_init(config) # To be sure we have no Nan configs_no_init.return_dict = False for model_class in self.fx_ready_model_classes: model = model_class(config=configs_no_init) model.to(torch_device) model.eval() inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=output_loss) try: if model.config.is_encoder_decoder: model.config.use_cache = False # FSTM still requires this hack -> FSTM should probably be refactored similar to BART afterward input_ids = inputs["input_ids"] decoder_attention_mask = inputs["decoder_attention_mask"] labels = inputs.get("labels", None) input_names = ["input_ids", "attention_mask", "decoder_input_ids", "decoder_attention_mask"] if labels is not None: input_names.append("labels") filtered_inputs = {k: v for (k, v) in inputs.items() if k in input_names} model_output = model(**filtered_inputs) batch_size = input_ids.shape[0] encoder_sequence_length = input_ids.shape[1] decoder_sequence_length = decoder_attention_mask.shape[1] traced_model = symbolic_trace( model, input_names, batch_size=batch_size, sequence_length=[encoder_sequence_length, decoder_sequence_length], ) traced_output = traced_model(**filtered_inputs) else: input_names = ["input_ids", "attention_mask", "token_type_ids"] input_ids = inputs["input_ids"] labels = inputs.get("labels", None) start_positions = inputs.get("start_positions", None) end_positions = inputs.get("end_positions", None) if labels is not None: input_names.append("labels") if start_positions is not None: input_names.append("start_positions") if end_positions is not None: input_names.append("end_positions") filtered_inputs = {k: v for (k, v) in inputs.items() if k in input_names} input_names = filtered_inputs.keys() model_output = model(**filtered_inputs) rank = len(input_ids.shape) if rank == 2: batch_size, sequence_length = input_ids.shape num_choices = -1 elif rank == 3: batch_size, num_choices, sequence_length = input_ids.shape else: raise NotImplementedError( f"symbolic_trace automatic parameters inference not implemented for input of rank {rank}." ) traced_model = symbolic_trace( model, input_names, batch_size=batch_size, sequence_length=sequence_length, num_choices=num_choices, ) traced_output = traced_model(**filtered_inputs) except RuntimeError: self.fail("Couldn't trace module.") def flatten_output(output): flatten = [] for x in output: if isinstance(x, (tuple, list)): flatten += flatten_output(x) elif not isinstance(x, torch.Tensor): continue else: flatten.append(x) return flatten model_output = flatten_output(model_output) traced_output = flatten_output(traced_output) num_outputs = len(model_output) for i in range(num_outputs): self.assertTrue( torch.allclose(model_output[i], traced_output[i]), f"traced {i}th output doesn't match model {i}th output for {model_class}", ) def test_headmasking(self): if not self.test_head_masking: return global_rng.seed(42) config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() global_rng.seed() inputs_dict["output_attentions"] = True config.output_hidden_states = True configs_no_init = _config_zero_init(config) # To be sure we have no Nan for model_class in self.all_model_classes: model = model_class(config=configs_no_init) model.to(torch_device) model.eval() # Prepare head_mask # Set require_grad after having prepared the tensor to avoid error (leaf variable has been moved into the graph interior) head_mask = torch.ones( self.model_tester.num_hidden_layers, self.model_tester.num_attention_heads, device=torch_device, ) head_mask[0, 0] = 0 head_mask[-1, :-1] = 0 head_mask.requires_grad_(requires_grad=True) inputs = self._prepare_for_class(inputs_dict, model_class).copy() inputs["head_mask"] = head_mask if model.config.is_encoder_decoder: signature = inspect.signature(model.forward) arg_names = [*signature.parameters.keys()] if "decoder_head_mask" in arg_names: # necessary diferentiation because of T5 model inputs["decoder_head_mask"] = head_mask if "cross_attn_head_mask" in arg_names: inputs["cross_attn_head_mask"] = head_mask outputs = model(**inputs, return_dict=True) # Test that we can get a gradient back for importance score computation output = sum(t.sum() for t in outputs[0]) output = output.sum() output.backward() multihead_outputs = head_mask.grad self.assertIsNotNone(multihead_outputs) self.assertEqual(len(multihead_outputs), self.model_tester.num_hidden_layers) def check_attentions_validity(attentions): # Remove Nan for t in attentions: self.assertLess( torch.sum(torch.isnan(t)), t.numel() / 4 ) # Check we don't have more than 25% nans (arbitrary) attentions = [ t.masked_fill(torch.isnan(t), 0.0) for t in attentions ] # remove them (the test is less complete) self.assertAlmostEqual(attentions[0][..., 0, :, :].flatten().sum().item(), 0.0) self.assertNotEqual(attentions[0][..., -1, :, :].flatten().sum().item(), 0.0) if len(attentions) > 2: # encoder-decoder models have only 2 layers in each module self.assertNotEqual(attentions[1][..., 0, :, :].flatten().sum().item(), 0.0) self.assertAlmostEqual(attentions[-1][..., -2, :, :].flatten().sum().item(), 0.0) self.assertNotEqual(attentions[-1][..., -1, :, :].flatten().sum().item(), 0.0) if model.config.is_encoder_decoder: check_attentions_validity(outputs.encoder_attentions) check_attentions_validity(outputs.decoder_attentions) check_attentions_validity(outputs.cross_attentions) else: check_attentions_validity(outputs.attentions) def test_head_pruning(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False model = model_class(config=config) model.to(torch_device) model.eval() heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } model.prune_heads(heads_to_prune) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_save_load_from_pretrained(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False model = model_class(config=config) model.to(torch_device) model.eval() heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } model.prune_heads(heads_to_prune) with tempfile.TemporaryDirectory() as temp_dir_name: model.save_pretrained(temp_dir_name) model = model_class.from_pretrained(temp_dir_name) model.to(torch_device) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_save_load_from_config_init(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } config.pruned_heads = heads_to_prune model = model_class(config=config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_integration(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False heads_to_prune = {0: [0], 1: [1, 2]} config.pruned_heads = heads_to_prune model = model_class(config=config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) with tempfile.TemporaryDirectory() as temp_dir_name: model.save_pretrained(temp_dir_name) model = model_class.from_pretrained(temp_dir_name) model.to(torch_device) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) heads_to_prune = {0: [0], 2: [1, 2]} model.prune_heads(heads_to_prune) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) self.assertDictEqual(model.config.pruned_heads, {0: [0], 1: [1, 2], 2: [1, 2]}) def test_hidden_states_output(self): def check_hidden_states_output(inputs_dict, config, model_class): model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) hidden_states = outputs.encoder_hidden_states if config.is_encoder_decoder else outputs.hidden_states expected_num_layers = getattr( self.model_tester, "expected_num_hidden_layers", self.model_tester.num_hidden_layers + 1 ) self.assertEqual(len(hidden_states), expected_num_layers) if hasattr(self.model_tester, "encoder_seq_length"): seq_length = self.model_tester.encoder_seq_length if hasattr(self.model_tester, "chunk_length") and self.model_tester.chunk_length > 1: seq_length = seq_length * self.model_tester.chunk_length else: seq_length = self.model_tester.seq_length self.assertListEqual( list(hidden_states[0].shape[-2:]), [seq_length, self.model_tester.hidden_size], ) if config.is_encoder_decoder: hidden_states = outputs.decoder_hidden_states self.assertIsInstance(hidden_states, (list, tuple)) self.assertEqual(len(hidden_states), expected_num_layers) seq_len = getattr(self.model_tester, "seq_length", None) decoder_seq_length = getattr(self.model_tester, "decoder_seq_length", seq_len) self.assertListEqual( list(hidden_states[0].shape[-2:]), [decoder_seq_length, self.model_tester.hidden_size], ) config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: inputs_dict["output_hidden_states"] = True check_hidden_states_output(inputs_dict, config, model_class) # check that output_hidden_states also work using config del inputs_dict["output_hidden_states"] config.output_hidden_states = True check_hidden_states_output(inputs_dict, config, model_class) def test_retain_grad_hidden_states_attentions(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.output_hidden_states = True config.output_attentions = True # no need to test all models as different heads yield the same functionality model_class = self.all_model_classes[0] model = model_class(config) model.to(torch_device) inputs = self._prepare_for_class(inputs_dict, model_class) outputs = model(**inputs) output = outputs[0] if config.is_encoder_decoder: # Seq2Seq models encoder_hidden_states = outputs.encoder_hidden_states[0] encoder_attentions = outputs.encoder_attentions[0] encoder_hidden_states.retain_grad() encoder_attentions.retain_grad() decoder_hidden_states = outputs.decoder_hidden_states[0] decoder_attentions = outputs.decoder_attentions[0] decoder_hidden_states.retain_grad() decoder_attentions.retain_grad() cross_attentions = outputs.cross_attentions[0] cross_attentions.retain_grad() output.flatten()[0].backward(retain_graph=True) self.assertIsNotNone(encoder_hidden_states.grad) self.assertIsNotNone(encoder_attentions.grad) self.assertIsNotNone(decoder_hidden_states.grad) self.assertIsNotNone(decoder_attentions.grad) self.assertIsNotNone(cross_attentions.grad) else: # Encoder-/Decoder-only models hidden_states = outputs.hidden_states[0] attentions = outputs.attentions[0] hidden_states.retain_grad() attentions.retain_grad() output.flatten()[0].backward(retain_graph=True) self.assertIsNotNone(hidden_states.grad) self.assertIsNotNone(attentions.grad) def test_feed_forward_chunking(self): ( original_config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: torch.manual_seed(0) config = copy.deepcopy(original_config) model = model_class(config) model.to(torch_device) model.eval() hidden_states_no_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0] torch.manual_seed(0) config.chunk_size_feed_forward = 1 model = model_class(config) model.to(torch_device) model.eval() hidden_states_with_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0] self.assertTrue(torch.allclose(hidden_states_no_chunk, hidden_states_with_chunk, atol=1e-3)) def test_resize_tokens_embeddings(self): ( original_config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if not self.test_resize_embeddings: return for model_class in self.all_model_classes: config = copy.deepcopy(original_config) model = model_class(config) model.to(torch_device) if self.model_tester.is_training is False: model.eval() model_vocab_size = config.vocab_size # Retrieve the embeddings and clone theme model_embed = model.resize_token_embeddings(model_vocab_size) cloned_embeddings = model_embed.weight.clone() # Check that resizing the token embeddings with a larger vocab size increases the model's vocab size model_embed = model.resize_token_embeddings(model_vocab_size + 10) self.assertEqual(model.config.vocab_size, model_vocab_size + 10) # Check that it actually resizes the embeddings matrix self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] + 10) # Check that the model can still do a forward pass successfully (every parameter should be resized) model(**self._prepare_for_class(inputs_dict, model_class)) # Check that resizing the token embeddings with a smaller vocab size decreases the model's vocab size model_embed = model.resize_token_embeddings(model_vocab_size - 15) self.assertEqual(model.config.vocab_size, model_vocab_size - 15) # Check that it actually resizes the embeddings matrix self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] - 15) # Check that the model can still do a forward pass successfully (every parameter should be resized) # Input ids should be clamped to the maximum size of the vocabulary inputs_dict["input_ids"].clamp_(max=model_vocab_size - 15 - 1) # make sure that decoder_input_ids are resized as well if "decoder_input_ids" in inputs_dict: inputs_dict["decoder_input_ids"].clamp_(max=model_vocab_size - 15 - 1) model(**self._prepare_for_class(inputs_dict, model_class)) # Check that adding and removing tokens has not modified the first part of the embedding matrix. models_equal = True for p1, p2 in zip(cloned_embeddings, model_embed.weight): if p1.data.ne(p2.data).sum() > 0: models_equal = False self.assertTrue(models_equal) def test_resize_embeddings_untied(self): ( original_config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if not self.test_resize_embeddings: return original_config.tie_word_embeddings = False # if model cannot untied embeddings -> leave test if original_config.tie_word_embeddings: return for model_class in self.all_model_classes: config = copy.deepcopy(original_config) model = model_class(config).to(torch_device) # if no output embeddings -> leave test if model.get_output_embeddings() is None: continue # Check that resizing the token embeddings with a larger vocab size increases the model's vocab size model_vocab_size = config.vocab_size model.resize_token_embeddings(model_vocab_size + 10) self.assertEqual(model.config.vocab_size, model_vocab_size + 10) output_embeds = model.get_output_embeddings() self.assertEqual(output_embeds.weight.shape[0], model_vocab_size + 10) # Check bias if present if output_embeds.bias is not None: self.assertEqual(output_embeds.bias.shape[0], model_vocab_size + 10) # Check that the model can still do a forward pass successfully (every parameter should be resized) model(**self._prepare_for_class(inputs_dict, model_class)) # Check that resizing the token embeddings with a smaller vocab size decreases the model's vocab size model.resize_token_embeddings(model_vocab_size - 15) self.assertEqual(model.config.vocab_size, model_vocab_size - 15) # Check that it actually resizes the embeddings matrix output_embeds = model.get_output_embeddings() self.assertEqual(output_embeds.weight.shape[0], model_vocab_size - 15) # Check bias if present if output_embeds.bias is not None: self.assertEqual(output_embeds.bias.shape[0], model_vocab_size - 15) # Check that the model can still do a forward pass successfully (every parameter should be resized) # Input ids should be clamped to the maximum size of the vocabulary inputs_dict["input_ids"].clamp_(max=model_vocab_size - 15 - 1) if "decoder_input_ids" in inputs_dict: inputs_dict["decoder_input_ids"].clamp_(max=model_vocab_size - 15 - 1) # Check that the model can still do a forward pass successfully (every parameter should be resized) model(**self._prepare_for_class(inputs_dict, model_class)) def test_model_common_attributes(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) self.assertIsInstance(model.get_input_embeddings(), (nn.Embedding, AdaptiveEmbedding)) model.set_input_embeddings(nn.Embedding(10, 10)) x = model.get_output_embeddings() self.assertTrue(x is None or isinstance(x, nn.Linear)) def test_correct_missing_keys(self): if not self.test_missing_keys: return config, _ = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) base_model_prefix = model.base_model_prefix if hasattr(model, base_model_prefix): with tempfile.TemporaryDirectory() as temp_dir_name: model.base_model.save_pretrained(temp_dir_name) model, loading_info = model_class.from_pretrained(temp_dir_name, output_loading_info=True) with self.subTest(msg=f"Missing keys for {model.__class__.__name__}"): self.assertGreater(len(loading_info["missing_keys"]), 0) def test_tie_model_weights(self): if not self.test_torchscript: return config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() def check_same_values(layer_1, layer_2): equal = True for p1, p2 in zip(layer_1.weight, layer_2.weight): if p1.data.ne(p2.data).sum() > 0: equal = False return equal for model_class in self.all_model_classes: config.torchscript = True model_not_tied = model_class(config) if model_not_tied.get_output_embeddings() is None: continue config_tied = copy.deepcopy(config) config_tied.torchscript = False model_tied = model_class(config_tied) params_tied = list(model_tied.parameters()) # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(check_same_values(embeddings, decoding)) # # Check that after modification, they remain the same. # embeddings.weight.data.div_(2) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(embeddings.weight.shape, decoding.weight.shape) # self.assertTrue(check_same_values(embeddings, decoding)) # # Check that after modification, they remain the same. # decoding.weight.data.div_(4) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(embeddings.weight.shape, decoding.weight.shape) # self.assertTrue(check_same_values(embeddings, decoding)) # Check that after resize they remain tied. model_tied.resize_token_embeddings(config.vocab_size + 10) params_tied_2 = list(model_tied.parameters()) self.assertEqual(len(params_tied_2), len(params_tied)) # decoding.weight.data.mul_(20) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(model.transformer.wte.weight.shape, model.lm_head.weight.shape) # self.assertTrue(check_same_values(model.transformer.wte, model.lm_head)) def test_model_outputs_equivalence(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() def set_nan_tensor_to_zero(t): t[t != t] = 0 return t def check_equivalence(model, tuple_inputs, dict_inputs, additional_kwargs={}): with torch.no_grad(): tuple_output = model(**tuple_inputs, return_dict=False, **additional_kwargs) dict_output = model(**dict_inputs, return_dict=True, **additional_kwargs).to_tuple() def recursive_check(tuple_object, dict_object): if isinstance(tuple_object, (List, Tuple)): for tuple_iterable_value, dict_iterable_value in zip(tuple_object, dict_object): recursive_check(tuple_iterable_value, dict_iterable_value) elif isinstance(tuple_object, Dict): for tuple_iterable_value, dict_iterable_value in zip( tuple_object.values(), dict_object.values() ): recursive_check(tuple_iterable_value, dict_iterable_value) elif tuple_object is None: return else: self.assertTrue( torch.allclose( set_nan_tensor_to_zero(tuple_object), set_nan_tensor_to_zero(dict_object), atol=1e-5 ), msg=f"Tuple and dict output are not equal. Difference: {torch.max(torch.abs(tuple_object - dict_object))}. Tuple has `nan`: {torch.isnan(tuple_object).any()} and `inf`: {torch.isinf(tuple_object)}. Dict has `nan`: {torch.isnan(dict_object).any()} and `inf`: {torch.isinf(dict_object)}.", ) recursive_check(tuple_output, dict_output) for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs) tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence( model, tuple_inputs, dict_inputs, {"output_hidden_states": True, "output_attentions": True} ) def test_inputs_embeds(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class)) if not self.is_encoder_decoder: input_ids = inputs["input_ids"] del inputs["input_ids"] else: encoder_input_ids = inputs["input_ids"] decoder_input_ids = inputs.get("decoder_input_ids", encoder_input_ids) del inputs["input_ids"] inputs.pop("decoder_input_ids", None) wte = model.get_input_embeddings() if not self.is_encoder_decoder: inputs["inputs_embeds"] = wte(input_ids) else: inputs["inputs_embeds"] = wte(encoder_input_ids) inputs["decoder_inputs_embeds"] = wte(decoder_input_ids) with torch.no_grad(): model(**inputs)[0] @require_torch_multi_gpu def test_multi_gpu_data_parallel_forward(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() # some params shouldn't be scattered by nn.DataParallel # so just remove them if they are present. blacklist_non_batched_params = ["head_mask", "decoder_head_mask", "cross_attn_head_mask"] for k in blacklist_non_batched_params: inputs_dict.pop(k, None) # move input tensors to cuda:O for k, v in inputs_dict.items(): if torch.is_tensor(v): inputs_dict[k] = v.to(0) for model_class in self.all_model_classes: model = model_class(config=config) model.to(0) model.eval() # Wrap model in nn.DataParallel model = nn.DataParallel(model) with torch.no_grad(): _ = model(**self._prepare_for_class(inputs_dict, model_class)) @require_torch_multi_gpu def test_model_parallelization(self): if not self.test_model_parallel: return # a candidate for testing_utils def get_current_gpu_memory_use(): """returns a list of cuda memory allocations per GPU in MBs""" per_device_memory = [] for id in range(torch.cuda.device_count()): with torch.cuda.device(id): per_device_memory.append(torch.cuda.memory_allocated() >> 20) return per_device_memory # Needs a large model to see the difference. config = self.model_tester.get_large_model_config() for model_class in self.all_parallelizable_model_classes: torch.cuda.empty_cache() # 1. single gpu memory load + unload + memory measurements # Retrieve initial memory usage (can easily be ~0.6-1.5GB if cuda-kernels have been preloaded by previous tests) memory_at_start = get_current_gpu_memory_use() # Put model on device 0 and take a memory snapshot model = model_class(config) model.to("cuda:0") memory_after_model_load = get_current_gpu_memory_use() # The memory use on device 0 should be higher than it was initially. self.assertGreater(memory_after_model_load[0], memory_at_start[0]) del model gc.collect() torch.cuda.empty_cache() # 2. MP test # it's essential to re-calibrate the usage before the next stage memory_at_start = get_current_gpu_memory_use() # Spread model layers over multiple devices model = model_class(config) model.parallelize() memory_after_parallelization = get_current_gpu_memory_use() # Assert that the memory use on all devices is higher than it was when loaded only on CPU for n in range(torch.cuda.device_count()): self.assertGreater(memory_after_parallelization[n], memory_at_start[n]) # Assert that the memory use of device 0 is lower than it was when the entire model was loaded on it self.assertLess(memory_after_parallelization[0], memory_after_model_load[0]) # Assert that the memory use of device 1 is higher than it was when the entire model was loaded # on device 0 and device 1 wasn't used at all self.assertGreater(memory_after_parallelization[1], memory_after_model_load[1]) del model gc.collect() torch.cuda.empty_cache() @require_torch_multi_gpu def test_model_parallel_equal_results(self): if not self.test_model_parallel: return config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_parallelizable_model_classes: inputs_dict = self._prepare_for_class(inputs_dict, model_class) def cast_to_device(dictionary, device): output = {} for k, v in dictionary.items(): if isinstance(v, torch.Tensor): output[k] = v.to(device) else: output[k] = v return output model = model_class(config) output = model(**cast_to_device(inputs_dict, "cpu")) model.parallelize() parallel_output = model(**cast_to_device(inputs_dict, "cuda:0")) for value, parallel_value in zip(output, parallel_output): if isinstance(value, torch.Tensor): self.assertTrue(torch.allclose(value, parallel_value.to("cpu"), atol=1e-7)) elif isinstance(value, (Tuple, List)): for value_, parallel_value_ in zip(value, parallel_value): self.assertTrue(torch.allclose(value_, parallel_value_.to("cpu"), atol=1e-7)) @require_torch_multi_gpu def test_model_parallel_beam_search(self): if not self.test_model_parallel: return all_generative_and_parallelizable_model_classes = tuple( set(self.all_generative_model_classes).intersection(self.all_parallelizable_model_classes) ) config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in all_generative_and_parallelizable_model_classes: inputs_dict = self._prepare_for_class(inputs_dict, model_class) model = model_class(config) def cast_to_device(dictionary, device): output = {} for k, v in dictionary.items(): if isinstance(v, torch.Tensor): output[k] = v.to(device) else: output[k] = v return output model.parallelize() model.generate(**cast_to_device(inputs_dict, "cuda:0"), num_beams=2) def test_problem_types(self): if not self.test_sequence_classification_problem_types: return config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() problem_types = [ {"title": "multi_label_classification", "num_labels": 2, "dtype": torch.float}, {"title": "single_label_classification", "num_labels": 1, "dtype": torch.long}, {"title": "regression", "num_labels": 1, "dtype": torch.float}, ] for model_class in self.all_model_classes: if model_class not in get_values(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING): continue for problem_type in problem_types: with self.subTest(msg=f"Testing {model_class} with {problem_type['title']}"): config.problem_type = problem_type["title"] config.num_labels = problem_type["num_labels"] model = model_class(config) model.to(torch_device) model.train() inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) if problem_type["num_labels"] > 1: inputs["labels"] = inputs["labels"].unsqueeze(1).repeat(1, problem_type["num_labels"]) inputs["labels"] = inputs["labels"].to(problem_type["dtype"]) # This tests that we do not trigger the warning form PyTorch "Using a target size that is different # to the input size. This will likely lead to incorrect results due to broadcasting. Please ensure # they have the same size." which is a symptom something in wrong for the regression problem. # See https://github.com/huggingface/transformers/issues/11780 with warnings.catch_warnings(record=True) as warning_list: loss = model(**inputs).loss self.assertListEqual(warning_list, []) loss.backward() global_rng = random.Random() def ids_tensor(shape, vocab_size, rng=None, name=None): # Creates a random int32 tensor of the shape within the vocab size if rng is None: rng = global_rng total_dims = 1 for dim in shape: total_dims *= dim values = [] for _ in range(total_dims): values.append(rng.randint(0, vocab_size - 1)) return torch.tensor(data=values, dtype=torch.long, device=torch_device).view(shape).contiguous() def random_attention_mask(shape, rng=None, name=None): attn_mask = ids_tensor(shape, vocab_size=2, rng=None, name=None) # make sure that at least one token is attended to for each batch attn_mask[:, -1] = 1 return attn_mask def floats_tensor(shape, scale=1.0, rng=None, name=None): """Creates a random float32 tensor""" if rng is None: rng = global_rng total_dims = 1 for dim in shape: total_dims *= dim values = [] for _ in range(total_dims): values.append(rng.random() * scale) return torch.tensor(data=values, dtype=torch.float, device=torch_device).view(shape).contiguous() @require_torch class ModelUtilsTest(unittest.TestCase): @slow def test_model_from_pretrained(self): for model_name in BERT_PRETRAINED_MODEL_ARCHIVE_LIST[:1]: config = BertConfig.from_pretrained(model_name) self.assertIsNotNone(config) self.assertIsInstance(config, PretrainedConfig) model = BertModel.from_pretrained(model_name) model, loading_info = BertModel.from_pretrained(model_name, output_loading_info=True) self.assertIsNotNone(model) self.assertIsInstance(model, PreTrainedModel) for value in loading_info.values(): self.assertEqual(len(value), 0) config = BertConfig.from_pretrained(model_name, output_attentions=True, output_hidden_states=True) # Not sure this is the intended behavior. TODO fix Lysandre & Thom config.name_or_path = model_name model = BertModel.from_pretrained(model_name, output_attentions=True, output_hidden_states=True) self.assertEqual(model.config.output_hidden_states, True) self.assertEqual(model.config, config) def test_model_from_pretrained_with_different_pretrained_model_name(self): model = T5ForConditionalGeneration.from_pretrained(TINY_T5) self.assertIsNotNone(model) logger = logging.get_logger("transformers.configuration_utils") with CaptureLogger(logger) as cl: BertModel.from_pretrained(TINY_T5) self.assertTrue("You are using a model of type t5 to instantiate a model of type bert" in cl.out) @require_torch @is_staging_test class ModelPushToHubTester(unittest.TestCase): @classmethod def setUpClass(cls): cls._api = HfApi(endpoint=ENDPOINT_STAGING) cls._token = cls._api.login(username=USER, password=PASS) @classmethod def tearDownClass(cls): try: cls._api.delete_repo(token=cls._token, name="test-model") except HTTPError: pass try: cls._api.delete_repo(token=cls._token, name="test-model-org", organization="valid_org") except HTTPError: pass def test_push_to_hub(self): config = BertConfig( vocab_size=99, hidden_size=32, num_hidden_layers=5, num_attention_heads=4, intermediate_size=37 ) model = BertModel(config) with tempfile.TemporaryDirectory() as tmp_dir: model.save_pretrained(os.path.join(tmp_dir, "test-model"), push_to_hub=True, use_auth_token=self._token) new_model = BertModel.from_pretrained(f"{USER}/test-model") for p1, p2 in zip(model.parameters(), new_model.parameters()): self.assertTrue(torch.equal(p1, p2)) def test_push_to_hub_in_organization(self): config = BertConfig( vocab_size=99, hidden_size=32, num_hidden_layers=5, num_attention_heads=4, intermediate_size=37 ) model = BertModel(config) with tempfile.TemporaryDirectory() as tmp_dir: model.save_pretrained( os.path.join(tmp_dir, "test-model-org"), push_to_hub=True, use_auth_token=self._token, organization="valid_org", ) new_model = BertModel.from_pretrained("valid_org/test-model-org") for p1, p2 in zip(model.parameters(), new_model.parameters()): self.assertTrue(torch.equal(p1, p2))