# coding=utf-8
# Copyright 2019 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import copy
import inspect
import os
import random
import tempfile
import unittest
from importlib import import_module

from transformers import is_tf_available, is_torch_available
from transformers.testing_utils import _tf_gpu_memory_limit, require_tf


if is_tf_available():
    import tensorflow as tf
    import numpy as np

    from transformers import (
        tf_top_k_top_p_filtering,
        TFAdaptiveEmbedding,
        TFSharedEmbeddings,
        TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING,
        TF_MODEL_FOR_QUESTION_ANSWERING_MAPPING,
        TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING,
        TF_MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING,
        TF_MODEL_FOR_CAUSAL_LM_MAPPING,
        TF_MODEL_FOR_MASKED_LM_MAPPING,
        TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING,
    )

    if _tf_gpu_memory_limit is not None:
        gpus = tf.config.list_physical_devices("GPU")
        for gpu in gpus:
            # Restrict TensorFlow to only allocate x GB of memory on the GPUs
            try:
                tf.config.experimental.set_virtual_device_configuration(
                    gpu, [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=_tf_gpu_memory_limit)]
                )
                logical_gpus = tf.config.experimental.list_logical_devices("GPU")
                print("Logical GPUs", logical_gpus)
            except RuntimeError as e:
                # Virtual devices must be set before GPUs have been initialized
                print(e)


def _config_zero_init(config):
    """Return a deep copy of `config` in which every attribute whose name contains `_range` or `_std` is 0.0."""
    configs_no_init = copy.deepcopy(config)
    for key in configs_no_init.__dict__:
        if "_range" in key or "_std" in key:
            setattr(configs_no_init, key, 0.0)
    return configs_no_init


