# coding=utf-8 # Copyright 2019 HuggingFace Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import inspect import os.path import random import tempfile import unittest from typing import List, Tuple from transformers import is_torch_available from transformers.file_utils import WEIGHTS_NAME from transformers.testing_utils import require_torch, require_torch_multigpu, slow, torch_device if is_torch_available(): import numpy as np import torch from transformers import ( BERT_PRETRAINED_MODEL_ARCHIVE_LIST, MODEL_FOR_CAUSAL_LM_MAPPING, MODEL_FOR_MASKED_LM_MAPPING, MODEL_FOR_MULTIPLE_CHOICE_MAPPING, MODEL_FOR_QUESTION_ANSWERING_MAPPING, MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING, MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING, MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING, AdaptiveEmbedding, BertConfig, BertModel, PretrainedConfig, PreTrainedModel, ) def _config_zero_init(config): configs_no_init = copy.deepcopy(config) for key in configs_no_init.__dict__.keys(): if "_range" in key or "_std" in key or "initializer_factor" in key: setattr(configs_no_init, key, 1e-10) return configs_no_init @require_torch class ModelTesterMixin: model_tester = None all_model_classes = () all_generative_model_classes = () test_torchscript = True test_pruning = True test_resize_embeddings = True test_head_masking = True test_missing_keys = True is_encoder_decoder = False def _prepare_for_class(self, inputs_dict, model_class, return_labels=False): inputs_dict = copy.deepcopy(inputs_dict) if model_class in MODEL_FOR_MULTIPLE_CHOICE_MAPPING.values(): inputs_dict = { k: v.unsqueeze(1).expand(-1, self.model_tester.num_choices, -1).contiguous() if isinstance(v, torch.Tensor) and v.ndim > 1 else v for k, v in inputs_dict.items() } if return_labels: if model_class in MODEL_FOR_MULTIPLE_CHOICE_MAPPING.values(): inputs_dict["labels"] = torch.ones(self.model_tester.batch_size, dtype=torch.long, device=torch_device) elif model_class in MODEL_FOR_QUESTION_ANSWERING_MAPPING.values(): inputs_dict["start_positions"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) inputs_dict["end_positions"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) elif model_class in MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING.values(): inputs_dict["labels"] = torch.zeros( self.model_tester.batch_size, dtype=torch.long, device=torch_device ) elif model_class in [ *MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING.values(), *MODEL_FOR_CAUSAL_LM_MAPPING.values(), *MODEL_FOR_MASKED_LM_MAPPING.values(), *MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING.values(), ]: inputs_dict["labels"] = torch.zeros( (self.model_tester.batch_size, self.model_tester.seq_length), dtype=torch.long, device=torch_device ) return inputs_dict def test_save_load(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) out_2 = outputs[0].cpu().numpy() out_2[np.isnan(out_2)] = 0 with tempfile.TemporaryDirectory() as tmpdirname: model.save_pretrained(tmpdirname) model = model_class.from_pretrained(tmpdirname) model.to(torch_device) with torch.no_grad(): after_outputs = model(**self._prepare_for_class(inputs_dict, model_class)) # Make sure we don't have nans out_1 = after_outputs[0].cpu().numpy() out_1[np.isnan(out_1)] = 0 max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def test_save_load_keys_to_never_save(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) keys_to_never_save = getattr(model, "keys_to_never_save", None) if keys_to_never_save is None: continue # check the keys are in the original state_dict for k in keys_to_never_save: self.assertIn(k, model.state_dict()) # check that certain keys didn't get saved with the model with tempfile.TemporaryDirectory() as tmpdirname: model.save_pretrained(tmpdirname) output_model_file = os.path.join(tmpdirname, WEIGHTS_NAME) state_dict_saved = torch.load(output_model_file) for k in keys_to_never_save: self.assertNotIn(k, state_dict_saved) def test_initialization(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() configs_no_init = _config_zero_init(config) for model_class in self.all_model_classes: model = model_class(config=configs_no_init) for name, param in model.named_parameters(): if param.requires_grad: self.assertIn( ((param.data.mean() * 1e9).round() / 1e9).item(), [0.0, 1.0], msg="Parameter {} of model {} seems not properly initialized".format(name, model_class), ) def test_determinism(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): first = model(**self._prepare_for_class(inputs_dict, model_class))[0] second = model(**self._prepare_for_class(inputs_dict, model_class))[0] out_1 = first.cpu().numpy() out_2 = second.cpu().numpy() out_1 = out_1[~np.isnan(out_1)] out_2 = out_2[~np.isnan(out_2)] max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def test_forward_signature(self): config, _ = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) signature = inspect.signature(model.forward) # signature.parameters is an OrderedDict => so arg_names order is deterministic arg_names = [*signature.parameters.keys()] if model.config.is_encoder_decoder: expected_arg_names = [ "input_ids", "attention_mask", "decoder_input_ids", "decoder_attention_mask", "encoder_outputs", ] self.assertListEqual(arg_names[:5], expected_arg_names) else: expected_arg_names = ["input_ids"] self.assertListEqual(arg_names[:1], expected_arg_names) def test_attention_outputs(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.return_dict = True seq_len = getattr(self.model_tester, "seq_length", None) decoder_seq_length = getattr(self.model_tester, "decoder_seq_length", seq_len) encoder_seq_length = getattr(self.model_tester, "encoder_seq_length", seq_len) decoder_key_length = getattr(self.model_tester, "decoder_key_length", decoder_seq_length) encoder_key_length = getattr(self.model_tester, "key_length", encoder_seq_length) chunk_length = getattr(self.model_tester, "chunk_length", None) if chunk_length is not None and hasattr(self.model_tester, "num_hashes"): encoder_seq_length = encoder_seq_length * self.model_tester.num_hashes for model_class in self.all_model_classes: inputs_dict["output_attentions"] = True inputs_dict["output_hidden_states"] = False config.return_dict = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(attentions), self.model_tester.num_hidden_layers) # check that output_attentions also work using config del inputs_dict["output_attentions"] config.output_attentions = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(attentions), self.model_tester.num_hidden_layers) if chunk_length is not None: self.assertListEqual( list(attentions[0].shape[-4:]), [self.model_tester.num_attention_heads, encoder_seq_length, chunk_length, encoder_key_length], ) else: self.assertListEqual( list(attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length], ) out_len = len(outputs) if self.is_encoder_decoder: correct_outlen = 5 # loss is at first position if "labels" in inputs_dict: correct_outlen += 1 # loss is added to beginning # Question Answering model returns start_logits and end_logits if model_class in MODEL_FOR_QUESTION_ANSWERING_MAPPING.values(): correct_outlen += 1 # start_logits and end_logits instead of only 1 output self.assertEqual(out_len, correct_outlen) # decoder attentions decoder_attentions = outputs.decoder_attentions self.assertIsInstance(decoder_attentions, (list, tuple)) self.assertEqual(len(decoder_attentions), self.model_tester.num_hidden_layers) self.assertListEqual( list(decoder_attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, decoder_seq_length, decoder_key_length], ) # cross attentions cross_attentions = outputs.cross_attentions self.assertIsInstance(cross_attentions, (list, tuple)) self.assertEqual(len(cross_attentions), self.model_tester.num_hidden_layers) self.assertListEqual( list(cross_attentions[0].shape[-3:]), [ self.model_tester.num_attention_heads, decoder_seq_length, encoder_key_length, ], ) # Check attention is always last and order is fine inputs_dict["output_attentions"] = True inputs_dict["output_hidden_states"] = True model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) if hasattr(self.model_tester, "num_hidden_states_types"): added_hidden_states = self.model_tester.num_hidden_states_types elif self.is_encoder_decoder: added_hidden_states = 2 else: added_hidden_states = 1 self.assertEqual(out_len + added_hidden_states, len(outputs)) self_attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions self.assertEqual(len(self_attentions), self.model_tester.num_hidden_layers) if chunk_length is not None: self.assertListEqual( list(self_attentions[0].shape[-4:]), [self.model_tester.num_attention_heads, encoder_seq_length, chunk_length, encoder_key_length], ) else: self.assertListEqual( list(self_attentions[0].shape[-3:]), [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length], ) def test_torchscript(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() self._create_and_check_torchscript(config, inputs_dict) def test_torchscript_output_attentions(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.output_attentions = True self._create_and_check_torchscript(config, inputs_dict) def test_torchscript_output_hidden_state(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() config.output_hidden_states = True self._create_and_check_torchscript(config, inputs_dict) def _create_and_check_torchscript(self, config, inputs_dict): if not self.test_torchscript: return configs_no_init = _config_zero_init(config) # To be sure we have no Nan configs_no_init.torchscript = True for model_class in self.all_model_classes: model = model_class(config=configs_no_init) model.to(torch_device) model.eval() inputs = self._prepare_for_class(inputs_dict, model_class) try: if model.config.is_encoder_decoder: model.config.use_cache = False # TODO: this should be deleted after bug #7474 is solved input_ids = inputs["input_ids"] attention_mask = inputs["attention_mask"] decoder_input_ids = inputs["decoder_input_ids"] decoder_attention_mask = inputs["decoder_attention_mask"] traced_model = torch.jit.trace( model, (input_ids, attention_mask, decoder_input_ids, decoder_attention_mask) ) else: input_ids = inputs["input_ids"] traced_model = torch.jit.trace(model, input_ids) except RuntimeError: self.fail("Couldn't trace module.") with tempfile.TemporaryDirectory() as tmp_dir_name: pt_file_name = os.path.join(tmp_dir_name, "traced_model.pt") try: torch.jit.save(traced_model, pt_file_name) except Exception: self.fail("Couldn't save module.") try: loaded_model = torch.jit.load(pt_file_name) except Exception: self.fail("Couldn't load module.") model.to(torch_device) model.eval() loaded_model.to(torch_device) loaded_model.eval() model_state_dict = model.state_dict() loaded_model_state_dict = loaded_model.state_dict() self.assertEqual(set(model_state_dict.keys()), set(loaded_model_state_dict.keys())) models_equal = True for layer_name, p1 in model_state_dict.items(): p2 = loaded_model_state_dict[layer_name] if p1.data.ne(p2.data).sum() > 0: models_equal = False self.assertTrue(models_equal) def test_headmasking(self): if not self.test_head_masking: return global_rng.seed(42) config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() global_rng.seed() inputs_dict["output_attentions"] = True config.output_hidden_states = True configs_no_init = _config_zero_init(config) # To be sure we have no Nan for model_class in self.all_model_classes: model = model_class(config=configs_no_init) model.to(torch_device) model.eval() # Prepare head_mask # Set require_grad after having prepared the tensor to avoid error (leaf variable has been moved into the graph interior) head_mask = torch.ones( self.model_tester.num_hidden_layers, self.model_tester.num_attention_heads, device=torch_device, ) head_mask[0, 0] = 0 head_mask[-1, :-1] = 0 head_mask.requires_grad_(requires_grad=True) inputs = self._prepare_for_class(inputs_dict, model_class).copy() inputs["head_mask"] = head_mask outputs = model(**inputs) # Test that we can get a gradient back for importance score computation output = sum(t.sum() for t in outputs[0]) output = output.sum() output.backward() multihead_outputs = head_mask.grad attentions = outputs[-1] # Remove Nan for t in attentions: self.assertLess( torch.sum(torch.isnan(t)), t.numel() / 4 ) # Check we don't have more than 25% nans (arbitrary) attentions = [ t.masked_fill(torch.isnan(t), 0.0) for t in attentions ] # remove them (the test is less complete) self.assertIsNotNone(multihead_outputs) self.assertEqual(len(multihead_outputs), self.model_tester.num_hidden_layers) self.assertAlmostEqual(attentions[0][..., 0, :, :].flatten().sum().item(), 0.0) self.assertNotEqual(attentions[0][..., -1, :, :].flatten().sum().item(), 0.0) self.assertNotEqual(attentions[1][..., 0, :, :].flatten().sum().item(), 0.0) self.assertAlmostEqual(attentions[-1][..., -2, :, :].flatten().sum().item(), 0.0) self.assertNotEqual(attentions[-1][..., -1, :, :].flatten().sum().item(), 0.0) def test_head_pruning(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False model = model_class(config=config) model.to(torch_device) model.eval() heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } model.prune_heads(heads_to_prune) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_save_load_from_pretrained(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False model = model_class(config=config) model.to(torch_device) model.eval() heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } model.prune_heads(heads_to_prune) with tempfile.TemporaryDirectory() as temp_dir_name: model.save_pretrained(temp_dir_name) model = model_class.from_pretrained(temp_dir_name) model.to(torch_device) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_save_load_from_config_init(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False heads_to_prune = { 0: list(range(1, self.model_tester.num_attention_heads)), -1: [0], } config.pruned_heads = heads_to_prune model = model_class(config=config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[-1].shape[-3], self.model_tester.num_attention_heads - 1) def test_head_pruning_integration(self): if not self.test_pruning: return for model_class in self.all_model_classes: ( config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if "head_mask" in inputs_dict: del inputs_dict["head_mask"] inputs_dict["output_attentions"] = True config.output_hidden_states = False heads_to_prune = {0: [0], 1: [1, 2]} config.pruned_heads = heads_to_prune model = model_class(config=config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) with tempfile.TemporaryDirectory() as temp_dir_name: model.save_pretrained(temp_dir_name) model = model_class.from_pretrained(temp_dir_name) model.to(torch_device) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) heads_to_prune = {0: [0], 2: [1, 2]} model.prune_heads(heads_to_prune) with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class)) attentions = outputs[-1] self.assertEqual(attentions[0].shape[-3], self.model_tester.num_attention_heads - 1) self.assertEqual(attentions[1].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[2].shape[-3], self.model_tester.num_attention_heads - 2) self.assertEqual(attentions[3].shape[-3], self.model_tester.num_attention_heads) self.assertDictEqual(model.config.pruned_heads, {0: [0], 1: [1, 2], 2: [1, 2]}) def test_hidden_states_output(self): def check_hidden_states_output(inputs_dict, config, model_class): model = model_class(config) model.to(torch_device) model.eval() with torch.no_grad(): outputs = model(**self._prepare_for_class(inputs_dict, model_class), return_dict=True) hidden_states = outputs["hidden_states"] if "hidden_states" in outputs else outputs[-1] expected_num_layers = getattr( self.model_tester, "expected_num_hidden_layers", self.model_tester.num_hidden_layers + 1 ) self.assertEqual(len(hidden_states), expected_num_layers) if hasattr(self.model_tester, "encoder_seq_length"): seq_length = self.model_tester.encoder_seq_length if hasattr(self.model_tester, "chunk_length") and self.model_tester.chunk_length > 1: seq_length = seq_length * self.model_tester.chunk_length else: seq_length = self.model_tester.seq_length self.assertListEqual( list(hidden_states[0].shape[-2:]), [seq_length, self.model_tester.hidden_size], ) config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: inputs_dict["output_hidden_states"] = True check_hidden_states_output(inputs_dict, config, model_class) # check that output_hidden_states also work using config del inputs_dict["output_hidden_states"] config.output_hidden_states = True check_hidden_states_output(inputs_dict, config, model_class) def test_feed_forward_chunking(self): ( original_config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: torch.manual_seed(0) config = copy.deepcopy(original_config) model = model_class(config) model.to(torch_device) model.eval() hidden_states_no_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0] torch.manual_seed(0) config.chunk_size_feed_forward = 1 model = model_class(config) model.to(torch_device) model.eval() hidden_states_with_chunk = model(**self._prepare_for_class(inputs_dict, model_class))[0] self.assertTrue(torch.allclose(hidden_states_no_chunk, hidden_states_with_chunk, atol=1e-3)) def test_resize_tokens_embeddings(self): ( original_config, inputs_dict, ) = self.model_tester.prepare_config_and_inputs_for_common() if not self.test_resize_embeddings: return for model_class in self.all_model_classes: config = copy.deepcopy(original_config) model = model_class(config) model.to(torch_device) if self.model_tester.is_training is False: model.eval() model_vocab_size = config.vocab_size # Retrieve the embeddings and clone theme model_embed = model.resize_token_embeddings(model_vocab_size) cloned_embeddings = model_embed.weight.clone() # Check that resizing the token embeddings with a larger vocab size increases the model's vocab size model_embed = model.resize_token_embeddings(model_vocab_size + 10) self.assertEqual(model.config.vocab_size, model_vocab_size + 10) # Check that it actually resizes the embeddings matrix self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] + 10) # Check that the model can still do a forward pass successfully (every parameter should be resized) model(**self._prepare_for_class(inputs_dict, model_class)) # Check that resizing the token embeddings with a smaller vocab size decreases the model's vocab size model_embed = model.resize_token_embeddings(model_vocab_size - 15) self.assertEqual(model.config.vocab_size, model_vocab_size - 15) # Check that it actually resizes the embeddings matrix self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] - 15) # Check that the model can still do a forward pass successfully (every parameter should be resized) # Input ids should be clamped to the maximum size of the vocabulary inputs_dict["input_ids"].clamp_(max=model_vocab_size - 15 - 1) model(**self._prepare_for_class(inputs_dict, model_class)) # Check that adding and removing tokens has not modified the first part of the embedding matrix. models_equal = True for p1, p2 in zip(cloned_embeddings, model_embed.weight): if p1.data.ne(p2.data).sum() > 0: models_equal = False self.assertTrue(models_equal) def test_model_common_attributes(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) self.assertIsInstance(model.get_input_embeddings(), (torch.nn.Embedding, AdaptiveEmbedding)) model.set_input_embeddings(torch.nn.Embedding(10, 10)) x = model.get_output_embeddings() self.assertTrue(x is None or isinstance(x, torch.nn.Linear)) def test_correct_missing_keys(self): if not self.test_missing_keys: return config, _ = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) base_model_prefix = model.base_model_prefix if hasattr(model, base_model_prefix): with tempfile.TemporaryDirectory() as temp_dir_name: model.base_model.save_pretrained(temp_dir_name) model, loading_info = model_class.from_pretrained(temp_dir_name, output_loading_info=True) with self.subTest(msg="Missing keys for {}".format(model.__class__.__name__)): self.assertGreater(len(loading_info["missing_keys"]), 0) def test_tie_model_weights(self): if not self.test_torchscript: return config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() def check_same_values(layer_1, layer_2): equal = True for p1, p2 in zip(layer_1.weight, layer_2.weight): if p1.data.ne(p2.data).sum() > 0: equal = False return equal for model_class in self.all_model_classes: config.torchscript = True model_not_tied = model_class(config) if model_not_tied.get_output_embeddings() is None: continue config_tied = copy.deepcopy(config) config_tied.torchscript = False model_tied = model_class(config_tied) params_tied = list(model_tied.parameters()) # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(check_same_values(embeddings, decoding)) # # Check that after modification, they remain the same. # embeddings.weight.data.div_(2) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(embeddings.weight.shape, decoding.weight.shape) # self.assertTrue(check_same_values(embeddings, decoding)) # # Check that after modification, they remain the same. # decoding.weight.data.div_(4) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(embeddings.weight.shape, decoding.weight.shape) # self.assertTrue(check_same_values(embeddings, decoding)) # Check that after resize they remain tied. model_tied.resize_token_embeddings(config.vocab_size + 10) params_tied_2 = list(model_tied.parameters()) self.assertEqual(len(params_tied_2), len(params_tied)) # decoding.weight.data.mul_(20) # # Check that the embedding layer and decoding layer are the same in size and in value # self.assertTrue(model.transformer.wte.weight.shape, model.lm_head.weight.shape) # self.assertTrue(check_same_values(model.transformer.wte, model.lm_head)) def test_model_outputs_equivalence(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() def set_nan_tensor_to_zero(t): t[t != t] = 0 return t def check_equivalence(model, tuple_inputs, dict_inputs, additional_kwargs={}): with torch.no_grad(): tuple_output = model(**tuple_inputs, return_dict=False, **additional_kwargs) dict_output = model(**dict_inputs, return_dict=True, **additional_kwargs).to_tuple() def recursive_check(tuple_object, dict_object): if isinstance(tuple_object, (List, Tuple)): for tuple_iterable_value, dict_iterable_value in zip(tuple_object, dict_object): recursive_check(tuple_iterable_value, dict_iterable_value) elif tuple_object is None: return else: self.assertTrue( torch.allclose( set_nan_tensor_to_zero(tuple_object), set_nan_tensor_to_zero(dict_object), atol=1e-5 ), msg=f"Tuple and dict output are not equal. Difference: {torch.max(torch.abs(tuple_object - dict_object))}. Tuple has `nan`: {torch.isnan(tuple_object).any()} and `inf`: {torch.isinf(tuple_object)}. Dict has `nan`: {torch.isnan(dict_object).any()} and `inf`: {torch.isinf(dict_object)}.", ) recursive_check(tuple_output, dict_output) for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs) tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class) dict_inputs = self._prepare_for_class(inputs_dict, model_class) check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs, {"output_hidden_states": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence(model, tuple_inputs, dict_inputs, {"output_attentions": True}) tuple_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) dict_inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True) check_equivalence( model, tuple_inputs, dict_inputs, {"output_hidden_states": True, "output_attentions": True} ) def test_inputs_embeds(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() for model_class in self.all_model_classes: model = model_class(config) model.to(torch_device) model.eval() inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class)) if not self.is_encoder_decoder: input_ids = inputs["input_ids"] del inputs["input_ids"] else: encoder_input_ids = inputs["input_ids"] decoder_input_ids = inputs.get("decoder_input_ids", encoder_input_ids) del inputs["input_ids"] inputs.pop("decoder_input_ids", None) wte = model.get_input_embeddings() if not self.is_encoder_decoder: inputs["inputs_embeds"] = wte(input_ids) else: inputs["inputs_embeds"] = wte(encoder_input_ids) inputs["decoder_inputs_embeds"] = wte(decoder_input_ids) with torch.no_grad(): model(**inputs)[0] @require_torch_multigpu def test_multigpu_data_parallel_forward(self): config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() # some params shouldn't be scattered by nn.DataParallel # so just remove them if they are present. blacklist_non_batched_params = ["head_mask"] for k in blacklist_non_batched_params: inputs_dict.pop(k, None) # move input tensors to cuda:O for k, v in inputs_dict.items(): if torch.is_tensor(v): inputs_dict[k] = v.to(0) for model_class in self.all_model_classes: model = model_class(config=config) model.to(0) model.eval() # Wrap model in nn.DataParallel model = torch.nn.DataParallel(model) with torch.no_grad(): _ = model(**self._prepare_for_class(inputs_dict, model_class)) global_rng = random.Random() def ids_tensor(shape, vocab_size, rng=None, name=None): # Creates a random int32 tensor of the shape within the vocab size if rng is None: rng = global_rng total_dims = 1 for dim in shape: total_dims *= dim values = [] for _ in range(total_dims): values.append(rng.randint(0, vocab_size - 1)) return torch.tensor(data=values, dtype=torch.long, device=torch_device).view(shape).contiguous() def random_attention_mask(shape, rng=None, name=None): attn_mask = ids_tensor(shape, vocab_size=2, rng=None, name=None) # make sure that at least one token is attended to for each batch attn_mask[:, -1] = 1 return attn_mask def floats_tensor(shape, scale=1.0, rng=None, name=None): """Creates a random float32 tensor""" if rng is None: rng = global_rng total_dims = 1 for dim in shape: total_dims *= dim values = [] for _ in range(total_dims): values.append(rng.random() * scale) return torch.tensor(data=values, dtype=torch.float, device=torch_device).view(shape).contiguous() @require_torch class ModelUtilsTest(unittest.TestCase): @slow def test_model_from_pretrained(self): for model_name in BERT_PRETRAINED_MODEL_ARCHIVE_LIST[:1]: config = BertConfig.from_pretrained(model_name) self.assertIsNotNone(config) self.assertIsInstance(config, PretrainedConfig) model = BertModel.from_pretrained(model_name) model, loading_info = BertModel.from_pretrained(model_name, output_loading_info=True) self.assertIsNotNone(model) self.assertIsInstance(model, PreTrainedModel) for value in loading_info.values(): self.assertEqual(len(value), 0) config = BertConfig.from_pretrained(model_name, output_attentions=True, output_hidden_states=True) # Not sure this is the intended behavior. TODO fix Lysandre & Thom config.name_or_path = model_name model = BertModel.from_pretrained(model_name, output_attentions=True, output_hidden_states=True) self.assertEqual(model.config.output_hidden_states, True) self.assertEqual(model.config, config)