# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch M2M100 model."""

import copy
import tempfile
import unittest

import pytest

from transformers import M2M100Config, is_torch_available
from transformers.testing_utils import (
    require_flash_attn,
    require_sentencepiece,
    require_tokenizers,
    require_torch,
    require_torch_fp16,
    require_torch_gpu,
    slow,
    torch_device,
)
from transformers.utils import cached_property

from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin


if is_torch_available():
    import torch

    from transformers import M2M100ForConditionalGeneration, M2M100Model, M2M100Tokenizer
    from transformers.models.m2m_100.modeling_m2m_100 import M2M100Decoder, M2M100Encoder


def prepare_m2m_100_inputs_dict(
    config,
    input_ids,
    decoder_input_ids,
    attention_mask=None,
    decoder_attention_mask=None,
):
    """Assemble the keyword arguments for an M2M100 forward pass.

    Args:
        config: Model config; only `pad_token_id` is read here.
        input_ids: Encoder token ids.
        decoder_input_ids: Decoder token ids.
        attention_mask: Optional encoder mask. When omitted, every position of
            `input_ids` that differs from `config.pad_token_id` is attended.
        decoder_attention_mask: Optional decoder mask. When omitted, every
            position of `decoder_input_ids` that differs from
            `config.pad_token_id` is attended.

    Returns:
        A dict with the keys `input_ids`, `decoder_input_ids`,
        `attention_mask` and `decoder_attention_mask`.
    """
    if attention_mask is None:
        attention_mask = input_ids.ne(config.pad_token_id)
    if decoder_attention_mask is None:
        decoder_attention_mask = decoder_input_ids.ne(config.pad_token_id)
    return {
        "input_ids": input_ids,
        "decoder_input_ids": decoder_input_ids,
        "attention_mask": attention_mask,
        # Return the decoder's own mask: the encoder mask was previously
        # returned under this key, silently discarding the decoder mask.
        "decoder_attention_mask": decoder_attention_mask,
    }