@require_tf
class TFModelTesterMixin:
    """
    Shared tests for TensorFlow model classes.

    The tests below read `self.model_tester` (for `prepare_config_and_inputs_for_common()` and for size attributes
    such as `batch_size`, `seq_length`, `num_hidden_layers`, `vocab_size`) and iterate over the class tuples declared
    here. They also call `unittest.TestCase` assertion methods on `self`, so the mixin is meant to be combined with a
    `unittest.TestCase`.
    """

    # Object providing `prepare_config_and_inputs_for_common()` and the size attributes read by the tests
    model_tester = None
    # Model classes exercised by the generic tests
    all_model_classes = ()
    # Model classes exercised by the `generate()` tests
    all_generative_model_classes = ()
    # NOTE(review): `test_torchscript` and `test_pruning` are not read anywhere in this file - presumably kept for
    # parity with the PyTorch tester; confirm before removing.
    test_torchscript = True
    test_pruning = True
    # When False, `test_resize_token_embeddings` returns immediately
    test_resize_embeddings = True
    # Switches the encoder-decoder specific branches of the tests
    is_encoder_decoder = False

    def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
        """
        Adapt the common inputs to `model_class`.

        For multiple-choice models a new dict is built in which every non-scalar `tf.Tensor` gets an extra axis 1
        tiled `num_choices` times. With `return_labels=True`, dummy label tensors are added under the key(s) matching
        the head type of `model_class`.

        Note: outside the multiple-choice branch the labels are written into the dict that was passed in, so callers
        that must keep `inputs_dict` untouched have to pass a copy.
        """
        if model_class in TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING.values():
            inputs_dict = {
                k: tf.tile(tf.expand_dims(v, 1), (1, self.model_tester.num_choices, 1))
                if isinstance(v, tf.Tensor) and v.ndim != 0
                else v
                for k, v in inputs_dict.items()
            }

        if return_labels:
            if model_class in TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING.values():
                inputs_dict["labels"] = tf.ones(self.model_tester.batch_size)
            elif model_class in TF_MODEL_FOR_QUESTION_ANSWERING_MAPPING.values():
                inputs_dict["start_positions"] = tf.zeros(self.model_tester.batch_size)
                inputs_dict["end_positions"] = tf.zeros(self.model_tester.batch_size)
            elif model_class in TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING.values():
                inputs_dict["labels"] = tf.zeros(self.model_tester.batch_size)
            elif model_class in TF_MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING.values():
                inputs_dict["labels"] = tf.zeros((self.model_tester.batch_size, self.model_tester.seq_length))
            elif model_class in TF_MODEL_FOR_CAUSAL_LM_MAPPING.values():
                inputs_dict["labels"] = tf.zeros((self.model_tester.batch_size, self.model_tester.seq_length))
            elif model_class in TF_MODEL_FOR_MASKED_LM_MAPPING.values():
                inputs_dict["labels"] = tf.zeros((self.model_tester.batch_size, self.model_tester.seq_length))
            elif model_class in TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING.values():
                inputs_dict["labels"] = tf.zeros((self.model_tester.batch_size, self.model_tester.seq_length))
        return inputs_dict

    def test_initialization(self):
        """Placeholder: the initialization check below is disabled, so this test currently does nothing."""
        pass
        # config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        # configs_no_init = _config_zero_init(config)
        # for model_class in self.all_model_classes:
        #     model = model_class(config=configs_no_init)
        #     for name, param in model.named_parameters():
        #         if param.requires_grad:
        #             self.assertIn(param.data.mean().item(), [0.0, 1.0],
        #             msg="Parameter {} of model {} seems not properly initialized".format(name, model_class))

    def test_save_load(self):
        """Check that `save_pretrained` followed by `from_pretrained` reproduces the model's first output."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)
            outputs = model(self._prepare_for_class(inputs_dict, model_class))

            with tempfile.TemporaryDirectory() as tmpdirname:
                model.save_pretrained(tmpdirname)
                model = model_class.from_pretrained(tmpdirname)
                after_outputs = model(self._prepare_for_class(inputs_dict, model_class))

                self.assert_outputs_same(after_outputs, outputs)

    def test_keras_save_load(self):
        """
        Check that every keras-serializable `*MainLayer` found in the modules of `all_model_classes` survives a
        `tf.keras.Model.save` / `tf.keras.models.load_model` round trip (HDF5 file) with unchanged outputs.
        """
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        # The single-element tuples bind intermediate names (`module`, `module_member`) inside the comprehension
        tf_main_layer_classes = {
            module_member
            for model_class in self.all_model_classes
            for module in (import_module(model_class.__module__),)
            for module_member_name in dir(module)
            if module_member_name.endswith("MainLayer")
            for module_member in (getattr(module, module_member_name),)
            if isinstance(module_member, type)
            and tf.keras.layers.Layer in module_member.__bases__
            and getattr(module_member, "_keras_serializable", False)
        }
        for main_layer_class in tf_main_layer_classes:
            is_t5 = "T5" in main_layer_class.__name__
            # T5MainLayer needs an embed_tokens parameter when called without the inputs_embeds parameter
            if is_t5:
                # Take the same values as in TFT5ModelTester for this shared layer
                shared = TFSharedEmbeddings(99, 32, name="shared")
                config.use_cache = False
                main_layer = main_layer_class(config, embed_tokens=shared)
            else:
                main_layer = main_layer_class(config)
            symbolic_inputs = {
                name: tf.keras.Input(tensor.shape[1:], dtype=tensor.dtype) for name, tensor in inputs_dict.items()
            }

            model = tf.keras.Model(symbolic_inputs, outputs=main_layer(symbolic_inputs))
            outputs = model(inputs_dict)

            with tempfile.TemporaryDirectory() as tmpdirname:
                filepath = os.path.join(tmpdirname, "keras_model.h5")
                model.save(filepath)

                # Custom layers have to be handed to keras by name so it can rebuild them
                custom_objects = {main_layer_class.__name__: main_layer_class}
                if is_t5:
                    custom_objects["TFSharedEmbeddings"] = TFSharedEmbeddings
                model = tf.keras.models.load_model(filepath, custom_objects=custom_objects)

                self.assertIsInstance(model, tf.keras.Model)
                after_outputs = model(inputs_dict)
                self.assert_outputs_same(after_outputs, outputs)

    def assert_outputs_same(self, after_outputs, outputs):
        """
        Assert that the first output of `after_outputs` and of `outputs` have the same shape and differ by at most
        1e-5 in absolute value.

        Each argument may be a bare `tf.Tensor` (used as is) or an indexable collection (element 0 is used).
        Positions where either array is NaN are excluded from the comparison.
        """

        def first_output_as_numpy(model_outputs):
            if isinstance(model_outputs, tf.Tensor):
                return model_outputs.numpy()
            return model_outputs[0].numpy()

        out_1 = first_output_as_numpy(after_outputs)
        out_2 = first_output_as_numpy(outputs)
        self.assertEqual(out_1.shape, out_2.shape)

        # Make sure we don't compare NaNs: use one shared mask so both arrays stay aligned element by element
        valid = ~(np.isnan(out_1) | np.isnan(out_2))
        max_diff = np.amax(np.abs(out_1[valid] - out_2[valid]))
        self.assertLessEqual(max_diff, 1e-5)

    def test_pt_tf_model_equivalence(self):
        """
        Check that weights can be moved between the TF model and its PyTorch counterpart, first through the
        model => model functions and then through the checkpoint => model functions, and that after each transfer
        the first outputs of both models agree within 2e-2.

        Returns without checking anything when PyTorch is not available.
        """
        if not is_torch_available():
            return

        import torch
        import transformers

        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        def prepare_pt_inputs(model_class):
            # Convert every TF input to a `torch.long` tensor
            pt_inputs = {
                name: torch.from_numpy(tensor.numpy()).to(torch.long)
                for name, tensor in self._prepare_for_class(inputs_dict, model_class).items()
            }
            # need to rename encoder-decoder "inputs" for PyTorch
            if "inputs" in pt_inputs and self.is_encoder_decoder:
                pt_inputs["input_ids"] = pt_inputs.pop("inputs")
            return pt_inputs

        def max_diff_ignoring_nans(tf_array, pt_array):
            # Zero (in place) every position that is NaN in either array, then take the largest absolute difference
            tf_nans = np.copy(np.isnan(tf_array))
            pt_nans = np.copy(np.isnan(pt_array))

            pt_array[tf_nans] = 0
            tf_array[tf_nans] = 0
            pt_array[pt_nans] = 0
            tf_array[pt_nans] = 0

            return np.amax(np.abs(tf_array - pt_array))

        for model_class in self.all_model_classes:
            pt_model_class_name = model_class.__name__[2:]  # Skip the "TF" at the beginning
            pt_model_class = getattr(transformers, pt_model_class_name)

            config.output_hidden_states = True

            tf_model = model_class(config)
            pt_model = pt_model_class(config)

            # Check we can load pt model in tf and vice-versa with model => model functions

            tf_model = transformers.load_pytorch_model_in_tf2_model(
                tf_model, pt_model, tf_inputs=self._prepare_for_class(inputs_dict, model_class)
            )
            pt_model = transformers.load_tf2_model_in_pytorch_model(pt_model, tf_model)

            # Check predictions on first output (logits/hidden-states) are close enough given low-level
            # computational differences
            pt_model.eval()
            pt_inputs_dict = prepare_pt_inputs(model_class)

            with torch.no_grad():
                pto = pt_model(**pt_inputs_dict)
            tfo = tf_model(self._prepare_for_class(inputs_dict, model_class), training=False)
            max_diff = max_diff_ignoring_nans(tfo[0].numpy(), pto[0].numpy())
            # Debug info (remove when fixed)
            if max_diff >= 2e-2:
                print("===")
                print(model_class)
                print(config)
                print(inputs_dict)
                print(pt_inputs_dict)
            self.assertLessEqual(max_diff, 2e-2)

            # Check we can load pt model in tf and vice-versa with checkpoint => model functions
            with tempfile.TemporaryDirectory() as tmpdirname:
                pt_checkpoint_path = os.path.join(tmpdirname, "pt_model.bin")
                torch.save(pt_model.state_dict(), pt_checkpoint_path)
                tf_model = transformers.load_pytorch_checkpoint_in_tf2_model(tf_model, pt_checkpoint_path)

                tf_checkpoint_path = os.path.join(tmpdirname, "tf_model.h5")
                tf_model.save_weights(tf_checkpoint_path)
                pt_model = transformers.load_tf2_checkpoint_in_pytorch_model(pt_model, tf_checkpoint_path)

            # Check predictions on first output (logits/hidden-states) are close enough given low-level
            # computational differences
            pt_model.eval()
            pt_inputs_dict = prepare_pt_inputs(model_class)

            with torch.no_grad():
                pto = pt_model(**pt_inputs_dict)
            tfo = tf_model(self._prepare_for_class(inputs_dict, model_class))
            max_diff = max_diff_ignoring_nans(tfo[0].numpy(), pto[0].numpy())
            self.assertLessEqual(max_diff, 2e-2)

    def test_compile_tf_model(self):
        """
        Check that a model reloaded from disk can be called on symbolic keras inputs, extended with a dense layer
        and compiled as a `tf.keras.Model`.
        """
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08, clipnorm=1.0)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        metric = tf.keras.metrics.SparseCategoricalAccuracy("accuracy")

        for model_class in self.all_model_classes:
            if self.is_encoder_decoder:
                input_ids = {
                    "decoder_input_ids": tf.keras.Input(
                        batch_shape=(2, 2000), name="decoder_input_ids", dtype="int32"
                    ),
                    "input_ids": tf.keras.Input(batch_shape=(2, 2000), name="input_ids", dtype="int32"),
                }
            elif model_class in TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING.values():
                input_ids = tf.keras.Input(batch_shape=(4, 2, 2000), name="input_ids", dtype="int32")
            else:
                input_ids = tf.keras.Input(batch_shape=(2, 2000), name="input_ids", dtype="int32")

            # Prepare our model
            model = model_class(config)

            # Let's load it from the disk to be sure we can use pretrained weights
            with tempfile.TemporaryDirectory() as tmpdirname:
                model(self._prepare_for_class(inputs_dict, model_class))  # build the model
                model.save_pretrained(tmpdirname)
                model = model_class.from_pretrained(tmpdirname)

            outputs_dict = model(input_ids)
            hidden_states = outputs_dict[0]

            # Add a dense layer on top to test integration with other keras modules
            outputs = tf.keras.layers.Dense(2, activation="softmax", name="outputs")(hidden_states)

            # Compile extended model
            extended_model = tf.keras.Model(inputs=[input_ids], outputs=[outputs])
            extended_model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

    def test_keyword_and_dict_args(self):
        """Check that passing the inputs as one dict or as `input_ids` plus keyword arguments gives the same output."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)
            outputs_dict = model(self._prepare_for_class(inputs_dict, model_class))

            inputs_keywords = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
            input_ids = inputs_keywords.pop("input_ids", None)
            outputs_keywords = model(input_ids, **inputs_keywords)
            output_dict = outputs_dict[0].numpy()
            output_keywords = outputs_keywords[0].numpy()

            self.assertLess(np.sum(np.abs(output_dict - output_keywords)), 1e-6)

    def test_attention_outputs(self):
        """
        Check the attentions returned as the last model output: one entry per hidden layer with trailing shape
        `[num_attention_heads, seq_length, key_length]`, whether requested through the inputs or through the config,
        and still last when hidden states are returned as well.
        """
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        # The fallbacks are only evaluated when the specific attribute is missing from the model tester
        decoder_seq_length = (
            self.model_tester.decoder_seq_length
            if hasattr(self.model_tester, "decoder_seq_length")
            else self.model_tester.seq_length
        )
        encoder_seq_length = (
            self.model_tester.encoder_seq_length
            if hasattr(self.model_tester, "encoder_seq_length")
            else self.model_tester.seq_length
        )
        decoder_key_length = (
            self.model_tester.key_length if hasattr(self.model_tester, "key_length") else decoder_seq_length
        )
        encoder_key_length = (
            self.model_tester.key_length if hasattr(self.model_tester, "key_length") else encoder_seq_length
        )

        for model_class in self.all_model_classes:
            inputs_dict["output_attentions"] = True
            config.output_hidden_states = False
            model = model_class(config)
            outputs = model(self._prepare_for_class(inputs_dict, model_class))
            attentions = [t.numpy() for t in outputs[-1]]
            self.assertEqual(model.config.output_hidden_states, False)
            self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
            self.assertListEqual(
                list(attentions[0].shape[-3:]),
                [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length],
            )
            out_len = len(outputs)

            if self.is_encoder_decoder:
                self.assertEqual(out_len % 2, 0)
                decoder_attentions = outputs[(out_len // 2) - 1]
                self.assertEqual(model.config.output_hidden_states, False)
                self.assertEqual(len(decoder_attentions), self.model_tester.num_hidden_layers)
                self.assertListEqual(
                    list(decoder_attentions[0].shape[-3:]),
                    [self.model_tester.num_attention_heads, decoder_seq_length, decoder_key_length],
                )

            # Check that output attentions can also be changed via the config
            del inputs_dict["output_attentions"]
            config.output_attentions = True
            model = model_class(config)
            outputs = model(self._prepare_for_class(inputs_dict, model_class))
            attentions = [t.numpy() for t in outputs[-1]]
            self.assertEqual(model.config.output_hidden_states, False)
            self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
            self.assertListEqual(
                list(attentions[0].shape[-3:]),
                [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length],
            )

            # Check attention is always last and order is fine
            inputs_dict["output_attentions"] = True
            config.output_hidden_states = True
            model = model_class(config)
            outputs = model(self._prepare_for_class(inputs_dict, model_class))
            self.assertEqual(out_len + (2 if self.is_encoder_decoder else 1), len(outputs))
            self.assertEqual(model.config.output_hidden_states, True)

            attentions = [t.numpy() for t in outputs[-1]]
            self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
            self.assertListEqual(
                list(attentions[0].shape[-3:]),
                [self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length],
            )

    def test_hidden_states_output(self):
        """
        Check the hidden states returned as the last model output (`num_hidden_layers + 1` entries with trailing
        shape `[seq_length, hidden_size]`), requested first through the inputs and then through the config.
        """
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        def check_hidden_states_output(config, inputs_dict, model_class):
            model = model_class(config)
            outputs = model(self._prepare_for_class(inputs_dict, model_class))
            hidden_states = [t.numpy() for t in outputs[-1]]
            self.assertEqual(len(hidden_states), self.model_tester.num_hidden_layers + 1)
            self.assertListEqual(
                list(hidden_states[0].shape[-2:]), [self.model_tester.seq_length, self.model_tester.hidden_size],
            )

        for model_class in self.all_model_classes:
            inputs_dict["output_hidden_states"] = True
            check_hidden_states_output(config, inputs_dict, model_class)

            del inputs_dict["output_hidden_states"]
            config.output_hidden_states = True
            check_hidden_states_output(config, inputs_dict, model_class)

    def test_model_common_attributes(self):
        """Check that input embeddings are a keras layer and output embeddings are `None` or a keras layer."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)
            self.assertIsInstance(model.get_input_embeddings(), (tf.keras.layers.Layer, TFAdaptiveEmbedding))
            x = model.get_output_embeddings()
            if x is not None:
                self.assertIsInstance(x, tf.keras.layers.Layer)

    def test_determinism(self):
        """Check that two inference calls (`training=False`) on the same inputs differ by at most 1e-5."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)
            first, second = (
                model(self._prepare_for_class(inputs_dict, model_class), training=False)[0],
                model(self._prepare_for_class(inputs_dict, model_class), training=False)[0],
            )
            out_1 = first.numpy()
            out_2 = second.numpy()
            # Exclude positions that are NaN in either output with one shared mask so both arrays stay aligned
            valid = ~(np.isnan(out_1) | np.isnan(out_2))
            max_diff = np.amax(np.abs(out_1[valid] - out_2[valid]))
            self.assertLessEqual(max_diff, 1e-5)

    def _get_embeds(self, wte, input_ids):
        """
        Return embeddings for `input_ids` from the embedding layer `wte`.

        In our TF models, the input embeddings can take slightly different forms, so a few call forms are tried in
        order. If all of them raise, a tensor of ones of shape `input_ids.shape + [embedding_size]` (or
        `hidden_size` when the model tester has no `embedding_size`) is returned instead.
        """
        for candidate_inputs in (input_ids, [input_ids], [input_ids, None, None, None]):
            try:
                return wte(candidate_inputs, mode="embedding")
            except Exception:
                # Deliberately broad: any failure just means this call form is not supported by `wte`
                continue

        if hasattr(self.model_tester, "embedding_size"):
            embedding_dim = self.model_tester.embedding_size
        else:
            embedding_dim = self.model_tester.hidden_size
        return tf.ones(input_ids.shape + [embedding_dim], dtype=tf.dtypes.float32)

    def test_inputs_embeds(self):
        """Check that each model can be called with `inputs_embeds` (and `decoder_inputs_embeds`) instead of ids."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in self.all_model_classes:
            model = model_class(config)

            inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
            wte = model.get_input_embeddings()
            if not self.is_encoder_decoder:
                input_ids = inputs.pop("input_ids")
                inputs["inputs_embeds"] = self._get_embeds(wte, input_ids)
            else:
                encoder_input_ids = inputs.pop("input_ids")
                # Fall back to the encoder ids when no decoder ids were prepared
                decoder_input_ids = inputs.pop("decoder_input_ids", encoder_input_ids)
                inputs["inputs_embeds"] = self._get_embeds(wte, encoder_input_ids)
                inputs["decoder_inputs_embeds"] = self._get_embeds(wte, decoder_input_ids)

            model(inputs)

    def test_resize_token_embeddings(self):
        """
        Check `_get_resized_embeddings` for a smaller vocabulary, a larger one and `None`: the first dimension must
        match the requested size (or `config.vocab_size` for `None`) and the rows shared with the old embeddings
        must be unchanged.
        """
        if not self.test_resize_embeddings:
            return
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
        INPUT_SHAPE = [1, 10, config.hidden_size]
        for model_class in self.all_model_classes:
            for size in [config.vocab_size - 10, config.vocab_size + 10, None]:
                # build the embeddings
                model = model_class(config=config)
                emb_old = model.get_input_embeddings()
                emb_old.build(INPUT_SHAPE)
                # reshape the embeddings
                new_embeddings = model._get_resized_embeddings(emb_old, size)
                # check that the resized embeddings size matches the desired size.
                assert_size = size if size is not None else config.vocab_size
                self.assertEqual(new_embeddings.shape[0], assert_size)
                # check that weights remain the same after resizing (`zip` stops at the shorter of the two)
                emd_old_weights = model._get_word_embeddings(emb_old)
                models_equal = not any(
                    np.sum(abs(p1 - p2)) > 0 for p1, p2 in zip(emd_old_weights.numpy(), new_embeddings.numpy())
                )
                self.assertTrue(models_equal)

    def test_lm_head_model_random_no_beam_search_generate(self):
        """Exercise `generate()` without beam search: sampling, invalid argument combinations and bad-word filtering."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
        input_ids = inputs_dict["input_ids"] if "input_ids" in inputs_dict else inputs_dict["inputs"]

        # iterate over all generative models
        for model_class in self.all_generative_model_classes:
            model = model_class(config)

            if config.bos_token_id is None:
                # if bos token id is not defined model needs input_ids
                with self.assertRaises(AssertionError):
                    model.generate(do_sample=True, max_length=5)
                # num_return_sequences = 1
                self._check_generated_ids(model.generate(input_ids, do_sample=True))
            else:
                # num_return_sequences = 1
                self._check_generated_ids(model.generate(do_sample=True, max_length=5))

            with self.assertRaises(AssertionError):
                # generating multiple sequences when no beam search generation
                # is not allowed as it would always generate the same sequences
                model.generate(input_ids, do_sample=False, num_return_sequences=2)

            # num_return_sequences > 1, sample
            self._check_generated_ids(model.generate(input_ids, do_sample=True, num_return_sequences=2))

            # check bad words tokens language generation
            # create list of 1-seq bad token and list of 2-seq of bad tokens
            bad_words_ids = [self._generate_random_bad_tokens(1, model), self._generate_random_bad_tokens(2, model)]
            output_tokens = model.generate(
                input_ids, do_sample=True, bad_words_ids=bad_words_ids, num_return_sequences=2
            )
            # only count generated tokens
            generated_ids = output_tokens[:, input_ids.shape[-1] :]
            self.assertFalse(self._check_match_tokens(generated_ids.numpy().tolist(), bad_words_ids))

    def test_lm_head_model_random_beam_search_generate(self):
        """Exercise `generate()` with `num_beams=2`: sampling, greedy, invalid arguments and bad-word filtering."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
        input_ids = inputs_dict["input_ids"] if "input_ids" in inputs_dict else inputs_dict["inputs"]

        for model_class in self.all_generative_model_classes:
            model = model_class(config)

            if config.bos_token_id is None:
                # if bos token id is not defined model needs input_ids, num_return_sequences = 1
                self._check_generated_ids(model.generate(input_ids, do_sample=True, num_beams=2))
            else:
                # num_return_sequences = 1
                self._check_generated_ids(model.generate(do_sample=True, max_length=5, num_beams=2))

            with self.assertRaises(AssertionError):
                # generating more sequences than there are beams is not possible
                model.generate(input_ids, do_sample=False, num_return_sequences=3, num_beams=2)

            # num_return_sequences > 1, sample
            self._check_generated_ids(model.generate(input_ids, do_sample=True, num_beams=2, num_return_sequences=2,))
            # num_return_sequences > 1, greedy
            self._check_generated_ids(model.generate(input_ids, do_sample=False, num_beams=2, num_return_sequences=2))

            # check bad words tokens language generation
            # create list of 1-seq bad token and list of 2-seq of bad tokens
            bad_words_ids = [self._generate_random_bad_tokens(1, model), self._generate_random_bad_tokens(2, model)]
            output_tokens = model.generate(
                input_ids, do_sample=False, bad_words_ids=bad_words_ids, num_beams=2, num_return_sequences=2
            )
            # only count generated tokens
            generated_ids = output_tokens[:, input_ids.shape[-1] :]
            self.assertFalse(self._check_match_tokens(generated_ids.numpy().tolist(), bad_words_ids))

    def test_loss_computation(self):
        """
        For every model exposing a truthy `compute_loss`, check that the first output has one element per label
        when the labels are passed as keyword arguments, inside a dict, or positionally inside a tuple.
        """
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
        for model_class in self.all_model_classes:
            model = model_class(config)
            if not getattr(model, "compute_loss", None):
                continue

            # The number of elements in the loss should be the same as the number of elements in the label
            prepared_for_class = self._prepare_for_class(inputs_dict.copy(), model_class, return_labels=True)
            added_label = prepared_for_class[list(prepared_for_class.keys() - inputs_dict.keys())[0]]
            loss_size = tf.size(added_label)

            if model.__class__ in TF_MODEL_FOR_CAUSAL_LM_MAPPING.values():
                # if loss is causal lm loss, labels are shifted, so that one label per batch
                # is cut
                loss_size = loss_size - self.model_tester.batch_size

            # Test that model correctly computes the loss with kwargs
            prepared_for_class = self._prepare_for_class(inputs_dict.copy(), model_class, return_labels=True)
            input_ids = prepared_for_class.pop("input_ids")

            loss = model(input_ids, **prepared_for_class)[0]
            self.assertEqual(loss.shape, [loss_size])

            # Test that model correctly computes the loss with a dict
            prepared_for_class = self._prepare_for_class(inputs_dict.copy(), model_class, return_labels=True)
            loss = model(prepared_for_class)[0]
            self.assertEqual(loss.shape, [loss_size])

            # Test that model correctly computes the loss with a tuple
            prepared_for_class = self._prepare_for_class(inputs_dict.copy(), model_class, return_labels=True)

            # Get keys that were added with the _prepare_for_class function
            label_keys = prepared_for_class.keys() - inputs_dict.keys()
            signature = inspect.getfullargspec(model.call)[0]

            # Create a dictionary holding the location of the tensors in the tuple.
            # Indices are positions in the `call` signature, where index 0 is `self`.
            tuple_index_mapping = {1: "input_ids"}
            for label_key in label_keys:
                label_key_index = signature.index(label_key)
                tuple_index_mapping[label_key_index] = label_key
            sorted_tuple_index_mapping = sorted(tuple_index_mapping.items())

            # Initialize a list with None, update the values and convert to a tuple
            list_input = [None] * sorted_tuple_index_mapping[-1][0]
            for index, value in sorted_tuple_index_mapping:
                # `index - 1` because `self` is not part of the tuple sent to the model
                list_input[index - 1] = prepared_for_class[value]
            tuple_input = tuple(list_input)

            # Send to model
            loss = model(tuple_input)[0]
            self.assertEqual(loss.shape, [loss_size])

    def _generate_random_bad_tokens(self, num_bad_tokens, model):
        """Return a list of `num_bad_tokens` random token ids, none of which is the model's bos/pad/eos token id."""
        # special tokens cannot be bad tokens
        special_tokens = [
            token_id
            for token_id in (model.config.bos_token_id, model.config.pad_token_id, model.config.eos_token_id)
            if token_id is not None
        ]

        # create random bad tokens that are not special tokens
        bad_tokens = []
        while len(bad_tokens) < num_bad_tokens:
            token = tf.squeeze(ids_tensor((1, 1), self.model_tester.vocab_size), 0).numpy()[0]
            if token not in special_tokens:
                bad_tokens.append(token)
        return bad_tokens

    def _check_generated_ids(self, output_ids):
        """Assert that every token id of the first sequence in `output_ids` lies in `[0, vocab_size)`."""
        for token_id in output_ids[0].numpy().tolist():
            self.assertGreaterEqual(token_id, 0)
            self.assertLess(token_id, self.model_tester.vocab_size)

    def _check_match_tokens(self, generated_ids, bad_words_ids):
        """
        Return True if any sequence of `bad_words_ids` occurs as a contiguous run in any row of `generated_ids`.

        Both arguments are lists of lists of token ids.
        """
        # for all bad word tokens
        for bad_word_ids in bad_words_ids:
            # for all slices in batch
            for generated_ids_slice in generated_ids:
                # for all window end positions; the upper bound is `len + 1` so that the window ending on the
                # last generated token is checked as well
                for i in range(len(bad_word_ids), len(generated_ids_slice) + 1):
                    # if tokens match
                    if generated_ids_slice[i - len(bad_word_ids) : i] == bad_word_ids:
                        return True
        return False


def ids_tensor(shape, vocab_size, rng=None, name=None, dtype=None):
    """
    Creates a random tensor of the given shape with values drawn uniformly from `[0, vocab_size - 1]`.

    `rng` defaults to a fresh `random.Random()` and `dtype` to `tf.int32`. `name` is accepted but not used.
    """
    if rng is None:
        rng = random.Random()

    total_dims = 1
    for dim in shape:
        total_dims *= dim

    values = [rng.randint(0, vocab_size - 1) for _ in range(total_dims)]

    return tf.constant(values, shape=shape, dtype=dtype if dtype is not None else tf.int32)


@require_tf
class UtilsFunctionsTest(unittest.TestCase):
    """Tests for stand-alone TF utility functions."""

    # tests whether the top_k_top_p_filtering function behaves as expected
    def test_top_k_top_p_filtering(self):
        """Check which logits `tf_top_k_top_p_filtering` keeps (positions and values) for two fixed rows."""
        logits = tf.convert_to_tensor(
            [
                [
                    8.2220991,  # 3rd highest value; idx. 0
                    -0.5620044,
                    5.23229752,
                    4.0386393,
                    -6.8798378,
                    -0.54785802,
                    -3.2012153,
                    2.92777176,
                    1.88171953,
                    7.35341276,  # 5th highest value; idx. 9
                    8.43207833,  # 2nd highest value; idx. 10
                    -9.85711836,
                    -5.96209236,
                    -1.13039161,
                    -7.1115294,
                    -0.8369633,
                    -5.3186408,
                    7.06427407,
                    0.81369344,
                    -0.82023817,
                    -5.9179796,
                    0.58813443,
                    -6.99778438,
                    4.71551189,
                    -0.18771637,
                    7.44020759,  # 4th highest value; idx. 25
                    9.38450987,  # 1st highest value; idx. 26
                    2.12662941,
                    -9.32562038,
                    2.35652522,
                ],  # cumulative prob of 5 highest values <= 0.6
                [
                    0.58425518,
                    4.53139238,
                    -5.57510464,
                    -6.28030699,
                    -7.19529503,
                    -4.02122551,
                    1.39337037,
                    -6.06707057,
                    1.59480517,
                    -9.643119,
                    0.03907799,
                    0.67231762,
                    -8.88206726,
                    6.27115922,  # 4th highest value; idx. 13
                    2.28520723,
                    4.82767506,
                    4.30421368,
                    8.8275313,  # 2nd highest value; idx. 17
                    5.44029958,  # 5th highest value; idx. 18
                    -4.4735794,
                    7.38579536,  # 3rd highest value; idx. 20
                    -2.91051663,
                    2.61946077,
                    -2.5674762,
                    -9.48959302,
                    -4.02922645,
                    -1.35416918,
                    9.67702323,  # 1st highest value; idx. 27
                    -5.89478553,
                    1.85370467,
                ],  # cumulative prob of 5 highest values <= 0.6
            ],
            dtype=tf.float32,
        )

        non_inf_expected_idx = tf.convert_to_tensor(
            [[0, 0], [0, 9], [0, 10], [0, 25], [0, 26], [1, 13], [1, 17], [1, 18], [1, 20], [1, 27]], dtype=tf.int32,
        )  # expected non filtered idx as noted above

        non_inf_expected_output = tf.convert_to_tensor(
            [8.222099, 7.3534126, 8.432078, 7.4402075, 9.38451, 6.271159, 8.827531, 5.4402995, 7.3857956, 9.677023],
            dtype=tf.float32,
        )  # expected non filtered values as noted above

        output = tf_top_k_top_p_filtering(logits, top_k=10, top_p=0.6, min_tokens_to_keep=4)

        # Filtered-out entries are -inf; everything else is compared against the expectations above
        non_inf_output = output[output != -float("inf")]
        non_inf_idx = tf.cast(
            tf.where(tf.not_equal(output, tf.constant(-float("inf"), dtype=tf.float32))), dtype=tf.int32,
        )

        tf.debugging.assert_near(non_inf_output, non_inf_expected_output, rtol=1e-12)
        tf.debugging.assert_equal(non_inf_idx, non_inf_expected_idx)