Mirror of https://github.com/huggingface/transformers.git, synced 2025-07-05 22:00:09 +06:00

* [WIP] Add a new parameter to `generate`: `max_time`.

  Generation limited by token count is sometimes clunky because we don't know how many tokens are good enough, or even how many tokens are in the payload (for pipeline users, for instance). This leads to hard-to-understand behavior. This PR proposes a new argument, `max_time`, a float giving the number of seconds `generate` is allowed to run. Ideally, a combination such as `max_tokens=None`, `max_time=2` could be used to generate as many tokens as possible within the time budget.

  NB: Another possible approach is to pass a callback to `generate`, putting the caller in charge of deciding when to stop generating tokens. That raises the question of which arguments should be passed to this callback, and it is hard to imagine use cases for this early-stopping behavior other than time that are not already covered by existing `generate` parameters.
* Revamp with `StoppingCriteria`.
* Remove deprecated mentions.
* Forgot arguments to stopping criteria.
* Re-add `max_length`; it is not only used as a stopping criterion.
* Default value for `stopping_criteria`.
* Address @patrickvonplaten's comments:
  - More docstrings
  - Actual doc
  - Include in global namespace
  - Remove TF work
* Put back `max_length` (deprecation in a different PR).
* Doc quality.
* Fix the old behavior without `stopping_criteria` but with `max_length`, and make sure we don't break it in the future.
* Add more tests for possible inconsistencies between `max_length` and `stopping_criteria`.
* Fix the torch imports.
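For context, here is a minimal sketch of how the time budget described above is meant to be used from user code. It assumes a transformers release that includes this PR, i.e. that `generate` accepts `max_time` and that `StoppingCriteriaList` / `MaxTimeCriteria` are exported from the top-level namespace; it is an illustration, not part of the test file below.

```python
# Minimal sketch, assuming a transformers version that ships this PR.
from transformers import GPT2LMHeadModel, GPT2Tokenizer, MaxTimeCriteria, StoppingCriteriaList

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
model = GPT2LMHeadModel.from_pretrained("gpt2")
input_ids = tokenizer("Today is a nice day and", return_tensors="pt").input_ids

# Simplest form: let generate() enforce the wall-clock budget itself.
# max_length still acts as an upper bound on the number of tokens.
output = model.generate(input_ids, do_sample=True, max_time=0.5, max_length=256)

# More explicit form using the StoppingCriteria abstraction introduced by
# this PR: the criteria list is evaluated after every generation step.
criteria = StoppingCriteriaList([MaxTimeCriteria(max_time=0.5)])
output = model.generate(input_ids, do_sample=True, max_length=256, stopping_criteria=criteria)

print(tokenizer.decode(output[0], skip_special_tokens=True))
```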
694 lines · 29 KiB · Python
# coding=utf-8
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import unittest

from transformers import is_torch_available
from transformers.testing_utils import require_torch, slow, torch_device

from .test_configuration_common import ConfigTester
from .test_generation_utils import GenerationTesterMixin
from .test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor, random_attention_mask


if is_torch_available():
    import torch

    from transformers import (
        GPT2_PRETRAINED_MODEL_ARCHIVE_LIST,
        GPT2Config,
        GPT2DoubleHeadsModel,
        GPT2ForSequenceClassification,
        GPT2LMHeadModel,
        GPT2Model,
        GPT2Tokenizer,
    )


class GPT2ModelTester:
    def __init__(
        self,
        parent,
        batch_size=14,
        seq_length=7,
        is_training=True,
        use_token_type_ids=True,
        use_input_mask=True,
        use_labels=True,
        use_mc_token_ids=True,
        vocab_size=99,
        hidden_size=32,
        num_hidden_layers=5,
        num_attention_heads=4,
        intermediate_size=37,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        max_position_embeddings=512,
        type_vocab_size=16,
        type_sequence_label_size=2,
        initializer_range=0.02,
        num_labels=3,
        num_choices=4,
        scope=None,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.is_training = is_training
        self.use_token_type_ids = use_token_type_ids
        self.use_input_mask = use_input_mask
        self.use_labels = use_labels
        self.use_mc_token_ids = use_mc_token_ids
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.type_vocab_size = type_vocab_size
        self.type_sequence_label_size = type_sequence_label_size
        self.initializer_range = initializer_range
        self.num_labels = num_labels
        self.num_choices = num_choices
        self.scope = None
        self.bos_token_id = vocab_size - 1
        self.eos_token_id = vocab_size - 1
        self.pad_token_id = vocab_size - 1

    def get_large_model_config(self):
        return GPT2Config.from_pretrained("gpt2")

    def prepare_config_and_inputs(self, gradient_checkpointing=False):
        input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)

        input_mask = None
        if self.use_input_mask:
            input_mask = random_attention_mask([self.batch_size, self.seq_length])

        token_type_ids = None
        if self.use_token_type_ids:
            token_type_ids = ids_tensor([self.batch_size, self.seq_length], self.type_vocab_size)

        mc_token_ids = None
        if self.use_mc_token_ids:
            mc_token_ids = ids_tensor([self.batch_size, self.num_choices], self.seq_length)

        sequence_labels = None
        token_labels = None
        choice_labels = None
        if self.use_labels:
            sequence_labels = ids_tensor([self.batch_size], self.type_sequence_label_size)
            token_labels = ids_tensor([self.batch_size, self.seq_length], self.num_labels)
            choice_labels = ids_tensor([self.batch_size], self.num_choices)

        config = GPT2Config(
            vocab_size=self.vocab_size,
            n_embd=self.hidden_size,
            n_layer=self.num_hidden_layers,
            n_head=self.num_attention_heads,
            # intermediate_size=self.intermediate_size,
            # hidden_act=self.hidden_act,
            # hidden_dropout_prob=self.hidden_dropout_prob,
            # attention_probs_dropout_prob=self.attention_probs_dropout_prob,
            n_positions=self.max_position_embeddings,
            n_ctx=self.max_position_embeddings,
            # type_vocab_size=self.type_vocab_size,
            # initializer_range=self.initializer_range,
            use_cache=not gradient_checkpointing,
            bos_token_id=self.bos_token_id,
            eos_token_id=self.eos_token_id,
            pad_token_id=self.pad_token_id,
            gradient_checkpointing=gradient_checkpointing,
        )

        head_mask = ids_tensor([self.num_hidden_layers, self.num_attention_heads], 2)

        return (
            config,
            input_ids,
            input_mask,
            head_mask,
            token_type_ids,
            mc_token_ids,
            sequence_labels,
            token_labels,
            choice_labels,
        )

    def prepare_config_and_inputs_for_decoder(self):
        (
            config,
            input_ids,
            input_mask,
            head_mask,
            token_type_ids,
            mc_token_ids,
            sequence_labels,
            token_labels,
            choice_labels,
        ) = self.prepare_config_and_inputs()

        encoder_hidden_states = floats_tensor([self.batch_size, self.seq_length, self.hidden_size])
        encoder_attention_mask = ids_tensor([self.batch_size, self.seq_length], vocab_size=2)

        return (
            config,
            input_ids,
            input_mask,
            head_mask,
            token_type_ids,
            sequence_labels,
            token_labels,
            choice_labels,
            encoder_hidden_states,
            encoder_attention_mask,
        )

    def create_and_check_gpt2_model(self, config, input_ids, input_mask, head_mask, token_type_ids, *args):
        model = GPT2Model(config=config)
        model.to(torch_device)
        model.eval()

        result = model(input_ids, token_type_ids=token_type_ids, head_mask=head_mask)
        result = model(input_ids, token_type_ids=token_type_ids)
        result = model(input_ids)

        self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
        self.parent.assertEqual(len(result.past_key_values), config.n_layer)

    def create_and_check_gpt2_model_past(self, config, input_ids, input_mask, head_mask, token_type_ids, *args):
        model = GPT2Model(config=config)
        model.to(torch_device)
        model.eval()

        # first forward pass
        outputs = model(input_ids, token_type_ids=token_type_ids, use_cache=True)
        outputs_use_cache_conf = model(input_ids, token_type_ids=token_type_ids)
        outputs_no_past = model(input_ids, token_type_ids=token_type_ids, use_cache=False)

        self.parent.assertTrue(len(outputs) == len(outputs_use_cache_conf))
        self.parent.assertTrue(len(outputs) == len(outputs_no_past) + 1)

        output, past = outputs.to_tuple()

        # create hypothetical next token and extend to next_input_ids
        next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size)
        next_token_types = ids_tensor([self.batch_size, 1], self.type_vocab_size)

        # append to next input_ids and token_type_ids
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        next_token_type_ids = torch.cat([token_type_ids, next_token_types], dim=-1)

        output_from_no_past = model(next_input_ids, token_type_ids=next_token_type_ids)["last_hidden_state"]
        output_from_past = model(next_tokens, token_type_ids=next_token_types, past_key_values=past)[
            "last_hidden_state"
        ]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -1, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, 0, random_slice_idx].detach()

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))

    def create_and_check_gpt2_model_attention_mask_past(
        self, config, input_ids, input_mask, head_mask, token_type_ids, *args
    ):
        model = GPT2Model(config=config)
        model.to(torch_device)
        model.eval()

        # create attention mask
        attn_mask = torch.ones(input_ids.shape, dtype=torch.long, device=torch_device)
        half_seq_length = self.seq_length // 2
        attn_mask[:, half_seq_length:] = 0

        # first forward pass
        output, past = model(input_ids, attention_mask=attn_mask).to_tuple()

        # create hypothetical next token and extend to next_input_ids
        next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size)

        # change a random masked slice from input_ids
        random_seq_idx_to_change = ids_tensor((1,), half_seq_length).item() + 1
        random_other_next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size).squeeze(-1)
        input_ids[:, -random_seq_idx_to_change] = random_other_next_tokens

        # append to next input_ids and attn_mask
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        attn_mask = torch.cat(
            [attn_mask, torch.ones((attn_mask.shape[0], 1), dtype=torch.long, device=torch_device)],
            dim=1,
        )

        # get two different outputs
        output_from_no_past = model(next_input_ids, attention_mask=attn_mask)["last_hidden_state"]
        output_from_past = model(next_tokens, past_key_values=past, attention_mask=attn_mask)["last_hidden_state"]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -1, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, 0, random_slice_idx].detach()

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))

    def create_and_check_gpt2_model_past_large_inputs(
        self, config, input_ids, input_mask, head_mask, token_type_ids, *args
    ):
        model = GPT2Model(config=config)
        model.to(torch_device)
        model.eval()

        # first forward pass
        outputs = model(input_ids, token_type_ids=token_type_ids, attention_mask=input_mask, use_cache=True)

        output, past = outputs.to_tuple()

        # create hypothetical next token and extend to next_input_ids
        next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
        next_token_types = ids_tensor([self.batch_size, 3], self.type_vocab_size)
        next_mask = ids_tensor((self.batch_size, 3), vocab_size=2)

        # append to next input_ids and token_type_ids
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        next_token_type_ids = torch.cat([token_type_ids, next_token_types], dim=-1)
        next_attention_mask = torch.cat([input_mask, next_mask], dim=-1)

        output_from_no_past = model(
            next_input_ids, token_type_ids=next_token_type_ids, attention_mask=next_attention_mask
        )["last_hidden_state"]
        output_from_past = model(
            next_tokens, token_type_ids=next_token_types, attention_mask=next_attention_mask, past_key_values=past
        )["last_hidden_state"]
        self.parent.assertTrue(output_from_past.shape[1] == next_tokens.shape[1])

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))

    def create_and_check_lm_head_model(self, config, input_ids, input_mask, head_mask, token_type_ids, *args):
        model = GPT2LMHeadModel(config)
        model.to(torch_device)
        model.eval()

        result = model(input_ids, token_type_ids=token_type_ids, labels=input_ids)
        self.parent.assertEqual(result.loss.shape, ())
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))

    def create_and_check_forward_and_backwards(self, config, input_ids, input_mask, head_mask, token_type_ids, *args):
        model = GPT2LMHeadModel(config)
        model.to(torch_device)

        result = model(input_ids, token_type_ids=token_type_ids, labels=input_ids)
        self.parent.assertEqual(result.loss.shape, ())
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))
        result.loss.backward()

    def create_and_check_double_lm_head_model(
        self, config, input_ids, input_mask, head_mask, token_type_ids, mc_token_ids, *args
    ):
        model = GPT2DoubleHeadsModel(config)
        model.to(torch_device)
        model.eval()

        multiple_choice_inputs_ids = input_ids.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()
        multiple_choice_input_mask = input_mask.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()
        multiple_choice_token_type_ids = token_type_ids.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()

        inputs = {
            "input_ids": multiple_choice_inputs_ids,
            "mc_token_ids": mc_token_ids,
            "attention_mask": multiple_choice_input_mask,
            "token_type_ids": multiple_choice_token_type_ids,
            "labels": multiple_choice_inputs_ids,
        }

        result = model(**inputs)
        self.parent.assertEqual(result.loss.shape, ())
        self.parent.assertEqual(
            result.logits.shape, (self.batch_size, self.num_choices, self.seq_length, self.vocab_size)
        )
        self.parent.assertEqual(result.mc_logits.shape, (self.batch_size, self.num_choices))

    def create_and_check_gpt2_for_sequence_classification(
        self, config, input_ids, input_mask, head_mask, token_type_ids, mc_token_ids, sequence_labels, *args
    ):
        config.num_labels = self.num_labels
        model = GPT2ForSequenceClassification(config)
        model.to(torch_device)
        model.eval()
        print(config.num_labels, sequence_labels.size())
        result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids, labels=sequence_labels)
        self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))

    def prepare_config_and_inputs_for_common(self):
        config_and_inputs = self.prepare_config_and_inputs()

        (
            config,
            input_ids,
            input_mask,
            head_mask,
            token_type_ids,
            mc_token_ids,
            sequence_labels,
            token_labels,
            choice_labels,
        ) = config_and_inputs

        inputs_dict = {
            "input_ids": input_ids,
            "token_type_ids": token_type_ids,
            "head_mask": head_mask,
        }

        return config, inputs_dict


@require_torch
class GPT2ModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):

    all_model_classes = (
        (GPT2Model, GPT2LMHeadModel, GPT2DoubleHeadsModel, GPT2ForSequenceClassification)
        if is_torch_available()
        else ()
    )
    all_generative_model_classes = (GPT2LMHeadModel, GPT2DoubleHeadsModel) if is_torch_available() else ()
    all_parallelizable_model_classes = (GPT2LMHeadModel,) if is_torch_available() else ()
    test_missing_keys = False
    test_model_parallel = True

    # special case for DoubleHeads model
    def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
        inputs_dict = super()._prepare_for_class(inputs_dict, model_class, return_labels=return_labels)

        if return_labels:
            if model_class.__name__ == "GPT2DoubleHeadsModel":
                inputs_dict["labels"] = torch.zeros(
                    (self.model_tester.batch_size, self.model_tester.num_choices, self.model_tester.seq_length),
                    dtype=torch.long,
                    device=torch_device,
                )
                inputs_dict["input_ids"] = inputs_dict["labels"]
                inputs_dict["token_type_ids"] = inputs_dict["labels"]
                inputs_dict["mc_token_ids"] = torch.zeros(
                    (self.model_tester.batch_size, self.model_tester.num_choices),
                    dtype=torch.long,
                    device=torch_device,
                )
                inputs_dict["mc_labels"] = torch.zeros(
                    self.model_tester.batch_size, dtype=torch.long, device=torch_device
                )
        return inputs_dict

    def setUp(self):
        self.model_tester = GPT2ModelTester(self)
        self.config_tester = ConfigTester(self, config_class=GPT2Config, n_embd=37)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_gpt2_model(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_gpt2_model(*config_and_inputs)

    def test_gpt2_model_past(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_gpt2_model_past(*config_and_inputs)

    def test_gpt2_model_att_mask_past(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_gpt2_model_attention_mask_past(*config_and_inputs)

    def test_gpt2_model_past_large_inputs(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_gpt2_model_past_large_inputs(*config_and_inputs)

    def test_gpt2_lm_head_model(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_lm_head_model(*config_and_inputs)

    def test_gpt2_double_lm_head_model(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_double_lm_head_model(*config_and_inputs)

    def test_gpt2_sequence_classification_model(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_gpt2_for_sequence_classification(*config_and_inputs)

    def test_gpt2_gradient_checkpointing(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs(gradient_checkpointing=True)
        self.model_tester.create_and_check_forward_and_backwards(*config_and_inputs)

    @slow
    def test_batch_generation(self):
        model = GPT2LMHeadModel.from_pretrained("gpt2")
        model.to(torch_device)
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

        tokenizer.padding_side = "left"

        # Define PAD Token = EOS Token = 50256
        tokenizer.pad_token = tokenizer.eos_token
        model.config.pad_token_id = model.config.eos_token_id

        # use different length sentences to test batching
        sentences = [
            "Hello, my dog is a little",
            "Today, I",
        ]

        inputs = tokenizer(sentences, return_tensors="pt", padding=True)
        input_ids = inputs["input_ids"].to(torch_device)
        token_type_ids = torch.cat(
            [
                input_ids.new_full((input_ids.shape[0], input_ids.shape[1] - 1), 0),
                input_ids.new_full((input_ids.shape[0], 1), 500),
            ],
            dim=-1,
        )

        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=inputs["attention_mask"].to(torch_device),
        )

        outputs_tt = model.generate(
            input_ids=input_ids,
            attention_mask=inputs["attention_mask"].to(torch_device),
            token_type_ids=token_type_ids,
        )

        inputs_non_padded = tokenizer(sentences[0], return_tensors="pt").input_ids.to(torch_device)
        output_non_padded = model.generate(input_ids=inputs_non_padded)

        num_paddings = inputs_non_padded.shape[-1] - inputs["attention_mask"][-1].long().sum().cpu().item()
        inputs_padded = tokenizer(sentences[1], return_tensors="pt").input_ids.to(torch_device)
        output_padded = model.generate(input_ids=inputs_padded, max_length=model.config.max_length - num_paddings)

        batch_out_sentence = tokenizer.batch_decode(outputs, skip_special_tokens=True)
        batch_out_sentence_tt = tokenizer.batch_decode(outputs_tt, skip_special_tokens=True)
        non_padded_sentence = tokenizer.decode(output_non_padded[0], skip_special_tokens=True)
        padded_sentence = tokenizer.decode(output_padded[0], skip_special_tokens=True)

        expected_output_sentence = [
            "Hello, my dog is a little bit of a mess. I'm not sure if he's going",
            "Today, I'm going to be doing a lot of research on this. I",
        ]
        self.assertListEqual(expected_output_sentence, batch_out_sentence)
        self.assertTrue(batch_out_sentence_tt != batch_out_sentence)  # token_type_ids should change output
        self.assertListEqual(expected_output_sentence, [non_padded_sentence, padded_sentence])

    @slow
    def test_batch_generation_2heads(self):
        model = GPT2DoubleHeadsModel.from_pretrained("gpt2")
        model.to(torch_device)
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

        tokenizer.padding_side = "left"

        # This tokenizer has no pad token, so we have to set it in some way
        # Define PAD Token = EOS Token = 50256
        tokenizer.pad_token = tokenizer.eos_token
        model.config.pad_token_id = model.config.eos_token_id

        # use different length sentences to test batching
        sentences = [
            "Hello, my dog is a little",
            "Today, I",
        ]

        inputs = tokenizer(sentences, return_tensors="pt", padding=True)
        input_ids = inputs["input_ids"].to(torch_device)
        token_type_ids = torch.cat(
            [
                input_ids.new_full((input_ids.shape[0], input_ids.shape[1] - 1), 0),
                input_ids.new_full((input_ids.shape[0], 1), 500),
            ],
            dim=-1,
        )

        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=inputs["attention_mask"].to(torch_device),
        )

        outputs_tt = model.generate(
            input_ids=input_ids,
            attention_mask=inputs["attention_mask"].to(torch_device),
            token_type_ids=token_type_ids,
        )

        inputs_non_padded = tokenizer(sentences[0], return_tensors="pt").input_ids.to(torch_device)
        output_non_padded = model.generate(input_ids=inputs_non_padded)

        num_paddings = inputs_non_padded.shape[-1] - inputs["attention_mask"][-1].long().sum().cpu().item()
        inputs_padded = tokenizer(sentences[1], return_tensors="pt").input_ids.to(torch_device)
        output_padded = model.generate(input_ids=inputs_padded, max_length=model.config.max_length - num_paddings)

        batch_out_sentence = tokenizer.batch_decode(outputs, skip_special_tokens=True)
        batch_out_sentence_tt = tokenizer.batch_decode(outputs_tt, skip_special_tokens=True)
        non_padded_sentence = tokenizer.decode(output_non_padded[0], skip_special_tokens=True)
        padded_sentence = tokenizer.decode(output_padded[0], skip_special_tokens=True)

        expected_output_sentence = [
            "Hello, my dog is a little bit of a mess. I'm not sure if he's going",
            "Today, I'm going to be doing a lot of research on this. I",
        ]
        self.assertListEqual(expected_output_sentence, batch_out_sentence)
        self.assertTrue(batch_out_sentence_tt != batch_out_sentence)  # token_type_ids should change output
        self.assertListEqual(expected_output_sentence, [non_padded_sentence, padded_sentence])

    @slow
    def test_model_from_pretrained(self):
        for model_name in GPT2_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
            model = GPT2Model.from_pretrained(model_name)
            self.assertIsNotNone(model)


@require_torch
class GPT2ModelLanguageGenerationTest(unittest.TestCase):
    @slow
    def test_lm_generate_gpt2(self):
        for checkpointing in [True, False]:
            model = GPT2LMHeadModel.from_pretrained("gpt2", gradient_checkpointing=checkpointing)
            model.to(torch_device)
            input_ids = torch.tensor([[464, 3290]], dtype=torch.long, device=torch_device)  # The dog
            expected_output_ids = [
                464, 3290, 373, 1043, 287, 257, 2214, 1474, 262, 16246,
                286, 2688, 290, 2688, 27262, 13, 198, 198, 464, 3290,
            ]  # The dog was found in a field near the intersection of West and West Streets.\n\nThe dog
            output_ids = model.generate(input_ids, do_sample=False)
            self.assertListEqual(output_ids[0].tolist(), expected_output_ids)

    @slow
    def test_gpt2_sample(self):
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
        model = GPT2LMHeadModel.from_pretrained("gpt2")
        model.to(torch_device)

        torch.manual_seed(0)
        tokenized = tokenizer("Today is a nice day and", return_tensors="pt", return_token_type_ids=True)
        input_ids = tokenized.input_ids.to(torch_device)
        output_ids = model.generate(input_ids, do_sample=True)
        output_str = tokenizer.decode(output_ids[0], skip_special_tokens=True)

        token_type_ids = tokenized.token_type_ids.to(torch_device)
        output_seq = model.generate(input_ids=input_ids, do_sample=True, num_return_sequences=5)
        output_seq_tt = model.generate(
            input_ids=input_ids, token_type_ids=token_type_ids, do_sample=True, num_return_sequences=5
        )
        output_seq_strs = tokenizer.batch_decode(output_seq, skip_special_tokens=True)
        output_seq_tt_strs = tokenizer.batch_decode(output_seq_tt, skip_special_tokens=True)

        EXPECTED_OUTPUT_STR = (
            "Today is a nice day and if you don't know anything about the state of play during your holiday"
        )
        self.assertEqual(output_str, EXPECTED_OUTPUT_STR)
        self.assertTrue(
            all([output_seq_strs[idx] != output_seq_tt_strs[idx] for idx in range(len(output_seq_tt_strs))])
        )  # token_type_ids should change output

    @slow
    def test_gpt2_sample_max_time(self):
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
        model = GPT2LMHeadModel.from_pretrained("gpt2")
        model.to(torch_device)

        torch.manual_seed(0)
        tokenized = tokenizer("Today is a nice day and", return_tensors="pt", return_token_type_ids=True)
        input_ids = tokenized.input_ids.to(torch_device)

        MAX_TIME = 0.5

        start = datetime.datetime.now()
        model.generate(input_ids, do_sample=True, max_time=MAX_TIME, max_length=256)
        duration = datetime.datetime.now() - start
        self.assertGreater(duration, datetime.timedelta(seconds=MAX_TIME))
        self.assertLess(duration, datetime.timedelta(seconds=1.5 * MAX_TIME))

        start = datetime.datetime.now()
        model.generate(input_ids, do_sample=False, max_time=MAX_TIME, max_length=256)
        duration = datetime.datetime.now() - start
        self.assertGreater(duration, datetime.timedelta(seconds=MAX_TIME))
        self.assertLess(duration, datetime.timedelta(seconds=1.5 * MAX_TIME))

        start = datetime.datetime.now()
        model.generate(input_ids, do_sample=False, num_beams=2, max_time=MAX_TIME, max_length=256)
        duration = datetime.datetime.now() - start
        self.assertGreater(duration, datetime.timedelta(seconds=MAX_TIME))
        self.assertLess(duration, datetime.timedelta(seconds=1.5 * MAX_TIME))

        start = datetime.datetime.now()
        model.generate(input_ids, do_sample=True, num_beams=2, max_time=MAX_TIME, max_length=256)
        duration = datetime.datetime.now() - start
        self.assertGreater(duration, datetime.timedelta(seconds=MAX_TIME))
        self.assertLess(duration, datetime.timedelta(seconds=1.5 * MAX_TIME))

        start = datetime.datetime.now()
        model.generate(input_ids, do_sample=False, max_time=None, max_length=256)
        duration = datetime.datetime.now() - start
        self.assertGreater(duration, datetime.timedelta(seconds=1.5 * MAX_TIME))