# coding=utf-8 # Copyright 2021 HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import tempfile import unittest from transformers import is_torch_available from transformers.testing_utils import require_torch, slow, torch_device from .test_modeling_bert import BertModelTester from .test_modeling_common import ids_tensor from .test_modeling_speech_to_text import Speech2TextModelTester from .test_modeling_speech_to_text_2 import Speech2Text2StandaloneDecoderModelTester from .test_modeling_wav2vec2 import Wav2Vec2ModelTester if is_torch_available(): import numpy as np import torch from transformers import ( BertLMHeadModel, Speech2Text2ForCausalLM, SpeechEncoderDecoderConfig, SpeechEncoderDecoderModel, Wav2Vec2Model, ) from transformers.modeling_outputs import BaseModelOutput from transformers.models.speech_to_text.modeling_speech_to_text import Speech2TextEncoder @require_torch class EncoderDecoderMixin: def get_encoder_decoder_model(self, config, decoder_config): pass def prepare_config_and_inputs(self): pass def get_pretrained_model(self): pass def check_encoder_decoder_model_from_pretrained_configs( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, input_values=None, input_features=None, **kwargs ): encoder_decoder_config = SpeechEncoderDecoderConfig.from_encoder_decoder_configs(config, decoder_config) self.assertTrue(encoder_decoder_config.decoder.is_decoder) enc_dec_model = SpeechEncoderDecoderModel(encoder_decoder_config) enc_dec_model.to(torch_device) enc_dec_model.eval() self.assertTrue(enc_dec_model.config.is_encoder_decoder) outputs_encoder_decoder = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) self.assertEqual( outputs_encoder_decoder["logits"].shape, (decoder_input_ids.shape + (decoder_config.vocab_size,)) ) def check_encoder_decoder_model( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, input_values=None, input_features=None, **kwargs ): encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) enc_dec_model = SpeechEncoderDecoderModel(encoder=encoder_model, decoder=decoder_model) self.assertTrue(enc_dec_model.config.decoder.is_decoder) self.assertTrue(enc_dec_model.config.decoder.add_cross_attention) self.assertTrue(enc_dec_model.config.is_encoder_decoder) enc_dec_model.to(torch_device) outputs_encoder_decoder = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, output_hidden_states=True, ) self.assertEqual( outputs_encoder_decoder["logits"].shape, (decoder_input_ids.shape + (decoder_config.vocab_size,)) ) encoder_outputs = BaseModelOutput(last_hidden_state=outputs_encoder_decoder.encoder_hidden_states[-1]) outputs_encoder_decoder = enc_dec_model( encoder_outputs=encoder_outputs, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) self.assertEqual( outputs_encoder_decoder["logits"].shape, (decoder_input_ids.shape + (decoder_config.vocab_size,)) ) def check_encoder_decoder_model_from_pretrained( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, return_dict, input_values=None, input_features=None, **kwargs ): encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) kwargs = {"encoder_model": encoder_model, "decoder_model": decoder_model, "return_dict": return_dict} enc_dec_model = SpeechEncoderDecoderModel.from_encoder_decoder_pretrained(**kwargs) enc_dec_model.to(torch_device) outputs_encoder_decoder = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, output_hidden_states=True, return_dict=True, ) self.assertEqual( outputs_encoder_decoder["logits"].shape, (decoder_input_ids.shape + (decoder_config.vocab_size,)) ) def check_save_and_load( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, input_values=None, input_features=None, **kwargs ): encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) enc_dec_model = SpeechEncoderDecoderModel(encoder=encoder_model, decoder=decoder_model) enc_dec_model.to(torch_device) enc_dec_model.eval() with torch.no_grad(): outputs = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) out_2 = outputs[0].cpu().numpy() out_2[np.isnan(out_2)] = 0 with tempfile.TemporaryDirectory() as tmpdirname: enc_dec_model.save_pretrained(tmpdirname) enc_dec_model = SpeechEncoderDecoderModel.from_pretrained(tmpdirname) enc_dec_model.to(torch_device) after_outputs = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) out_1 = after_outputs[0].cpu().numpy() out_1[np.isnan(out_1)] = 0 max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def check_save_and_load_encoder_decoder_model( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, input_values=None, input_features=None, **kwargs ): encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) enc_dec_model = SpeechEncoderDecoderModel(encoder=encoder_model, decoder=decoder_model) enc_dec_model.to(torch_device) enc_dec_model.eval() with torch.no_grad(): outputs = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) out_2 = outputs[0].cpu().numpy() out_2[np.isnan(out_2)] = 0 with tempfile.TemporaryDirectory() as encoder_tmp_dirname, tempfile.TemporaryDirectory() as decoder_tmp_dirname: enc_dec_model.encoder.save_pretrained(encoder_tmp_dirname) enc_dec_model.decoder.save_pretrained(decoder_tmp_dirname) SpeechEncoderDecoderModel.from_encoder_decoder_pretrained( encoder_pretrained_model_name_or_path=encoder_tmp_dirname, decoder_pretrained_model_name_or_path=decoder_tmp_dirname, ) after_outputs = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, ) out_1 = after_outputs[0].cpu().numpy() out_1[np.isnan(out_1)] = 0 max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) def check_encoder_decoder_model_output_attentions( self, config, attention_mask, decoder_config, decoder_input_ids, decoder_attention_mask, labels=None, input_values=None, input_features=None, **kwargs ): # make the decoder inputs a different shape from the encoder inputs to harden the test decoder_input_ids = decoder_input_ids[:, :-1] decoder_attention_mask = decoder_attention_mask[:, :-1] encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) enc_dec_model = SpeechEncoderDecoderModel(encoder=encoder_model, decoder=decoder_model) enc_dec_model.to(torch_device) outputs_encoder_decoder = enc_dec_model( input_values=input_values, input_features=input_features, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, decoder_attention_mask=decoder_attention_mask, output_attentions=True, ) inputs = input_values if input_features is None else input_features encoder_attentions = outputs_encoder_decoder["encoder_attentions"] self.assertEqual(len(encoder_attentions), config.num_hidden_layers) seq_len = enc_dec_model.encoder._get_feat_extract_output_lengths(inputs.shape[1]) self.assertEqual(encoder_attentions[0].shape[-3:], (config.num_attention_heads, seq_len, seq_len)) decoder_attentions = outputs_encoder_decoder["decoder_attentions"] num_decoder_layers = ( decoder_config.num_decoder_layers if hasattr(decoder_config, "num_decoder_layers") else decoder_config.num_hidden_layers ) self.assertEqual(len(decoder_attentions), num_decoder_layers) self.assertEqual( decoder_attentions[0].shape[-3:], (decoder_config.num_attention_heads, decoder_input_ids.shape[-1], decoder_input_ids.shape[-1]), ) cross_attentions = outputs_encoder_decoder["cross_attentions"] self.assertEqual(len(cross_attentions), num_decoder_layers) cross_attention_input_seq_len = decoder_input_ids.shape[-1] self.assertEqual( cross_attentions[0].shape[-3:], (decoder_config.num_attention_heads, cross_attention_input_seq_len, seq_len), ) def check_encoder_decoder_model_generate( self, config, decoder_config, input_values=None, input_features=None, **kwargs ): encoder_model, decoder_model = self.get_encoder_decoder_model(config, decoder_config) enc_dec_model = SpeechEncoderDecoderModel(encoder=encoder_model, decoder=decoder_model) enc_dec_model.to(torch_device) inputs = input_values if input_features is None else input_features # Bert does not have a bos token id, so use pad_token_id instead generated_output = enc_dec_model.generate( inputs, decoder_start_token_id=enc_dec_model.config.decoder.pad_token_id ) self.assertEqual(generated_output.shape, (inputs.shape[0],) + (decoder_config.max_length,)) def test_encoder_decoder_model(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model(**input_ids_dict) def test_encoder_decoder_model_from_pretrained_configs(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model_from_pretrained_configs(**input_ids_dict) def test_encoder_decoder_model_from_pretrained(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model_from_pretrained(**input_ids_dict, return_dict=False) def test_encoder_decoder_model_from_pretrained_return_dict(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model_from_pretrained(**input_ids_dict, return_dict=True) def test_save_and_load_from_pretrained(self): input_ids_dict = self.prepare_config_and_inputs() self.check_save_and_load(**input_ids_dict) def test_save_and_load_from_encoder_decoder_pretrained(self): input_ids_dict = self.prepare_config_and_inputs() self.check_save_and_load_encoder_decoder_model(**input_ids_dict) def test_encoder_decoder_model_output_attentions(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model_output_attentions(**input_ids_dict) def test_encoder_decoder_model_generate(self): input_ids_dict = self.prepare_config_and_inputs() self.check_encoder_decoder_model_generate(**input_ids_dict) @slow def test_real_model_save_load_from_pretrained(self): model_2 = self.get_pretrained_model() model_2.to(torch_device) input_name, inputs = self.get_inputs() decoder_input_ids = ids_tensor([13, 1], model_2.config.encoder.vocab_size) attention_mask = ids_tensor([13, 5], vocab_size=2) with torch.no_grad(): outputs = model_2( **{input_name: inputs}, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, ) out_2 = outputs[0].cpu().numpy() out_2[np.isnan(out_2)] = 0 with tempfile.TemporaryDirectory() as tmp_dirname: model_2.save_pretrained(tmp_dirname) model_1 = SpeechEncoderDecoderModel.from_pretrained(tmp_dirname) model_1.to(torch_device) after_outputs = model_1( **{input_name: inputs}, decoder_input_ids=decoder_input_ids, attention_mask=attention_mask, ) out_1 = after_outputs[0].cpu().numpy() out_1[np.isnan(out_1)] = 0 max_diff = np.amax(np.abs(out_1 - out_2)) self.assertLessEqual(max_diff, 1e-5) @require_torch class Wav2Vec2BertModelTest(EncoderDecoderMixin, unittest.TestCase): def get_pretrained_model(self): return SpeechEncoderDecoderModel.from_encoder_decoder_pretrained( "facebook/wav2vec2-base-960h", "bert-base-cased" ) def get_encoder_decoder_model(self, config, decoder_config): encoder_model = Wav2Vec2Model(config).eval() decoder_model = BertLMHeadModel(decoder_config).eval() return encoder_model, decoder_model def prepare_config_and_inputs(self): bert_model_tester = BertModelTester(self) wav2vec2_model_tester = Wav2Vec2ModelTester(self) encoder_config_and_inputs = wav2vec2_model_tester.prepare_config_and_inputs() decoder_config_and_inputs = bert_model_tester.prepare_config_and_inputs_for_decoder() ( config, input_values, input_mask, ) = encoder_config_and_inputs ( decoder_config, decoder_input_ids, decoder_token_type_ids, decoder_input_mask, decoder_sequence_labels, decoder_token_labels, decoder_choice_labels, encoder_attention_mask, _, ) = decoder_config_and_inputs # make sure that cross attention layers are added decoder_config.add_cross_attention = True return { "config": config, "input_values": input_values, "attention_mask": input_mask, "decoder_config": decoder_config, "decoder_input_ids": decoder_input_ids, "decoder_token_type_ids": decoder_token_type_ids, "decoder_attention_mask": decoder_input_mask, "decoder_sequence_labels": decoder_sequence_labels, "decoder_token_labels": decoder_token_labels, "decoder_choice_labels": decoder_choice_labels, "labels": decoder_token_labels, } @require_torch class Speech2TextBertModelTest(EncoderDecoderMixin, unittest.TestCase): def get_pretrained_model(self): return SpeechEncoderDecoderModel.from_encoder_decoder_pretrained( "facebook/s2t-small-librispeech-asr", "bert-base-cased" ) def get_encoder_decoder_model(self, config, decoder_config): encoder_model = Speech2TextEncoder(config).eval() decoder_model = BertLMHeadModel(decoder_config).eval() return encoder_model, decoder_model def prepare_config_and_inputs(self): bert_model_tester = BertModelTester(self) speech2text_model_tester = Speech2TextModelTester(self) encoder_config_and_inputs = speech2text_model_tester.prepare_config_and_inputs() decoder_config_and_inputs = bert_model_tester.prepare_config_and_inputs_for_decoder() config, inputs = encoder_config_and_inputs input_features = inputs["input_features"] input_mask = inputs["attention_mask"] ( decoder_config, decoder_input_ids, decoder_token_type_ids, decoder_input_mask, decoder_sequence_labels, decoder_token_labels, decoder_choice_labels, encoder_attention_mask, _, ) = decoder_config_and_inputs # make sure that cross attention layers are added decoder_config.add_cross_attention = True return { "config": config, "input_features": input_features, "attention_mask": input_mask, "decoder_config": decoder_config, "decoder_input_ids": decoder_input_ids, "decoder_token_type_ids": decoder_token_type_ids, "decoder_attention_mask": decoder_input_mask, "decoder_sequence_labels": decoder_sequence_labels, "decoder_token_labels": decoder_token_labels, "decoder_choice_labels": decoder_choice_labels, "labels": decoder_token_labels, } # can't save full model for now because Speech2TextModel != Speech2TextEncoder def test_encoder_decoder_model_from_pretrained_configs(self): pass # can't save full model for now because Speech2TextModel != Speech2TextEncoder def test_save_and_load_from_pretrained(self): pass @require_torch class Wav2Vec2Speech2Text2(EncoderDecoderMixin, unittest.TestCase): def get_encoder_decoder_model(self, config, decoder_config): encoder_model = Wav2Vec2Model(config).eval() decoder_model = Speech2Text2ForCausalLM(decoder_config).eval() return encoder_model, decoder_model def prepare_config_and_inputs(self): model_tester_encoder = Wav2Vec2ModelTester(self, batch_size=13) model_tester_decoder = Speech2Text2StandaloneDecoderModelTester( self, batch_size=13, d_model=32, max_position_embeddings=512 ) encoder_config_and_inputs = model_tester_encoder.prepare_config_and_inputs() decoder_config_and_inputs = model_tester_decoder.prepare_config_and_inputs() ( config, input_values, input_mask, ) = encoder_config_and_inputs (decoder_config, decoder_input_ids, decoder_attention_mask, _) = decoder_config_and_inputs # make sure that cross attention layers are added decoder_config.add_cross_attention = True # disable cache for now decoder_config.use_cache = False return { "config": config, "input_values": input_values, "attention_mask": input_mask, "decoder_config": decoder_config, "decoder_input_ids": decoder_input_ids, "decoder_attention_mask": decoder_attention_mask, } def get_pretrained_model(self): return SpeechEncoderDecoderModel.from_encoder_decoder_pretrained("bert-large-uncased", "facebook/bart-large")