class M2M100ModelTester:
    """Builds a small M2M100 config plus random inputs and hosts shared checks.

    `parent` is the `unittest.TestCase` whose assertion methods are used by
    the `create_and_check_*` / `check_*` helpers.
    """

    def __init__(
        self,
        parent,
        batch_size=13,
        seq_length=7,
        is_training=True,
        use_labels=False,
        vocab_size=99,
        hidden_size=16,
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=4,
        hidden_act="relu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        encoder_layerdrop=0.0,
        decoder_layerdrop=0.0,
        max_position_embeddings=20,
        eos_token_id=2,
        pad_token_id=1,
        bos_token_id=0,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.is_training = is_training
        self.use_labels = use_labels
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.encoder_layerdrop = encoder_layerdrop
        self.decoder_layerdrop = decoder_layerdrop
        self.max_position_embeddings = max_position_embeddings
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id
        self.bos_token_id = bos_token_id

    def prepare_config_and_inputs(self):
        """Return `(config, inputs_dict)` with random, pad-free token ids."""
        input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
        input_ids[:, -1] = self.eos_token_id  # Eos Token
        decoder_input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)

        # we need to clamp the input ids here to avoid having pad token in between
        # this is because for M2M100 the position_ids are prepared such that
        # all pad tokens have pos id = 2 and rest are between 2..seq_length
        # and the seq_length here is seq_length - num_pad_tokens
        # but when using past, there is no way of knowing if the past input ids had
        # pad tokens in them, which results in incorrect seq_length and which in turn results in
        # position_ids being off by num_pad_tokens in past input
        input_ids = input_ids.clamp(self.pad_token_id + 1)
        decoder_input_ids = decoder_input_ids.clamp(self.pad_token_id + 1)

        config = self.get_config()
        inputs_dict = prepare_m2m_100_inputs_dict(config, input_ids, decoder_input_ids)
        return config, inputs_dict

    def get_config(self):
        """Return an `M2M100Config` built from this tester's hyper-parameters."""
        return M2M100Config(
            vocab_size=self.vocab_size,
            d_model=self.hidden_size,
            encoder_layers=self.num_hidden_layers,
            decoder_layers=self.num_hidden_layers,
            encoder_attention_heads=self.num_attention_heads,
            decoder_attention_heads=self.num_attention_heads,
            encoder_ffn_dim=self.intermediate_size,
            decoder_ffn_dim=self.intermediate_size,
            dropout=self.hidden_dropout_prob,
            attention_dropout=self.attention_probs_dropout_prob,
            encoder_layerdrop=self.encoder_layerdrop,
            decoder_layerdrop=self.decoder_layerdrop,
            max_position_embeddings=self.max_position_embeddings,
            eos_token_id=self.eos_token_id,
            bos_token_id=self.bos_token_id,
            pad_token_id=self.pad_token_id,
        )

    def prepare_config_and_inputs_for_common(self):
        """Return the same `(config, inputs_dict)` pair as `prepare_config_and_inputs`."""
        return self.prepare_config_and_inputs()

    def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
        """Check that decoding with cached key/values matches decoding from scratch.

        The decoder is run once with `use_cache=True`, then three extra tokens
        are decoded both with and without the returned cache, and one randomly
        chosen hidden dimension of the last three positions is compared.
        """
        model = M2M100Model(config=config).get_decoder().to(torch_device).eval()
        input_ids = inputs_dict["input_ids"]
        attention_mask = inputs_dict["attention_mask"]

        # first forward pass
        outputs = model(input_ids, attention_mask=attention_mask, use_cache=True)

        # only the cache is needed from the first pass
        _, past_key_values = outputs.to_tuple()

        # create hypothetical multiple next tokens and extend to next_input_ids
        next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
        next_attn_mask = ids_tensor((self.batch_size, 3), 2)

        # append to next input_ids and attention mask
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)

        output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["last_hidden_state"]
        output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
            "last_hidden_state"
        ]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()

        self.parent.assertEqual(output_from_past_slice.shape[1], next_tokens.shape[1])

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-2))

    def check_encoder_decoder_model_standalone(self, config, inputs_dict):
        """Check that the saved-and-reloaded encoder and decoder reproduce the full model's outputs."""
        model = M2M100Model(config=config).to(torch_device).eval()
        outputs = model(**inputs_dict)

        encoder_last_hidden_state = outputs.encoder_last_hidden_state
        last_hidden_state = outputs.last_hidden_state

        # Round-trip the encoder through disk and compare its hidden states.
        with tempfile.TemporaryDirectory() as tmpdirname:
            encoder = model.get_encoder()
            encoder.save_pretrained(tmpdirname)
            encoder = M2M100Encoder.from_pretrained(tmpdirname).to(torch_device)

        encoder_last_hidden_state_2 = encoder(inputs_dict["input_ids"], attention_mask=inputs_dict["attention_mask"])[
            0
        ]

        self.parent.assertTrue((encoder_last_hidden_state_2 - encoder_last_hidden_state).abs().max().item() < 1e-3)

        # Round-trip the decoder through disk and feed it the original encoder states.
        with tempfile.TemporaryDirectory() as tmpdirname:
            decoder = model.get_decoder()
            decoder.save_pretrained(tmpdirname)
            decoder = M2M100Decoder.from_pretrained(tmpdirname).to(torch_device)

        last_hidden_state_2 = decoder(
            input_ids=inputs_dict["decoder_input_ids"],
            attention_mask=inputs_dict["decoder_attention_mask"],
            encoder_hidden_states=encoder_last_hidden_state,
            encoder_attention_mask=inputs_dict["attention_mask"],
        )[0]

        self.parent.assertTrue((last_hidden_state_2 - last_hidden_state).abs().max().item() < 1e-3)


@require_torch
class M2M100ModelTest(ModelTesterMixin, GenerationTesterMixin, PipelineTesterMixin, unittest.TestCase):
    """Common model, generation and pipeline tests run against tiny M2M100 models."""

    all_model_classes = (
        (
            M2M100Model,
            M2M100ForConditionalGeneration,
        )
        if is_torch_available()
        else ()
    )
    pipeline_model_mapping = (
        {
            "feature-extraction": M2M100Model,
            "summarization": M2M100ForConditionalGeneration,
            "text2text-generation": M2M100ForConditionalGeneration,
            "translation": M2M100ForConditionalGeneration,
        }
        if is_torch_available()
        else {}
    )
    is_encoder_decoder = True
    fx_compatible = True
    test_pruning = False
    test_missing_keys = False

    # TODO: Fix the failed tests
    def is_pipeline_test_to_skip(
        self,
        pipeline_test_case_name,
        config_class,
        model_architecture,
        tokenizer_name,
        image_processor_name,
        feature_extractor_name,
        processor_name,
    ):
        """Return True for pipeline test cases that must be skipped for this model."""
        # Get `ValueError: Translation requires a `src_lang` and a `tgt_lang` for this model`.
        # `M2M100Config` was never used in pipeline tests: cannot create a simple tokenizer.
        return pipeline_test_case_name == "TranslationPipelineTests"

    def setUp(self):
        self.model_tester = M2M100ModelTester(self)
        self.config_tester = ConfigTester(self, config_class=M2M100Config)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_save_load_strict(self):
        """Saving then reloading each model class must report no missing keys."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs()
        for model_class in self.all_model_classes:
            model = model_class(config)

            with tempfile.TemporaryDirectory() as tmpdirname:
                model.save_pretrained(tmpdirname)
                _, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
            self.assertEqual(info["missing_keys"], [])

    def test_decoder_model_past_with_large_inputs(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)

    def test_encoder_decoder_model_standalone(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs_for_common()
        self.model_tester.check_encoder_decoder_model_standalone(*config_and_inputs)

    def test_inputs_embeds(self):
        """The models must accept embeddings in place of token ids."""
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in (M2M100Model, M2M100ForConditionalGeneration):
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
            wte = model.get_input_embeddings()

            # Swap the id tensors for their embeddings.
            if self.is_encoder_decoder:
                encoder_input_ids = inputs.pop("input_ids")
                # Fall back to the encoder ids when no decoder ids were prepared.
                decoder_input_ids = inputs.pop("decoder_input_ids", encoder_input_ids)
                inputs["inputs_embeds"] = wte(encoder_input_ids)
                inputs["decoder_inputs_embeds"] = wte(decoder_input_ids)
            else:
                inputs["inputs_embeds"] = wte(inputs.pop("input_ids"))

            with torch.no_grad():
                model(**inputs)[0]

    @require_torch_fp16
    def test_generate_fp16(self):
        """Generation must run on a half-precision model."""
        config, input_dict = self.model_tester.prepare_config_and_inputs()
        input_ids = input_dict["input_ids"]
        # Derive the mask from the configured pad id rather than a literal.
        attention_mask = input_ids.ne(config.pad_token_id).to(torch_device)
        model = M2M100ForConditionalGeneration(config).eval().to(torch_device)
        model.half()
        model.generate(input_ids, attention_mask=attention_mask)
        model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)

    @unittest.skip(
        reason="This architecture has tied weights by default and there is no way to remove it, check: https://github.com/huggingface/transformers/pull/31771#issuecomment-2210915245"
    )
    def test_load_save_without_tied_weights(self):
        pass


def _long_tensor(tok_lst):
    """Return `tok_lst` as a `torch.long` tensor on the test device."""
    return torch.tensor(tok_lst, dtype=torch.long, device=torch_device)


# Absolute and relative tolerance used when comparing against reference slices.
TOLERANCE = 1e-4


@require_torch
@require_sentencepiece
@require_tokenizers
@slow
class M2M100ModelIntegrationTests(unittest.TestCase):
    """Slow tests that run the `facebook/m2m100_418M` checkpoint."""

    @cached_property
    def default_tokenizer(self):
        return M2M100Tokenizer.from_pretrained("facebook/m2m100_418M")

    def test_inference_no_head(self):
        """Compare a slice of the base model's output with reference values."""
        model = M2M100Model.from_pretrained("facebook/m2m100_418M").to(torch_device)
        input_ids = _long_tensor([[128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38, 2]])
        decoder_input_ids = _long_tensor([[2, 128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38]])
        inputs_dict = prepare_m2m_100_inputs_dict(model.config, input_ids, decoder_input_ids)
        with torch.no_grad():
            output = model(**inputs_dict)[0]
        expected_shape = torch.Size((1, 11, 1024))
        self.assertEqual(output.shape, expected_shape)
        # change to expected output here
        expected_slice = torch.tensor(
            [[-0.7780, -0.1676, 0.1038], [-6.7556, -1.3992, 0.0567], [-7.5383, -0.5920, -0.2779]], device=torch_device
        )
        torch.testing.assert_close(output[:, :3, :3], expected_slice, rtol=TOLERANCE, atol=TOLERANCE)

    def test_inference_head(self):
        """Compare a slice of the conditional-generation model's output with reference values."""
        model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M").to(torch_device)

        # change to intended input
        input_ids = _long_tensor([[128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38, 2]])
        decoder_input_ids = _long_tensor([[2, 128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38]])
        inputs_dict = prepare_m2m_100_inputs_dict(model.config, input_ids, decoder_input_ids)
        with torch.no_grad():
            output = model(**inputs_dict)[0]
        expected_shape = torch.Size((1, 11, model.config.vocab_size))
        self.assertEqual(output.shape, expected_shape)
        # change to expected output here
        expected_slice = torch.tensor(
            [[-1.0448, -1.0411, 3.7992], [-3.2191, -3.2386, -1.3451], [-3.6210, -3.5993, 0.4925]], device=torch_device
        )
        torch.testing.assert_close(output[:, :3, :3], expected_slice, rtol=TOLERANCE, atol=TOLERANCE)

    def test_seq_to_seq_generation(self):
        """Translate three French sentences to English with beam search and compare the raw decoded text."""
        model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M").to(torch_device)
        tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M", src_lang="fr", tgt_lang="en")

        src_fr = [
            "L'affaire NSA souligne l'absence totale de débat sur le renseignement",
            "Selon moi, il y a deux niveaux de réponse de la part du gouvernement français.",
            "Lorsque François Hollande téléphone à Barack Obama ou quand le ministre des affaires étrangères Laurent"
            " Fabius convoque l'ambassadeur des Etats-Unis, ils réagissent à une vraie découverte, qui est celle de"
            " l'ampleur de la surveillance américaine sur l'ensemble des communications en France.",
        ]

        # The article below tests that we don't add any hypotheses outside of the top n_beams
        dct = tokenizer(src_fr, padding=True, return_tensors="pt")

        hypotheses_batch = model.generate(
            input_ids=dct["input_ids"].to(torch_device),
            attention_mask=dct["attention_mask"].to(torch_device),
            num_beams=5,
            forced_bos_token_id=tokenizer.get_lang_id("en"),
        )

        # NOTE(review): `batch_decode` is called without `skip_special_tokens` below, and the
        # empty trailing literals here look like remnants of special-token text — confirm the
        # expected strings against real tokenizer output.
        expected_en = [
            " __en__ "
            "The NSA case highlights the total absence of intelligence debate"
            "",
            " __en__ "
            "I think there are two levels of response from the French government."
            "",
            " __en__ "
            "When François Hollande calls Barack Obama or when Foreign Minister Laurent Fabius calls the U.S."
            " Ambassador, they respond to a real discovery, which is that of the scale of U.S. surveillance on all"
            " communications in France."
            "",
        ]

        generated = tokenizer.batch_decode(hypotheses_batch)
        # unittest assertion: not stripped under `-O` and reports a per-item diff
        self.assertEqual(generated, expected_en)

    @require_flash_attn
    @require_torch_gpu
    @pytest.mark.flash_attn_test
    @slow
    def test_flash_attn_2_seq_to_seq_generation(self):
        """
        Overwriting the common test as the test is flaky on tiny models
        """
        model = M2M100ForConditionalGeneration.from_pretrained(
            "facebook/m2m100_418M", attn_implementation="flash_attention_2"
        ).to(torch_device)

        tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M", src_lang="fr", tgt_lang="en")

        src_fr = [
            "L'affaire NSA souligne l'absence totale de débat sur le renseignement",
            "Selon moi, il y a deux niveaux de réponse de la part du gouvernement français.",
            "Lorsque François Hollande téléphone à Barack Obama ou quand le ministre des affaires étrangères Laurent"
            " Fabius convoque l'ambassadeur des Etats-Unis, ils réagissent à une vraie découverte, qui est celle de"
            " l'ampleur de la surveillance américaine sur l'ensemble des communications en France.",
        ]

        # The article below tests that we don't add any hypotheses outside of the top n_beams
        dct = tokenizer(src_fr, padding=True, return_tensors="pt")

        hypotheses_batch = model.generate(
            input_ids=dct["input_ids"].to(torch_device),
            attention_mask=dct["attention_mask"].to(torch_device),
            num_beams=5,
            forced_bos_token_id=tokenizer.get_lang_id("en"),
        )

        expected_en = [
            "The NSA case highlights the total absence of intelligence debate",
            "I think there are two levels of response from the French government.",
            "When François Hollande calls Barack Obama or when Foreign Minister Laurent Fabius calls the U.S."
            " Ambassador, they respond to a real discovery, which is that of the scale of U.S. surveillance on all"
            " communications in France.",
        ]

        generated = tokenizer.batch_decode(
            hypotheses_batch.tolist(), clean_up_tokenization_spaces=True, skip_special_tokens=True
        )
        # unittest assertion: not stripped under `-O` and reports a per-item diff
        self.assertEqual(generated, expected_en)