Use pyupgrade --py39-plus to improve code (#36843)

This commit is contained in:
cyyever 2025-03-20 22:39:44 +08:00 committed by GitHub
parent 3e8f0fbf44
commit ce091b1bda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 285 additions and 308 deletions

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team and the librosa & torchaudio authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -18,7 +17,7 @@ and remove unnecessary dependencies.
"""
import warnings
from typing import List, Optional, Tuple, Union
from typing import Optional, Union
import numpy as np
@ -146,7 +145,7 @@ def chroma_filter_bank(
sampling_rate: int,
tuning: float = 0.0,
power: Optional[float] = 2.0,
weighting_parameters: Optional[Tuple[float, float]] = (5.0, 2.0),
weighting_parameters: Optional[tuple[float, float]] = (5.0, 2.0),
start_at_c_chroma: Optional[bool] = True,
):
"""
@ -592,7 +591,7 @@ def spectrogram(
def spectrogram_batch(
waveform_list: List[np.ndarray],
waveform_list: list[np.ndarray],
window: np.ndarray,
frame_length: int,
hop_length: int,
@ -611,7 +610,7 @@ def spectrogram_batch(
db_range: Optional[float] = None,
remove_dc_offset: Optional[bool] = None,
dtype: np.dtype = np.float32,
) -> List[np.ndarray]:
) -> list[np.ndarray]:
"""
Calculates spectrograms for a list of waveforms using the Short-Time Fourier Transform, optimized for batch processing.
This function extends the capabilities of the `spectrogram` function to handle multiple waveforms efficiently by leveraging broadcasting.

View File

@ -16,7 +16,7 @@ import warnings
from argparse import ArgumentParser
from os import listdir, makedirs
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Optional
from packaging.version import Version, parse
@ -159,7 +159,7 @@ def ensure_valid_input(model, tokens, input_names):
return ordered_input_names, tuple(model_args)
def infer_shapes(nlp: Pipeline, framework: str) -> Tuple[List[str], List[str], Dict, BatchEncoding]:
def infer_shapes(nlp: Pipeline, framework: str) -> tuple[list[str], list[str], dict, BatchEncoding]:
"""
Attempt to infer the static vs dynamic axes for each input and output tensors for a specific model

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -20,7 +19,6 @@ allow to make our dependency on SentencePiece optional.
"""
import warnings
from typing import Dict, List, Tuple
from packaging import version
from tokenizers import AddedToken, Regex, Tokenizer, decoders, normalizers, pre_tokenizers, processors
@ -91,7 +89,7 @@ class SentencePieceExtractor:
self.sp = SentencePieceProcessor()
self.sp.Load(model)
def extract(self, vocab_scores=None) -> Tuple[Dict[str, int], List[Tuple]]:
def extract(self, vocab_scores=None) -> tuple[dict[str, int], list[tuple]]:
"""
By default will return vocab and merges with respect to their order, by sending `vocab_scores` we're going to
order the merges with respect to the piece scores instead.
@ -105,7 +103,7 @@ class SentencePieceExtractor:
class GemmaSentencePieceExtractor(SentencePieceExtractor):
def extract(self, vocab_scores=None) -> Tuple[Dict[str, int], List[Tuple]]:
def extract(self, vocab_scores=None) -> tuple[dict[str, int], list[tuple]]:
"""
By default will return vocab and merges with respect to their order, by sending `vocab_scores` we're going to
order the merges with respect to the piece scores instead.
@ -328,7 +326,7 @@ class OpenAIGPTConverter(Converter):
class GPT2Converter(Converter):
def converted(self, vocab: Dict[str, int] = None, merges: List[Tuple[str, str]] = None) -> Tokenizer:
def converted(self, vocab: dict[str, int] = None, merges: list[tuple[str, str]] = None) -> Tokenizer:
if not vocab:
vocab = self.original_tokenizer.encoder
if not merges:
@ -397,7 +395,7 @@ class HerbertConverter(Converter):
class Qwen2Converter(Converter):
def converted(self, vocab: Dict[str, int] = None, merges: List[Tuple[str, str]] = None) -> Tokenizer:
def converted(self, vocab: dict[str, int] = None, merges: list[tuple[str, str]] = None) -> Tokenizer:
if not vocab:
vocab = self.original_tokenizer.encoder
if not merges:

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -16,7 +15,7 @@
Sequence feature extraction class for common feature extractors to preprocess sequences.
"""
from typing import Dict, List, Optional, Union
from typing import Optional, Union
import numpy as np
@ -54,10 +53,10 @@ class SequenceFeatureExtractor(FeatureExtractionMixin):
self,
processed_features: Union[
BatchFeature,
List[BatchFeature],
Dict[str, BatchFeature],
Dict[str, List[BatchFeature]],
List[Dict[str, BatchFeature]],
list[BatchFeature],
dict[str, BatchFeature],
dict[str, list[BatchFeature]],
list[dict[str, BatchFeature]],
],
padding: Union[bool, str, PaddingStrategy] = True,
max_length: Optional[int] = None,
@ -226,7 +225,7 @@ class SequenceFeatureExtractor(FeatureExtractionMixin):
def _pad(
self,
processed_features: Union[Dict[str, np.ndarray], BatchFeature],
processed_features: Union[dict[str, np.ndarray], BatchFeature],
max_length: Optional[int] = None,
padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
pad_to_multiple_of: Optional[int] = None,
@ -298,7 +297,7 @@ class SequenceFeatureExtractor(FeatureExtractionMixin):
def _truncate(
self,
processed_features: Union[Dict[str, np.ndarray], BatchFeature],
processed_features: Union[dict[str, np.ndarray], BatchFeature],
max_length: Optional[int] = None,
pad_to_multiple_of: Optional[int] = None,
truncation: Optional[bool] = None,

View File

@ -18,11 +18,12 @@ import os
import sys
import types
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError
from collections.abc import Iterable
from copy import copy
from enum import Enum
from inspect import isclass
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Literal, NewType, Optional, Tuple, Union, get_type_hints
from typing import Any, Callable, Literal, NewType, Optional, Union, get_type_hints
import yaml
@ -62,7 +63,7 @@ def make_choice_type_function(choices: list) -> Callable[[str], Any]:
def HfArg(
*,
aliases: Union[str, List[str]] = None,
aliases: Union[str, list[str]] = None,
help: str = None,
default: Any = dataclasses.MISSING,
default_factory: Callable[[], Any] = dataclasses.MISSING,
@ -254,7 +255,7 @@ class HfArgumentParser(ArgumentParser):
parser = self
try:
type_hints: Dict[str, type] = get_type_hints(dtype)
type_hints: dict[str, type] = get_type_hints(dtype)
except NameError:
raise RuntimeError(
f"Type resolution failed for {dtype}. Try declaring the class in global scope or "
@ -288,7 +289,7 @@ class HfArgumentParser(ArgumentParser):
look_for_args_file=True,
args_filename=None,
args_file_flag=None,
) -> Tuple[DataClass, ...]:
) -> tuple[DataClass, ...]:
"""
Parse command-line args into instances of the specified dataclass types.
@ -367,7 +368,7 @@ class HfArgumentParser(ArgumentParser):
return (*outputs,)
def parse_dict(self, args: Dict[str, Any], allow_extra_keys: bool = False) -> Tuple[DataClass, ...]:
def parse_dict(self, args: dict[str, Any], allow_extra_keys: bool = False) -> tuple[DataClass, ...]:
"""
Alternative helper method that does not use `argparse` at all, instead uses a dict and populating the dataclass
types.
@ -397,7 +398,7 @@ class HfArgumentParser(ArgumentParser):
def parse_json_file(
self, json_file: Union[str, os.PathLike], allow_extra_keys: bool = False
) -> Tuple[DataClass, ...]:
) -> tuple[DataClass, ...]:
"""
Alternative helper method that does not use `argparse` at all, instead loading a json file and populating the
dataclass types.
@ -421,7 +422,7 @@ class HfArgumentParser(ArgumentParser):
def parse_yaml_file(
self, yaml_file: Union[str, os.PathLike], allow_extra_keys: bool = False
) -> Tuple[DataClass, ...]:
) -> tuple[DataClass, ...]:
"""
Alternative helper method that does not use `argparse` at all, instead loading a yaml file and populating the
dataclass types.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -14,7 +13,8 @@
# limitations under the License.
import math
from typing import Dict, Iterable, Optional, Union
from collections.abc import Iterable
from typing import Optional, Union
import numpy as np
@ -116,7 +116,7 @@ class BaseImageProcessor(ImageProcessingMixin):
def center_crop(
self,
image: np.ndarray,
size: Dict[str, int],
size: dict[str, int],
data_format: Optional[Union[str, ChannelDimension]] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
**kwargs,
@ -207,7 +207,7 @@ def convert_to_size_dict(
def get_size_dict(
size: Union[int, Iterable[int], Dict[str, int]] = None,
size: Union[int, Iterable[int], dict[str, int]] = None,
max_size: Optional[int] = None,
height_width_order: bool = True,
default_to_square: bool = True,

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -13,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections.abc import Iterable
from functools import lru_cache, partial
from typing import Any, Dict, Iterable, List, Optional, Tuple, TypedDict, Union
from typing import Any, Optional, TypedDict, Union
import numpy as np
@ -77,8 +77,8 @@ def validate_fast_preprocess_arguments(
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
do_pad: Optional[bool] = None,
size_divisibility: Optional[int] = None,
do_center_crop: Optional[bool] = None,
@ -128,14 +128,14 @@ def safe_squeeze(tensor: "torch.Tensor", axis: Optional[int] = None) -> "torch.T
return tensor
def max_across_indices(values: Iterable[Any]) -> List[Any]:
def max_across_indices(values: Iterable[Any]) -> list[Any]:
"""
Return the maximum value across all indices of an iterable of values.
"""
return [max(values_i) for values_i in zip(*values)]
def get_max_height_width(images: List["torch.Tensor"]) -> Tuple[int]:
def get_max_height_width(images: list["torch.Tensor"]) -> tuple[int]:
"""
Get the maximum height and width across all images in a batch.
"""
@ -147,7 +147,7 @@ def get_max_height_width(images: List["torch.Tensor"]) -> Tuple[int]:
def divide_to_patches(
image: Union[np.array, "torch.Tensor"], patch_size: int
) -> List[Union[np.array, "torch.Tensor"]]:
) -> list[Union[np.array, "torch.Tensor"]]:
"""
Divides an image into patches of a specified size.
@ -171,16 +171,16 @@ def divide_to_patches(
class DefaultFastImageProcessorKwargs(TypedDict, total=False):
do_resize: Optional[bool]
size: Optional[Dict[str, int]]
size: Optional[dict[str, int]]
default_to_square: Optional[bool]
resample: Optional[Union["PILImageResampling", "F.InterpolationMode"]]
do_center_crop: Optional[bool]
crop_size: Optional[Dict[str, int]]
crop_size: Optional[dict[str, int]]
do_rescale: Optional[bool]
rescale_factor: Optional[Union[int, float]]
do_normalize: Optional[bool]
image_mean: Optional[Union[float, List[float]]]
image_std: Optional[Union[float, List[float]]]
image_mean: Optional[Union[float, list[float]]]
image_std: Optional[Union[float, list[float]]]
do_convert_rgb: Optional[bool]
return_tensors: Optional[Union[str, TensorType]]
data_format: Optional[ChannelDimension]
@ -427,8 +427,8 @@ class BaseImageProcessorFast(BaseImageProcessor):
def _fuse_mean_std_and_rescale_factor(
self,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
device: Optional["torch.device"] = None,
@ -446,8 +446,8 @@ class BaseImageProcessorFast(BaseImageProcessor):
do_rescale: bool,
rescale_factor: float,
do_normalize: bool,
image_mean: Union[float, List[float]],
image_std: Union[float, List[float]],
image_mean: Union[float, list[float]],
image_std: Union[float, list[float]],
) -> "torch.Tensor":
"""
Rescale and normalize images.
@ -471,7 +471,7 @@ class BaseImageProcessorFast(BaseImageProcessor):
def center_crop(
self,
image: "torch.Tensor",
size: Dict[str, int],
size: dict[str, int],
**kwargs,
) -> "torch.Tensor":
"""
@ -576,7 +576,7 @@ class BaseImageProcessorFast(BaseImageProcessor):
do_convert_rgb: bool = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
device: Optional["torch.device"] = None,
) -> List["torch.Tensor"]:
) -> list["torch.Tensor"]:
"""
Prepare the input images for processing.
"""
@ -599,8 +599,8 @@ class BaseImageProcessorFast(BaseImageProcessor):
size: Optional[SizeDict] = None,
crop_size: Optional[SizeDict] = None,
default_to_square: Optional[bool] = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
data_format: Optional[ChannelDimension] = None,
**kwargs,
) -> dict:
@ -701,7 +701,7 @@ class BaseImageProcessorFast(BaseImageProcessor):
def _preprocess(
self,
images: List["torch.Tensor"],
images: list["torch.Tensor"],
do_resize: bool,
size: SizeDict,
interpolation: Optional["F.InterpolationMode"],
@ -710,8 +710,8 @@ class BaseImageProcessorFast(BaseImageProcessor):
do_rescale: bool,
rescale_factor: float,
do_normalize: bool,
image_mean: Optional[Union[float, List[float]]],
image_std: Optional[Union[float, List[float]]],
image_mean: Optional[Union[float, list[float]]],
image_std: Optional[Union[float, list[float]]],
return_tensors: Optional[Union[str, TensorType]],
**kwargs,
) -> BatchFeature:
@ -749,7 +749,7 @@ class BaseImageProcessorFast(BaseImageProcessor):
class SemanticSegmentationMixin:
def post_process_semantic_segmentation(self, outputs, target_sizes: List[Tuple] = None):
def post_process_semantic_segmentation(self, outputs, target_sizes: list[tuple] = None):
"""
Converts the output of [`MobileNetV2ForSemanticSegmentation`] into semantic segmentation maps. Only supports PyTorch.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -14,9 +13,9 @@
# limitations under the License.
import warnings
from collections.abc import Collection
from collections.abc import Collection, Iterable
from math import ceil
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import Optional, Union
import numpy as np
@ -86,7 +85,7 @@ def to_channel_dimension_format(
elif target_channel_dim == ChannelDimension.LAST:
image = image.transpose((1, 2, 0))
else:
raise ValueError("Unsupported channel dimension format: {}".format(channel_dim))
raise ValueError(f"Unsupported channel dimension format: {channel_dim}")
return image
@ -192,7 +191,7 @@ def to_pil_image(
elif is_jax_tensor(image):
image = np.array(image)
elif not isinstance(image, np.ndarray):
raise ValueError("Input image type not supported: {}".format(type(image)))
raise ValueError(f"Input image type not supported: {type(image)}")
# If the channel has been moved to first dim, we put it back at the end.
image = to_channel_dimension_format(image, ChannelDimension.LAST, input_data_format)
@ -210,7 +209,7 @@ def to_pil_image(
return PIL.Image.fromarray(image, mode=image_mode)
def get_size_with_aspect_ratio(image_size, size, max_size=None) -> Tuple[int, int]:
def get_size_with_aspect_ratio(image_size, size, max_size=None) -> tuple[int, int]:
"""
Computes the output image size given the input image size and the desired output size.
@ -252,7 +251,7 @@ def get_size_with_aspect_ratio(image_size, size, max_size=None) -> Tuple[int, in
# Logic adapted from torchvision resizing logic: https://github.com/pytorch/vision/blob/511924c1ced4ce0461197e5caa64ce5b9e558aab/torchvision/transforms/functional.py#L366
def get_resize_output_image_size(
input_image: np.ndarray,
size: Union[int, Tuple[int, int], List[int], Tuple[int]],
size: Union[int, tuple[int, int], list[int], tuple[int]],
default_to_square: bool = True,
max_size: Optional[int] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
@ -319,7 +318,7 @@ def get_resize_output_image_size(
def resize(
image: np.ndarray,
size: Tuple[int, int],
size: tuple[int, int],
resample: "PILImageResampling" = None,
reducing_gap: Optional[int] = None,
data_format: Optional[ChannelDimension] = None,
@ -451,7 +450,7 @@ def normalize(
def center_crop(
image: np.ndarray,
size: Tuple[int, int],
size: tuple[int, int],
data_format: Optional[Union[str, ChannelDimension]] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
return_numpy: Optional[bool] = None,
@ -705,7 +704,7 @@ class PaddingMode(ExplicitEnum):
def pad(
image: np.ndarray,
padding: Union[int, Tuple[int, int], Iterable[Tuple[int, int]]],
padding: Union[int, tuple[int, int], Iterable[tuple[int, int]]],
mode: PaddingMode = PaddingMode.CONSTANT,
constant_values: Union[float, Iterable[float]] = 0.0,
data_format: Optional[Union[str, ChannelDimension]] = None,
@ -855,8 +854,8 @@ def _cast_tensor_to_float(x):
def group_images_by_shape(
images: List["torch.Tensor"],
) -> Tuple[Dict[Tuple[int, int], List["torch.Tensor"]], Dict[int, Tuple[Tuple[int, int], int]]]:
images: list["torch.Tensor"],
) -> tuple[dict[tuple[int, int], list["torch.Tensor"]], dict[int, tuple[tuple[int, int], int]]]:
"""
Groups images by shape.
Returns a dictionary with the shape as key and a list of images with that shape as value,
@ -876,8 +875,8 @@ def group_images_by_shape(
def reorder_images(
processed_images: Dict[Tuple[int, int], "torch.Tensor"], grouped_images_index: Dict[int, Tuple[int, int]]
) -> List["torch.Tensor"]:
processed_images: dict[tuple[int, int], "torch.Tensor"], grouped_images_index: dict[int, tuple[int, int]]
) -> list["torch.Tensor"]:
"""
Reconstructs a list of images in the original order.
"""

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -15,10 +14,11 @@
import base64
import os
from collections.abc import Iterable
from contextlib import redirect_stdout
from dataclasses import dataclass
from io import BytesIO
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Callable, Optional, Union
import numpy as np
import requests
@ -83,19 +83,19 @@ logger = logging.get_logger(__name__)
ImageInput = Union[
"PIL.Image.Image", np.ndarray, "torch.Tensor", List["PIL.Image.Image"], List[np.ndarray], List["torch.Tensor"]
"PIL.Image.Image", np.ndarray, "torch.Tensor", list["PIL.Image.Image"], list[np.ndarray], list["torch.Tensor"]
] # noqa
VideoInput = Union[
List["PIL.Image.Image"],
list["PIL.Image.Image"],
"np.ndarray",
"torch.Tensor",
List["np.ndarray"],
List["torch.Tensor"],
List[List["PIL.Image.Image"]],
List[List["np.ndarrray"]],
List[List["torch.Tensor"]],
list["np.ndarray"],
list["torch.Tensor"],
list[list["PIL.Image.Image"]],
list[list["np.ndarrray"]],
list[list["torch.Tensor"]],
] # noqa
@ -122,7 +122,7 @@ class VideoMetadata:
video_backend: str
AnnotationType = Dict[str, Union[int, str, List[Dict]]]
AnnotationType = dict[str, Union[int, str, list[dict]]]
def is_pil_image(img):
@ -155,7 +155,7 @@ def is_valid_image(img):
return is_pil_image(img) or is_numpy_array(img) or is_torch_tensor(img) or is_tf_tensor(img) or is_jax_tensor(img)
def is_valid_list_of_images(images: List):
def is_valid_list_of_images(images: list):
return images and all(is_valid_image(image) for image in images)
@ -188,7 +188,7 @@ def is_scaled_image(image: np.ndarray) -> bool:
return np.min(image) >= 0 and np.max(image) <= 1
def make_list_of_images(images, expected_ndims: int = 3) -> List[ImageInput]:
def make_list_of_images(images, expected_ndims: int = 3) -> list[ImageInput]:
"""
Ensure that the output is a list of images. If the input is a single image, it is converted to a list of length 1.
If the input is a batch of images, it is converted to a list of images.
@ -228,7 +228,7 @@ def make_list_of_images(images, expected_ndims: int = 3) -> List[ImageInput]:
def make_flat_list_of_images(
images: Union[List[ImageInput], ImageInput],
images: Union[list[ImageInput], ImageInput],
) -> ImageInput:
"""
Ensure that the output is a flat list of images. If the input is a single image, it is converted to a list of length 1.
@ -263,7 +263,7 @@ def make_flat_list_of_images(
def make_nested_list_of_images(
images: Union[List[ImageInput], ImageInput],
images: Union[list[ImageInput], ImageInput],
) -> ImageInput:
"""
Ensure that the output is a nested list of images.
@ -339,7 +339,7 @@ def to_numpy_array(img) -> np.ndarray:
def infer_channel_dimension_format(
image: np.ndarray, num_channels: Optional[Union[int, Tuple[int, ...]]] = None
image: np.ndarray, num_channels: Optional[Union[int, tuple[int, ...]]] = None
) -> ChannelDimension:
"""
Infers the channel dimension format of `image`.
@ -399,7 +399,7 @@ def get_channel_dimension_axis(
raise ValueError(f"Unsupported data format: {input_data_format}")
def get_image_size(image: np.ndarray, channel_dim: ChannelDimension = None) -> Tuple[int, int]:
def get_image_size(image: np.ndarray, channel_dim: ChannelDimension = None) -> tuple[int, int]:
"""
Returns the (height, width) dimensions of the image.
@ -424,10 +424,10 @@ def get_image_size(image: np.ndarray, channel_dim: ChannelDimension = None) -> T
def get_image_size_for_max_height_width(
image_size: Tuple[int, int],
image_size: tuple[int, int],
max_height: int,
max_width: int,
) -> Tuple[int, int]:
) -> tuple[int, int]:
"""
Computes the output image size given the input image and the maximum allowed height and width. Keep aspect ratio.
Important, even if image_height < max_height and image_width < max_width, the image will be resized
@ -454,7 +454,7 @@ def get_image_size_for_max_height_width(
return new_height, new_width
def is_valid_annotation_coco_detection(annotation: Dict[str, Union[List, Tuple]]) -> bool:
def is_valid_annotation_coco_detection(annotation: dict[str, Union[list, tuple]]) -> bool:
if (
isinstance(annotation, dict)
and "image_id" in annotation
@ -469,7 +469,7 @@ def is_valid_annotation_coco_detection(annotation: Dict[str, Union[List, Tuple]]
return False
def is_valid_annotation_coco_panoptic(annotation: Dict[str, Union[List, Tuple]]) -> bool:
def is_valid_annotation_coco_panoptic(annotation: dict[str, Union[list, tuple]]) -> bool:
if (
isinstance(annotation, dict)
and "image_id" in annotation
@ -485,11 +485,11 @@ def is_valid_annotation_coco_panoptic(annotation: Dict[str, Union[List, Tuple]])
return False
def valid_coco_detection_annotations(annotations: Iterable[Dict[str, Union[List, Tuple]]]) -> bool:
def valid_coco_detection_annotations(annotations: Iterable[dict[str, Union[list, tuple]]]) -> bool:
return all(is_valid_annotation_coco_detection(ann) for ann in annotations)
def valid_coco_panoptic_annotations(annotations: Iterable[Dict[str, Union[List, Tuple]]]) -> bool:
def valid_coco_panoptic_annotations(annotations: Iterable[dict[str, Union[list, tuple]]]) -> bool:
return all(is_valid_annotation_coco_panoptic(ann) for ann in annotations)
@ -880,8 +880,8 @@ def load_video(
def load_images(
images: Union[List, Tuple, str, "PIL.Image.Image"], timeout: Optional[float] = None
) -> Union["PIL.Image.Image", List["PIL.Image.Image"], List[List["PIL.Image.Image"]]]:
images: Union[list, tuple, str, "PIL.Image.Image"], timeout: Optional[float] = None
) -> Union["PIL.Image.Image", list["PIL.Image.Image"], list[list["PIL.Image.Image"]]]:
"""Loads images, handling different levels of nesting.
Args:
@ -904,14 +904,14 @@ def validate_preprocess_arguments(
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
do_pad: Optional[bool] = None,
size_divisibility: Optional[int] = None,
do_center_crop: Optional[bool] = None,
crop_size: Optional[Dict[str, int]] = None,
crop_size: Optional[dict[str, int]] = None,
do_resize: Optional[bool] = None,
size: Optional[Dict[str, int]] = None,
size: Optional[dict[str, int]] = None,
resample: Optional["PILImageResampling"] = None,
):
"""
@ -1295,8 +1295,8 @@ class ImageFeatureExtractionMixin:
def validate_annotations(
annotation_format: AnnotationFormat,
supported_annotation_formats: Tuple[AnnotationFormat, ...],
annotations: List[Dict],
supported_annotation_formats: tuple[AnnotationFormat, ...],
annotations: list[dict],
) -> None:
if annotation_format not in supported_annotation_formats:
raise ValueError(f"Unsupported annotation format: {format} must be one of {supported_annotation_formats}")
@ -1318,7 +1318,7 @@ def validate_annotations(
)
def validate_kwargs(valid_processor_keys: List[str], captured_kwargs: List[str]):
def validate_kwargs(valid_processor_keys: list[str], captured_kwargs: list[str]):
unused_keys = set(captured_kwargs).difference(set(valid_processor_keys))
if unused_keys:
unused_key_str = ", ".join(unused_keys)

View File

@ -2,7 +2,7 @@ import logging
import os
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional, Union
from typing import Callable, Optional, Union
import numpy as np
import tensorflow as tf
@ -79,8 +79,8 @@ class KerasMetricCallback(keras.callbacks.Callback):
self,
metric_fn: Callable,
eval_dataset: Union[tf.data.Dataset, np.ndarray, tf.Tensor, tuple, dict],
output_cols: Optional[List[str]] = None,
label_cols: Optional[List[str]] = None,
output_cols: Optional[list[str]] = None,
label_cols: Optional[list[str]] = None,
batch_size: Optional[int] = None,
predict_with_generate: bool = False,
use_xla_generation: bool = False,

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
from typing import Optional, Union
import torch
@ -301,7 +301,7 @@ class AttentionMaskConverter:
def _prepare_4d_causal_attention_mask(
attention_mask: Optional[torch.Tensor],
input_shape: Union[torch.Size, Tuple, List],
input_shape: Union[torch.Size, tuple, list],
inputs_embeds: torch.Tensor,
past_key_values_length: int,
sliding_window: Optional[int] = None,
@ -354,7 +354,7 @@ def _prepare_4d_causal_attention_mask(
# Adapted from _prepare_4d_causal_attention_mask
def _prepare_4d_causal_attention_mask_for_sdpa(
attention_mask: Optional[torch.Tensor],
input_shape: Union[torch.Size, Tuple, List],
input_shape: Union[torch.Size, tuple, list],
inputs_embeds: torch.Tensor,
past_key_values_length: int,
sliding_window: Optional[int] = None,
@ -452,7 +452,7 @@ def _prepare_4d_attention_mask_for_sdpa(mask: torch.Tensor, dtype: torch.dtype,
def _create_4d_causal_attention_mask(
input_shape: Union[torch.Size, Tuple, List],
input_shape: Union[torch.Size, tuple, list],
dtype: torch.dtype,
device: torch.device,
past_key_values_length: int = 0,

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2024 The Fairseq Authors and the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -15,7 +14,7 @@
import inspect
import os
from typing import Optional, Tuple, TypedDict
from typing import Optional, TypedDict
import torch
import torch.nn.functional as F
@ -33,7 +32,7 @@ if is_flash_attn_2_available():
_flash_supports_window_size = "window_size" in list(inspect.signature(flash_attn_func).parameters)
def _get_unpad_data(attention_mask: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, int]:
def _get_unpad_data(attention_mask: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, int]:
"""
Retrieves indexing data required to repad unpadded (ragged) tensors.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2024 The ggml.ai team and The HuggingFace Inc. team. and pygguf author (github.com/99991)
# https://github.com/99991/pygguf
#
@ -15,7 +14,7 @@
# limitations under the License.
import re
from typing import Dict, NamedTuple, Optional
from typing import NamedTuple, Optional
import numpy as np
from tqdm.auto import tqdm
@ -115,7 +114,7 @@ class Qwen2MoeTensorProcessor(TensorProcessor):
return GGUFTensor(weights, name, {})
def _split_moe_expert_tensor(
self, weights: np.ndarray, parsed_parameters: Dict[str, Dict], name: str, tensor_key_mapping: dict
self, weights: np.ndarray, parsed_parameters: dict[str, dict], name: str, tensor_key_mapping: dict
):
# Original merge implementation
# https://github.com/ggerganov/llama.cpp/blob/master/convert_hf_to_gguf.py#L1994-L2022

View File

@ -13,7 +13,7 @@
# limitations under the License.
import math
from typing import Optional, Tuple
from typing import Optional
from .configuration_utils import PretrainedConfig
from .utils import is_torch_available, logging
@ -31,7 +31,7 @@ def _compute_default_rope_parameters(
device: Optional["torch.device"] = None,
seq_len: Optional[int] = None,
**rope_kwargs,
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies according to the original RoPE implementation
Args:
@ -73,7 +73,7 @@ def _compute_linear_scaling_rope_parameters(
device: Optional["torch.device"] = None,
seq_len: Optional[int] = None,
**rope_kwargs,
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies with linear scaling. Credits to the Reddit user /u/kaiokendev
Args:
@ -114,7 +114,7 @@ def _compute_dynamic_ntk_parameters(
device: Optional["torch.device"] = None,
seq_len: Optional[int] = None,
**rope_kwargs,
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies with NTK scaling. Credits to the Reddit users /u/bloc97 and /u/emozilla
Args:
@ -162,7 +162,7 @@ def _compute_dynamic_ntk_parameters(
def _compute_yarn_parameters(
config: PretrainedConfig, device: "torch.device", seq_len: Optional[int] = None, **rope_kwargs
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies with NTK scaling. Please refer to the
[original paper](https://arxiv.org/abs/2309.00071)
@ -241,7 +241,7 @@ def _compute_yarn_parameters(
def _compute_longrope_parameters(
config: PretrainedConfig, device: "torch.device", seq_len: Optional[int] = None, **rope_kwargs
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies with LongRoPE scaling. Please refer to the
[original implementation](https://github.com/microsoft/LongRoPE)
@ -304,7 +304,7 @@ def _compute_longrope_parameters(
def _compute_llama3_parameters(
config: PretrainedConfig, device: "torch.device", seq_len: Optional[int] = None, **rope_kwargs
) -> Tuple["torch.Tensor", float]:
) -> tuple["torch.Tensor", float]:
"""
Computes the inverse frequencies for llama 3.1.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team.
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -15,7 +15,7 @@
"""Functions and classes related to optimization (weight updates)."""
import re
from typing import Callable, List, Optional, Union
from typing import Callable, Optional, Union
import tensorflow as tf
@ -105,7 +105,7 @@ def create_optimizer(
adam_global_clipnorm: Optional[float] = None,
weight_decay_rate: float = 0.0,
power: float = 1.0,
include_in_weight_decay: Optional[List[str]] = None,
include_in_weight_decay: Optional[list[str]] = None,
):
"""
Creates an optimizer with a learning rate schedule using a warmup phase followed by a linear decay.
@ -224,8 +224,8 @@ class AdamWeightDecay(Adam):
epsilon: float = 1e-7,
amsgrad: bool = False,
weight_decay_rate: float = 0.0,
include_in_weight_decay: Optional[List[str]] = None,
exclude_from_weight_decay: Optional[List[str]] = None,
include_in_weight_decay: Optional[list[str]] = None,
exclude_from_weight_decay: Optional[list[str]] = None,
name: str = "AdamWeightDecay",
**kwargs,
):
@ -238,10 +238,10 @@ class AdamWeightDecay(Adam):
def from_config(cls, config):
"""Creates an optimizer from its config with WarmUp custom object."""
custom_objects = {"WarmUp": WarmUp}
return super(AdamWeightDecay, cls).from_config(config, custom_objects=custom_objects)
return super().from_config(config, custom_objects=custom_objects)
def _prepare_local(self, var_device, var_dtype, apply_state):
super(AdamWeightDecay, self)._prepare_local(var_device, var_dtype, apply_state)
super()._prepare_local(var_device, var_dtype, apply_state)
apply_state[(var_device, var_dtype)]["weight_decay_rate"] = tf.constant(
self.weight_decay_rate, name="adam_weight_decay_rate"
)
@ -257,7 +257,7 @@ class AdamWeightDecay(Adam):
def apply_gradients(self, grads_and_vars, name=None, **kwargs):
grads, tvars = list(zip(*grads_and_vars))
return super(AdamWeightDecay, self).apply_gradients(zip(grads, tvars), name=name, **kwargs)
return super().apply_gradients(zip(grads, tvars), name=name, **kwargs)
def _get_lr(self, var_device, var_dtype, apply_state):
"""Retrieves the learning rate with the given state."""
@ -276,13 +276,13 @@ class AdamWeightDecay(Adam):
lr_t, kwargs = self._get_lr(var.device, var.dtype.base_dtype, apply_state)
decay = self._decay_weights_op(var, lr_t, apply_state)
with tf.control_dependencies([decay]):
return super(AdamWeightDecay, self)._resource_apply_dense(grad, var, **kwargs)
return super()._resource_apply_dense(grad, var, **kwargs)
def _resource_apply_sparse(self, grad, var, indices, apply_state=None):
lr_t, kwargs = self._get_lr(var.device, var.dtype.base_dtype, apply_state)
decay = self._decay_weights_op(var, lr_t, apply_state)
with tf.control_dependencies([decay]):
return super(AdamWeightDecay, self)._resource_apply_sparse(grad, var, indices, **kwargs)
return super()._resource_apply_sparse(grad, var, indices, **kwargs)
def get_config(self):
config = super().get_config()

View File

@ -15,7 +15,7 @@ from __future__ import annotations
import inspect
from functools import lru_cache, wraps
from typing import Callable, List, Optional, Set, Tuple, Union
from typing import Callable
import torch
from packaging import version
@ -157,9 +157,7 @@ def prune_conv1d_layer(layer: Conv1D, index: torch.LongTensor, dim: int = 1) ->
return new_layer
def prune_layer(
layer: Union[nn.Linear, Conv1D], index: torch.LongTensor, dim: Optional[int] = None
) -> Union[nn.Linear, Conv1D]:
def prune_layer(layer: nn.Linear | Conv1D, index: torch.LongTensor, dim: int | None = None) -> nn.Linear | Conv1D:
"""
Prune a Conv1D or linear layer to keep only entries in index.
@ -260,8 +258,8 @@ def apply_chunking_to_forward(
def find_pruneable_heads_and_indices(
heads: List[int], n_heads: int, head_size: int, already_pruned_heads: Set[int]
) -> Tuple[Set[int], torch.LongTensor]:
heads: list[int], n_heads: int, head_size: int, already_pruned_heads: set[int]
) -> tuple[set[int], torch.LongTensor]:
"""
Finds the heads and their indices taking `already_pruned_heads` into account.
@ -286,9 +284,7 @@ def find_pruneable_heads_and_indices(
return heads, index
def meshgrid(
*tensors: Union[torch.Tensor, List[torch.Tensor]], indexing: Optional[str] = None
) -> Tuple[torch.Tensor, ...]:
def meshgrid(*tensors: torch.Tensor | list[torch.Tensor], indexing: str | None = None) -> tuple[torch.Tensor, ...]:
"""
Wrapper around torch.meshgrid to avoid warning messages about the introduced `indexing` argument.
@ -297,7 +293,7 @@ def meshgrid(
return torch.meshgrid(*tensors, indexing=indexing)
def id_tensor_storage(tensor: torch.Tensor) -> Tuple[torch.device, int, int]:
def id_tensor_storage(tensor: torch.Tensor) -> tuple[torch.device, int, int]:
"""
Unique identifier to a tensor storage. Multiple different tensors can share the same underlying storage. For
example, "meta" tensors all share the same storage, and thus their identifier will all be equal. This identifier is

View File

@ -33,12 +33,12 @@ import threading
import time
import unittest
from collections import UserDict, defaultdict
from collections.abc import Mapping
from collections.abc import Generator, Iterable, Iterator, Mapping
from dataclasses import MISSING, fields
from functools import cache, wraps
from io import StringIO
from pathlib import Path
from typing import Any, Callable, Dict, Generator, Iterable, Iterator, List, Optional, Union
from typing import Any, Callable, Optional, Union
from unittest import mock
from unittest.mock import patch
@ -1456,14 +1456,13 @@ def get_steps_per_epoch(trainer: Trainer) -> int:
def evaluate_side_effect_factory(
side_effect_values: List[Dict[str, float]],
) -> Generator[Dict[str, float], None, None]:
side_effect_values: list[dict[str, float]],
) -> Generator[dict[str, float], None, None]:
"""
Function that returns side effects for the _evaluate method.
Used when we're unsure of exactly how many times _evaluate will be called.
"""
for side_effect_value in side_effect_values:
yield side_effect_value
yield from side_effect_values
while True:
yield side_effect_values[-1]
@ -2444,7 +2443,7 @@ def nested_simplify(obj, decimals=3):
def check_json_file_has_correct_format(file_path):
with open(file_path, "r") as f:
with open(file_path) as f:
lines = f.readlines()
if len(lines) == 1:
# length can only be 1 if dict is empty
@ -2471,7 +2470,7 @@ class SubprocessCallException(Exception):
pass
def run_command(command: List[str], return_stdout=False):
def run_command(command: list[str], return_stdout=False):
"""
Runs `command` with `subprocess.check_output` and will potentially return the `stdout`. Will also properly capture
if an error occurred while running `command`
@ -2904,7 +2903,7 @@ class HfDoctestModule(Module):
yield DoctestItem.from_parent(self, name=test.name, runner=runner, dtest=test)
def _device_agnostic_dispatch(device: str, dispatch_table: Dict[str, Callable], *args, **kwargs):
def _device_agnostic_dispatch(device: str, dispatch_table: dict[str, Callable], *args, **kwargs):
if device not in dispatch_table:
return dispatch_table["default"](*args, **kwargs)
@ -2992,7 +2991,7 @@ if is_torch_available():
torch_device = device_name
def update_mapping_from_spec(device_fn_dict: Dict[str, Callable], attribute_name: str):
def update_mapping_from_spec(device_fn_dict: dict[str, Callable], attribute_name: str):
try:
# Try to import the function directly
spec_fn = getattr(device_spec_module, attribute_name)

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, Union
from typing import Optional, Union
import numpy as np
import tensorflow as tf
@ -25,7 +25,7 @@ from .utils import logging
logger = logging.get_logger(__name__)
def shape_list(tensor: Union[tf.Tensor, np.ndarray]) -> List[int]:
def shape_list(tensor: Union[tf.Tensor, np.ndarray]) -> list[int]:
"""
Deal with dynamic shape in tensorflow cleanly.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team.
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
@ -17,7 +16,7 @@
Time series distributional output classes and utilities.
"""
from typing import Callable, Dict, Optional, Tuple
from typing import Callable, Optional
import torch
from torch import nn
@ -63,14 +62,14 @@ class AffineTransformed(TransformedDistribution):
class ParameterProjection(nn.Module):
def __init__(
self, in_features: int, args_dim: Dict[str, int], domain_map: Callable[..., Tuple[torch.Tensor]], **kwargs
self, in_features: int, args_dim: dict[str, int], domain_map: Callable[..., tuple[torch.Tensor]], **kwargs
) -> None:
super().__init__(**kwargs)
self.args_dim = args_dim
self.proj = nn.ModuleList([nn.Linear(in_features, dim) for dim in args_dim.values()])
self.domain_map = domain_map
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor]:
def forward(self, x: torch.Tensor) -> tuple[torch.Tensor]:
params_unbounded = [proj(x) for proj in self.proj]
return self.domain_map(*params_unbounded)
@ -88,7 +87,7 @@ class LambdaLayer(nn.Module):
class DistributionOutput:
distribution_class: type
in_features: int
args_dim: Dict[str, int]
args_dim: dict[str, int]
def __init__(self, dim: int = 1) -> None:
self.dim = dim
@ -113,7 +112,7 @@ class DistributionOutput:
return AffineTransformed(distr, loc=loc, scale=scale, event_dim=self.event_dim)
@property
def event_shape(self) -> Tuple:
def event_shape(self) -> tuple:
r"""
Shape of each individual event contemplated by the distributions that this object constructs.
"""
@ -167,7 +166,7 @@ class StudentTOutput(DistributionOutput):
Student-T distribution output class.
"""
args_dim: Dict[str, int] = {"df": 1, "loc": 1, "scale": 1}
args_dim: dict[str, int] = {"df": 1, "loc": 1, "scale": 1}
distribution_class: type = StudentT
@classmethod
@ -182,7 +181,7 @@ class NormalOutput(DistributionOutput):
Normal distribution output class.
"""
args_dim: Dict[str, int] = {"loc": 1, "scale": 1}
args_dim: dict[str, int] = {"loc": 1, "scale": 1}
distribution_class: type = Normal
@classmethod
@ -196,7 +195,7 @@ class NegativeBinomialOutput(DistributionOutput):
Negative Binomial distribution output class.
"""
args_dim: Dict[str, int] = {"total_count": 1, "logits": 1}
args_dim: dict[str, int] = {"total_count": 1, "logits": 1}
distribution_class: type = NegativeBinomial
@classmethod

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -22,7 +21,7 @@ import itertools
import re
import unicodedata
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple, Union, overload
from typing import Any, Optional, Union, overload
from .tokenization_utils_base import (
ENCODE_KWARGS_DOCSTRING,
@ -103,7 +102,7 @@ class Trie:
ref = ref[char]
ref[self._termination_char] = 1
def split(self, text: str) -> List[str]:
def split(self, text: str) -> list[str]:
"""
Will look for the words added to the trie within `text`. Output is the original string split along the
boundaries of the words found.
@ -391,7 +390,7 @@ def _is_start_of_word(text):
return bool(_is_control(first_char) | _is_punctuation(first_char) | _is_whitespace(first_char))
def _insert_one_token_to_ordered_list(token_list: List[str], new_token: str):
def _insert_one_token_to_ordered_list(token_list: list[str], new_token: str):
"""
Inserts one token to an ordered list if it does not already exist. Note: token_list must be sorted.
"""
@ -425,11 +424,11 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
# 2. init `_added_tokens_decoder` if child class did not
if not hasattr(self, "_added_tokens_decoder"):
self._added_tokens_decoder: Dict[int, AddedToken] = {}
self._added_tokens_decoder: dict[int, AddedToken] = {}
# 3. if an `added_tokens_decoder` is passed, we are loading from a saved tokenizer, we overwrite
self._added_tokens_decoder.update(kwargs.pop("added_tokens_decoder", {}))
self._added_tokens_encoder: Dict[str, int] = {k.content: v for v, k in self._added_tokens_decoder.items()}
self._added_tokens_encoder: dict[str, int] = {k.content: v for v, k in self._added_tokens_decoder.items()}
# 4 init the parent class
super().__init__(**kwargs)
@ -455,7 +454,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
raise NotImplementedError
@property
def added_tokens_encoder(self) -> Dict[str, int]:
def added_tokens_encoder(self) -> dict[str, int]:
"""
Returns the sorted mapping from string to index. The added tokens encoder is cached for performance
optimisation in `self._added_tokens_encoder` for the slow tokenizers.
@ -463,7 +462,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
return {k.content: v for v, k in sorted(self._added_tokens_decoder.items(), key=lambda item: item[0])}
@property
def added_tokens_decoder(self) -> Dict[int, AddedToken]:
def added_tokens_decoder(self) -> dict[int, AddedToken]:
"""
Returns the added tokens in the vocabulary as a dictionary of index to AddedToken.
@ -473,7 +472,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
return dict(sorted(self._added_tokens_decoder.items(), key=lambda item: item[0]))
@added_tokens_decoder.setter
def added_tokens_decoder(self, value: Dict[int, Union[AddedToken, str]]) -> Dict[int, AddedToken]:
def added_tokens_decoder(self, value: dict[int, Union[AddedToken, str]]) -> dict[int, AddedToken]:
# Always raise an error if string because users should define the behavior
for index, token in value.items():
if not isinstance(token, (str, AddedToken)) or not isinstance(index, int):
@ -485,7 +484,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
self._added_tokens_encoder[str(token)] = index
self._update_total_vocab_size()
def get_added_vocab(self) -> Dict[str, int]:
def get_added_vocab(self) -> dict[str, int]:
"""
Returns the added tokens in the vocabulary as a dictionary of token to index. Results might be different from
the fast call because for now we always add the tokens even if they are already in the vocabulary. This is
@ -510,7 +509,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
"""
self.total_vocab_size = len(self.get_vocab())
def _add_tokens(self, new_tokens: Union[List[str], List[AddedToken]], special_tokens: bool = False) -> int:
def _add_tokens(self, new_tokens: Union[list[str], list[AddedToken]], special_tokens: bool = False) -> int:
"""
Add a list of new tokens to the tokenizer class. If the new tokens are not in the vocabulary, they are added to
it with indices starting from length of the current vocabulary. Special tokens are sometimes already in the
@ -619,7 +618,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
token_ids_1 = []
return len(self.build_inputs_with_special_tokens(token_ids_0, token_ids_1 if pair else None))
def tokenize(self, text: TextInput, **kwargs) -> List[str]:
def tokenize(self, text: TextInput, **kwargs) -> list[str]:
"""
Converts a string into a sequence of tokens, using the tokenizer.
@ -708,7 +707,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
"""
raise NotImplementedError
def convert_tokens_to_ids(self, tokens: Union[str, List[str]]) -> Union[int, List[int]]:
def convert_tokens_to_ids(self, tokens: Union[str, list[str]]) -> Union[int, list[int]]:
"""
Converts a token string (or a sequence of tokens) into a single integer id (or a sequence of ids), using the
vocabulary.
@ -824,12 +823,12 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
def _batch_encode_plus(
self,
batch_text_or_text_pairs: Union[
List[TextInput],
List[TextInputPair],
List[PreTokenizedInput],
List[PreTokenizedInputPair],
List[EncodedInput],
List[EncodedInputPair],
list[TextInput],
list[TextInputPair],
list[PreTokenizedInput],
list[PreTokenizedInputPair],
list[EncodedInput],
list[EncodedInputPair],
],
add_special_tokens: bool = True,
padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
@ -913,7 +912,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
@add_end_docstrings(ENCODE_KWARGS_DOCSTRING, ENCODE_PLUS_ADDITIONAL_KWARGS_DOCSTRING)
def _batch_prepare_for_model(
self,
batch_ids_pairs: List[Union[PreTokenizedInputPair, Tuple[List[int], None]]],
batch_ids_pairs: list[Union[PreTokenizedInputPair, tuple[list[int], None]]],
add_special_tokens: bool = True,
padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
truncation_strategy: TruncationStrategy = TruncationStrategy.DO_NOT_TRUNCATE,
@ -982,7 +981,7 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
def prepare_for_tokenization(
self, text: str, is_split_into_words: bool = False, **kwargs
) -> Tuple[str, Dict[str, Any]]:
) -> tuple[str, dict[str, Any]]:
"""
Performs any necessary transformations before tokenization.
@ -1005,8 +1004,8 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
return (text, kwargs)
def get_special_tokens_mask(
self, token_ids_0: List, token_ids_1: Optional[List] = None, already_has_special_tokens: bool = False
) -> List[int]:
self, token_ids_0: list, token_ids_1: Optional[list] = None, already_has_special_tokens: bool = False
) -> list[int]:
"""
Retrieves sequence ids from a token list that has no special tokens added. This method is called when adding
special tokens using the tokenizer `prepare_for_model` or `encode_plus` methods.
@ -1038,11 +1037,11 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
def convert_ids_to_tokens(self, ids: int, skip_special_tokens: bool = False) -> str: ...
@overload
def convert_ids_to_tokens(self, ids: List[int], skip_special_tokens: bool = False) -> List[str]: ...
def convert_ids_to_tokens(self, ids: list[int], skip_special_tokens: bool = False) -> list[str]: ...
def convert_ids_to_tokens(
self, ids: Union[int, List[int]], skip_special_tokens: bool = False
) -> Union[str, List[str]]:
self, ids: Union[int, list[int]], skip_special_tokens: bool = False
) -> Union[str, list[str]]:
"""
Converts a single index or a sequence of indices into a token or a sequence of tokens, using the vocabulary and
added tokens.
@ -1075,12 +1074,12 @@ class PreTrainedTokenizer(PreTrainedTokenizerBase):
def _convert_id_to_token(self, index: int) -> str:
raise NotImplementedError
def convert_tokens_to_string(self, tokens: List[str]) -> str:
def convert_tokens_to_string(self, tokens: list[str]) -> str:
return " ".join(tokens)
def _decode(
self,
token_ids: Union[int, List[int]],
token_ids: Union[int, list[int]],
skip_special_tokens: bool = False,
clean_up_tokenization_spaces: bool = None,
spaces_between_special_tokens: bool = True,

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -21,7 +20,8 @@ import copy
import json
import os
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from collections.abc import Iterable
from typing import Any, Optional, Union
import tokenizers.pre_tokenizers as pre_tokenizers_fast
from tokenizers import Encoding as EncodingFast
@ -238,15 +238,15 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
"""
return self._tokenizer.get_vocab_size(with_added_tokens=False)
def get_vocab(self) -> Dict[str, int]:
def get_vocab(self) -> dict[str, int]:
return self._tokenizer.get_vocab(with_added_tokens=True)
@property
def vocab(self) -> Dict[str, int]:
def vocab(self) -> dict[str, int]:
return self.get_vocab()
@property
def added_tokens_encoder(self) -> Dict[str, int]:
def added_tokens_encoder(self) -> dict[str, int]:
"""
Returns the sorted mapping from string to index. The added tokens encoder is cached for performance
optimisation in `self._added_tokens_encoder` for the slow tokenizers.
@ -254,7 +254,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
return {k.content: v for v, k in sorted(self.added_tokens_decoder.items(), key=lambda item: item[0])}
@property
def added_tokens_decoder(self) -> Dict[int, AddedToken]:
def added_tokens_decoder(self) -> dict[int, AddedToken]:
"""
Returns the added tokens in the vocabulary as a dictionary of index to AddedToken.
@ -263,7 +263,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
"""
return self._tokenizer.get_added_tokens_decoder()
def get_added_vocab(self) -> Dict[str, int]:
def get_added_vocab(self) -> dict[str, int]:
"""
Returns the added tokens in the vocabulary as a dictionary of token to index.
@ -302,7 +302,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
return_offsets_mapping: bool = False,
return_length: bool = False,
verbose: bool = True,
) -> Tuple[Dict[str, Any], List[EncodingFast]]:
) -> tuple[dict[str, Any], list[EncodingFast]]:
"""
Convert the encoding representation (from low-level HuggingFace tokenizer output) to a python Dict and a list
of encodings, take care of building a batch from overflowing tokens.
@ -339,7 +339,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
return encoding_dict, encodings
def convert_tokens_to_ids(self, tokens: Union[str, Iterable[str]]) -> Union[int, List[int]]:
def convert_tokens_to_ids(self, tokens: Union[str, Iterable[str]]) -> Union[int, list[int]]:
"""
Converts a token string (or a sequence of tokens) into a single integer id (or an Iterable of ids), using the
vocabulary.
@ -364,7 +364,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
def _convert_id_to_token(self, index: int) -> Optional[str]:
return self._tokenizer.id_to_token(int(index))
def _add_tokens(self, new_tokens: List[Union[str, AddedToken]], special_tokens=False) -> int:
def _add_tokens(self, new_tokens: list[Union[str, AddedToken]], special_tokens=False) -> int:
if special_tokens:
return self._tokenizer.add_special_tokens(new_tokens)
@ -392,8 +392,8 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
return self._tokenizer.num_special_tokens_to_add(pair)
def convert_ids_to_tokens(
self, ids: Union[int, List[int]], skip_special_tokens: bool = False
) -> Union[str, List[str]]:
self, ids: Union[int, list[int]], skip_special_tokens: bool = False
) -> Union[str, list[str]]:
"""
Converts a single index or a sequence of indices into a token or a sequence of tokens, using the vocabulary and
added tokens.
@ -417,7 +417,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
tokens.append(self._tokenizer.id_to_token(index))
return tokens
def tokenize(self, text: str, pair: Optional[str] = None, add_special_tokens: bool = False, **kwargs) -> List[str]:
def tokenize(self, text: str, pair: Optional[str] = None, add_special_tokens: bool = False, **kwargs) -> list[str]:
return self.encode_plus(text=text, text_pair=pair, add_special_tokens=add_special_tokens, **kwargs).tokens()
def set_truncation_and_padding(
@ -498,7 +498,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
def _batch_encode_plus(
self,
batch_text_or_text_pairs: Union[
List[TextInput], List[TextInputPair], List[PreTokenizedInput], List[PreTokenizedInputPair]
list[TextInput], list[TextInputPair], list[PreTokenizedInput], list[PreTokenizedInputPair]
],
add_special_tokens: bool = True,
padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
@ -647,7 +647,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
return batched_output
def convert_tokens_to_string(self, tokens: List[str]) -> str:
def convert_tokens_to_string(self, tokens: list[str]) -> str:
return (
self.backend_tokenizer.decoder.decode(tokens)
if self.backend_tokenizer.decoder is not None
@ -656,7 +656,7 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
def _decode(
self,
token_ids: Union[int, List[int]],
token_ids: Union[int, list[int]],
skip_special_tokens: bool = False,
clean_up_tokenization_spaces: bool = None,
**kwargs,
@ -681,10 +681,10 @@ class PreTrainedTokenizerFast(PreTrainedTokenizerBase):
def _save_pretrained(
self,
save_directory: Union[str, os.PathLike],
file_names: Tuple[str],
file_names: tuple[str],
legacy_format: Optional[bool] = None,
filename_prefix: Optional[str] = None,
) -> Tuple[str]:
) -> tuple[str]:
"""
Save a tokenizer using the slow-tokenizer/legacy format: vocabulary + added tokens as well as in a unique JSON
file containing {config + vocab + added-tokens}.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -34,7 +33,7 @@ import time
import warnings
from collections.abc import Mapping
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
# Integrations must be imported before ML frameworks:
@ -419,16 +418,16 @@ class Trainer:
args: TrainingArguments = None,
data_collator: Optional[DataCollator] = None,
train_dataset: Optional[Union[Dataset, IterableDataset, "datasets.Dataset"]] = None,
eval_dataset: Optional[Union[Dataset, Dict[str, Dataset], "datasets.Dataset"]] = None,
eval_dataset: Optional[Union[Dataset, dict[str, Dataset], "datasets.Dataset"]] = None,
processing_class: Optional[
Union[PreTrainedTokenizerBase, BaseImageProcessor, FeatureExtractionMixin, ProcessorMixin]
] = None,
model_init: Optional[Callable[[], PreTrainedModel]] = None,
compute_loss_func: Optional[Callable] = None,
compute_metrics: Optional[Callable[[EvalPrediction], Dict]] = None,
callbacks: Optional[List[TrainerCallback]] = None,
optimizers: Tuple[Optional[torch.optim.Optimizer], Optional[torch.optim.lr_scheduler.LambdaLR]] = (None, None),
optimizer_cls_and_kwargs: Optional[Tuple[Type[torch.optim.Optimizer], Dict[str, Any]]] = None,
compute_metrics: Optional[Callable[[EvalPrediction], dict]] = None,
callbacks: Optional[list[TrainerCallback]] = None,
optimizers: tuple[Optional[torch.optim.Optimizer], Optional[torch.optim.lr_scheduler.LambdaLR]] = (None, None),
optimizer_cls_and_kwargs: Optional[tuple[type[torch.optim.Optimizer], dict[str, Any]]] = None,
preprocess_logits_for_metrics: Optional[Callable[[torch.Tensor, torch.Tensor], torch.Tensor]] = None,
):
if args is None:
@ -1187,7 +1186,7 @@ class Trainer:
optimizer = self.optimizer
self.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer)
def get_decay_parameter_names(self, model) -> List[str]:
def get_decay_parameter_names(self, model) -> list[str]:
"""
Get all parameter names that weight decay will be applied to.
@ -1298,7 +1297,7 @@ class Trainer:
@staticmethod
def get_optimizer_cls_and_kwargs(
args: TrainingArguments, model: Optional[PreTrainedModel] = None
) -> Tuple[Any, Any]:
) -> tuple[Any, Any]:
"""
Returns the optimizer class and optimizer parameters based on the training arguments.
@ -1324,10 +1323,10 @@ class Trainer:
def setup_low_rank_optimizer(
optimizer_name: str,
optimizer_mapping: Dict[str, Any],
optim_kwargs: Dict[str, Any],
optimizer_mapping: dict[str, Any],
optim_kwargs: dict[str, Any],
is_layerwise_supported: bool = True,
) -> Tuple[Any, Any]:
) -> tuple[Any, Any]:
"""
Helper function to set up low-rank optimizers like GaLore and Apollo.
@ -1783,7 +1782,7 @@ class Trainer:
logger.warning("Cannot get num_tokens from dataloader")
return train_tokens
def _hp_search_setup(self, trial: Union["optuna.Trial", Dict[str, Any]]):
def _hp_search_setup(self, trial: Union["optuna.Trial", dict[str, Any]]):
"""HP search setup code"""
self._trial = trial
@ -1839,7 +1838,7 @@ class Trainer:
self.create_accelerator_and_postprocess()
def _report_to_hp_search(self, trial: Union["optuna.Trial", Dict[str, Any]], step: int, metrics: Dict[str, float]):
def _report_to_hp_search(self, trial: Union["optuna.Trial", dict[str, Any]], step: int, metrics: dict[str, float]):
if self.hp_search_backend is None or trial is None:
return
metrics = metrics.copy()
@ -2140,8 +2139,8 @@ class Trainer:
def train(
self,
resume_from_checkpoint: Optional[Union[str, bool]] = None,
trial: Union["optuna.Trial", Dict[str, Any]] = None,
ignore_keys_for_eval: Optional[List[str]] = None,
trial: Union["optuna.Trial", dict[str, Any]] = None,
ignore_keys_for_eval: Optional[list[str]] = None,
**kwargs,
):
"""
@ -3070,7 +3069,7 @@ class Trainer:
if is_torch_xla_available():
xm.mark_step()
logs: Dict[str, float] = {}
logs: dict[str, float] = {}
# all_gather + mean() to get average loss over all processes
tr_loss_scalar = self._nested_gather(tr_loss).mean().item()
@ -3529,14 +3528,14 @@ class Trainer:
def hyperparameter_search(
self,
hp_space: Optional[Callable[["optuna.Trial"], Dict[str, float]]] = None,
compute_objective: Optional[Callable[[Dict[str, float]], float]] = None,
hp_space: Optional[Callable[["optuna.Trial"], dict[str, float]]] = None,
compute_objective: Optional[Callable[[dict[str, float]], float]] = None,
n_trials: int = 20,
direction: Union[str, List[str]] = "minimize",
direction: Union[str, list[str]] = "minimize",
backend: Optional[Union["str", HPSearchBackend]] = None,
hp_name: Optional[Callable[["optuna.Trial"], str]] = None,
**kwargs,
) -> Union[BestRun, List[BestRun]]:
) -> Union[BestRun, list[BestRun]]:
"""
Launch a hyperparameter search using `optuna` or `Ray Tune` or `SigOpt`. The optimized quantity is determined
by `compute_objective`, which defaults to a function returning the evaluation loss when no metric is provided,
@ -3611,7 +3610,7 @@ class Trainer:
self.hp_search_backend = None
return best_run
def log(self, logs: Dict[str, float], start_time: Optional[float] = None) -> None:
def log(self, logs: dict[str, float], start_time: Optional[float] = None) -> None:
"""
Log `logs` on the various objects watching training.
@ -3652,7 +3651,7 @@ class Trainer:
return data.to(**kwargs)
return data
def _prepare_inputs(self, inputs: Dict[str, Union[torch.Tensor, Any]]) -> Dict[str, Union[torch.Tensor, Any]]:
def _prepare_inputs(self, inputs: dict[str, Union[torch.Tensor, Any]]) -> dict[str, Union[torch.Tensor, Any]]:
"""
Prepare `inputs` before feeding them to the model, converting them to tensors if they are not already and
handling potential state.
@ -3687,7 +3686,7 @@ class Trainer:
return ctx_manager
def training_step(
self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], num_items_in_batch=None
self, model: nn.Module, inputs: dict[str, Union[torch.Tensor, Any]], num_items_in_batch=None
) -> torch.Tensor:
"""
Perform a training step on a batch of inputs.
@ -4016,7 +4015,7 @@ class Trainer:
def _sorted_checkpoints(
self, output_dir=None, checkpoint_prefix=PREFIX_CHECKPOINT_DIR, use_mtime=False
) -> List[str]:
) -> list[str]:
ordering_and_checkpoint_path = []
glob_checkpoints = [str(x) for x in Path(output_dir).glob(f"{checkpoint_prefix}-*") if os.path.isdir(x)]
@ -4068,10 +4067,10 @@ class Trainer:
def evaluate(
self,
eval_dataset: Optional[Union[Dataset, Dict[str, Dataset]]] = None,
ignore_keys: Optional[List[str]] = None,
eval_dataset: Optional[Union[Dataset, dict[str, Dataset]]] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "eval",
) -> Dict[str, float]:
) -> dict[str, float]:
"""
Run evaluation and returns metrics.
@ -4171,7 +4170,7 @@ class Trainer:
return output.metrics
def predict(
self, test_dataset: Dataset, ignore_keys: Optional[List[str]] = None, metric_key_prefix: str = "test"
self, test_dataset: Dataset, ignore_keys: Optional[list[str]] = None, metric_key_prefix: str = "test"
) -> PredictionOutput:
"""
Run prediction and returns predictions and potential metrics.
@ -4239,7 +4238,7 @@ class Trainer:
dataloader: DataLoader,
description: str,
prediction_loss_only: Optional[bool] = None,
ignore_keys: Optional[List[str]] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "eval",
) -> EvalLoopOutput:
"""
@ -4339,11 +4338,11 @@ class Trainer:
# Update containers
if losses is not None:
losses = self.gather_function((losses.repeat(batch_size)))
losses = self.gather_function(losses.repeat(batch_size))
all_losses.add(losses)
if inputs_decode is not None:
inputs_decode = self.accelerator.pad_across_processes(inputs_decode, dim=1, pad_index=-100)
inputs_decode = self.gather_function((inputs_decode))
inputs_decode = self.gather_function(inputs_decode)
if not self.args.batch_eval_metrics or description == "Prediction":
all_inputs.add(inputs_decode)
if labels is not None:
@ -4353,11 +4352,11 @@ class Trainer:
logits = self.accelerator.pad_across_processes(logits, dim=1, pad_index=-100)
if self.preprocess_logits_for_metrics is not None:
logits = self.preprocess_logits_for_metrics(logits, labels)
logits = self.gather_function((logits))
logits = self.gather_function(logits)
if not self.args.batch_eval_metrics or description == "Prediction":
all_preds.add(logits)
if labels is not None:
labels = self.gather_function((labels))
labels = self.gather_function(labels)
if not self.args.batch_eval_metrics or description == "Prediction":
all_labels.add(labels)
@ -4470,10 +4469,10 @@ class Trainer:
def prediction_step(
self,
model: nn.Module,
inputs: Dict[str, Union[torch.Tensor, Any]],
inputs: dict[str, Union[torch.Tensor, Any]],
prediction_loss_only: bool,
ignore_keys: Optional[List[str]] = None,
) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]:
ignore_keys: Optional[list[str]] = None,
) -> tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]:
"""
Perform an evaluation step on `model` using `inputs`.
@ -4572,7 +4571,7 @@ class Trainer:
return (loss, logits, labels)
def floating_point_ops(self, inputs: Dict[str, Union[torch.Tensor, Any]]):
def floating_point_ops(self, inputs: dict[str, Union[torch.Tensor, Any]]):
"""
For models that inherit from [`PreTrainedModel`], uses that method to compute the number of floating point
operations for every backward + forward pass. If using another model, either implement such a method in the
@ -4612,13 +4611,13 @@ class Trainer:
self,
language: Optional[str] = None,
license: Optional[str] = None,
tags: Union[str, List[str], None] = None,
tags: Union[str, list[str], None] = None,
model_name: Optional[str] = None,
finetuned_from: Optional[str] = None,
tasks: Union[str, List[str], None] = None,
dataset_tags: Union[str, List[str], None] = None,
dataset: Union[str, List[str], None] = None,
dataset_args: Union[str, List[str], None] = None,
tasks: Union[str, list[str], None] = None,
dataset_tags: Union[str, list[str], None] = None,
dataset: Union[str, list[str], None] = None,
dataset_args: Union[str, list[str], None] = None,
):
"""
Creates a draft of a model card using the information available to the `Trainer`.
@ -4840,7 +4839,7 @@ class Trainer:
dataloader: DataLoader,
description: str,
prediction_loss_only: Optional[bool] = None,
ignore_keys: Optional[List[str]] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "eval",
) -> EvalLoopOutput:
"""
@ -4904,9 +4903,9 @@ class Trainer:
logger.info(f" Batch size = {batch_size}")
losses_host: torch.Tensor = None
preds_host: Union[torch.Tensor, List[torch.Tensor]] = None
labels_host: Union[torch.Tensor, List[torch.Tensor]] = None
inputs_host: Union[torch.Tensor, List[torch.Tensor]] = None
preds_host: Union[torch.Tensor, list[torch.Tensor]] = None
labels_host: Union[torch.Tensor, list[torch.Tensor]] = None
inputs_host: Union[torch.Tensor, list[torch.Tensor]] = None
metrics: Optional[dict] = None
eval_set_kwargs: dict = {}
@ -5047,7 +5046,7 @@ class Trainer:
# Get current .gitignore content
if os.path.exists(os.path.join(self.repo.local_dir, ".gitignore")):
with open(os.path.join(self.repo.local_dir, ".gitignore"), "r") as f:
with open(os.path.join(self.repo.local_dir, ".gitignore")) as f:
current_content = f.read()
else:
current_content = ""

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -20,7 +19,7 @@ import dataclasses
import json
import math
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from typing import Optional, Union
import numpy as np
from tqdm.auto import tqdm
@ -104,7 +103,7 @@ class TrainerState:
num_train_epochs: int = 0
num_input_tokens_seen: int = 0
total_flos: float = 0
log_history: List[Dict[str, float]] = None
log_history: list[dict[str, float]] = None
best_metric: Optional[float] = None
best_global_step: Optional[int] = None
best_model_checkpoint: Optional[str] = None
@ -112,8 +111,8 @@ class TrainerState:
is_world_process_zero: bool = True
is_hyper_param_search: bool = False
trial_name: str = None
trial_params: Dict[str, Union[str, float, int, bool]] = None
stateful_callbacks: List["TrainerCallback"] = None
trial_params: dict[str, Union[str, float, int, bool]] = None
stateful_callbacks: list["TrainerCallback"] = None
def __post_init__(self):
if self.log_history is None:
@ -151,7 +150,7 @@ class TrainerState:
@classmethod
def load_from_json(cls, json_path: str):
"""Create an instance from the content of `json_path`."""
with open(json_path, "r", encoding="utf-8") as f:
with open(json_path, encoding="utf-8") as f:
text = f.read()
return cls(**json.loads(text))

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -24,12 +23,12 @@ import math
import os
import sys
import warnings
from collections.abc import Mapping
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from itertools import chain
from logging import StreamHandler
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Optional, Union
import numpy as np
import torch
@ -221,7 +220,7 @@ def distributed_concat(tensor: Any, num_total_examples: Optional[int] = None) ->
def distributed_broadcast_scalars(
scalars: List[Union[int, float]],
scalars: list[Union[int, float]],
num_total_examples: Optional[int] = None,
device: Optional[torch.device] = torch.device("cuda"),
) -> torch.Tensor:
@ -624,7 +623,7 @@ class LengthGroupedSampler(Sampler):
self,
batch_size: int,
dataset: Optional[Dataset] = None,
lengths: Optional[List[int]] = None,
lengths: Optional[list[int]] = None,
model_input_name: Optional[str] = None,
generator=None,
):
@ -675,7 +674,7 @@ class DistributedLengthGroupedSampler(DistributedSampler):
rank: Optional[int] = None,
seed: int = 0,
drop_last: bool = False,
lengths: Optional[List[int]] = None,
lengths: Optional[list[int]] = None,
model_input_name: Optional[str] = None,
):
if dataset is None and lengths is None:
@ -936,7 +935,7 @@ def _secs2timedelta(secs):
return f"{datetime.timedelta(seconds=int(secs))}.{msec:02d}"
def metrics_format(self, metrics: Dict[str, float]) -> Dict[str, float]:
def metrics_format(self, metrics: dict[str, float]) -> dict[str, float]:
"""
Reformat Trainer metrics values to a human-readable format
@ -1080,7 +1079,7 @@ def save_metrics(self, split, metrics, combined=True):
if combined:
path = os.path.join(self.args.output_dir, "all_results.json")
if os.path.exists(path):
with open(path, "r") as f:
with open(path) as f:
all_metrics = json.load(f)
else:
all_metrics = {}
@ -1300,7 +1299,7 @@ class AcceleratorConfig:
},
)
gradient_accumulation_kwargs: Optional[Dict] = field(
gradient_accumulation_kwargs: Optional[dict] = field(
default=None,
metadata={
"help": "Additional kwargs to configure gradient accumulation, see [`accelerate.utils.GradientAccumulationPlugin`]. "

View File

@ -16,7 +16,7 @@ import contextlib
import warnings
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
import torch
from torch import nn
@ -59,15 +59,15 @@ class Seq2SeqTrainer(Trainer):
args: "TrainingArguments" = None,
data_collator: Optional["DataCollator"] = None,
train_dataset: Optional[Union[Dataset, "IterableDataset", "datasets.Dataset"]] = None,
eval_dataset: Optional[Union[Dataset, Dict[str, Dataset]]] = None,
eval_dataset: Optional[Union[Dataset, dict[str, Dataset]]] = None,
processing_class: Optional[
Union["PreTrainedTokenizerBase", "BaseImageProcessor", "FeatureExtractionMixin", "ProcessorMixin"]
] = None,
model_init: Optional[Callable[[], "PreTrainedModel"]] = None,
compute_loss_func: Optional[Callable] = None,
compute_metrics: Optional[Callable[["EvalPrediction"], Dict]] = None,
callbacks: Optional[List["TrainerCallback"]] = None,
optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR] = (None, None),
compute_metrics: Optional[Callable[["EvalPrediction"], dict]] = None,
callbacks: Optional[list["TrainerCallback"]] = None,
optimizers: tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR] = (None, None),
preprocess_logits_for_metrics: Optional[Callable[[torch.Tensor, torch.Tensor], torch.Tensor]] = None,
):
super().__init__(
@ -143,10 +143,10 @@ class Seq2SeqTrainer(Trainer):
def evaluate(
self,
eval_dataset: Optional[Dataset] = None,
ignore_keys: Optional[List[str]] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "eval",
**gen_kwargs,
) -> Dict[str, float]:
) -> dict[str, float]:
"""
Run evaluation and returns metrics.
@ -199,7 +199,7 @@ class Seq2SeqTrainer(Trainer):
def predict(
self,
test_dataset: Dataset,
ignore_keys: Optional[List[str]] = None,
ignore_keys: Optional[list[str]] = None,
metric_key_prefix: str = "test",
**gen_kwargs,
) -> "PredictionOutput":
@ -263,11 +263,11 @@ class Seq2SeqTrainer(Trainer):
def prediction_step(
self,
model: nn.Module,
inputs: Dict[str, Union[torch.Tensor, Any]],
inputs: dict[str, Union[torch.Tensor, Any]],
prediction_loss_only: bool,
ignore_keys: Optional[List[str]] = None,
ignore_keys: Optional[list[str]] = None,
**gen_kwargs,
) -> Tuple[Optional[float], Optional[torch.Tensor], Optional[torch.Tensor]]:
) -> tuple[Optional[float], Optional[torch.Tensor], Optional[torch.Tensor]]:
"""
Perform an evaluation step on `model` using `inputs`.

View File

@ -1,4 +1,3 @@
# coding=utf-8
# Copyright 2020-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -25,7 +24,7 @@ import random
import re
import threading
import time
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from typing import Any, NamedTuple, Optional, Union
import numpy as np
@ -165,10 +164,10 @@ class EvalPrediction:
def __init__(
self,
predictions: Union[np.ndarray, Tuple[np.ndarray]],
label_ids: Union[np.ndarray, Tuple[np.ndarray]],
inputs: Optional[Union[np.ndarray, Tuple[np.ndarray]]] = None,
losses: Optional[Union[np.ndarray, Tuple[np.ndarray]]] = None,
predictions: Union[np.ndarray, tuple[np.ndarray]],
label_ids: Union[np.ndarray, tuple[np.ndarray]],
inputs: Optional[Union[np.ndarray, tuple[np.ndarray]]] = None,
losses: Optional[Union[np.ndarray, tuple[np.ndarray]]] = None,
):
self.predictions = predictions
self.label_ids = label_ids
@ -190,22 +189,22 @@ class EvalPrediction:
class EvalLoopOutput(NamedTuple):
predictions: Union[np.ndarray, Tuple[np.ndarray]]
label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
metrics: Optional[Dict[str, float]]
predictions: Union[np.ndarray, tuple[np.ndarray]]
label_ids: Optional[Union[np.ndarray, tuple[np.ndarray]]]
metrics: Optional[dict[str, float]]
num_samples: Optional[int]
class PredictionOutput(NamedTuple):
predictions: Union[np.ndarray, Tuple[np.ndarray]]
label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
metrics: Optional[Dict[str, float]]
predictions: Union[np.ndarray, tuple[np.ndarray]]
label_ids: Optional[Union[np.ndarray, tuple[np.ndarray]]]
metrics: Optional[dict[str, float]]
class TrainOutput(NamedTuple):
global_step: int
training_loss: float
metrics: Dict[str, float]
metrics: dict[str, float]
PREFIX_CHECKPOINT_DIR = "checkpoint"
@ -267,12 +266,12 @@ class BestRun(NamedTuple):
"""
run_id: str
objective: Union[float, List[float]]
hyperparameters: Dict[str, Any]
objective: Union[float, list[float]]
hyperparameters: dict[str, Any]
run_summary: Optional[Any] = None
def default_compute_objective(metrics: Dict[str, float]) -> float:
def default_compute_objective(metrics: dict[str, float]) -> float:
"""
The default objective to maximize/minimize when doing an hyperparameter search. It is the evaluation loss if no
metrics are provided to the [`Trainer`], the sum of all metrics otherwise.
@ -297,7 +296,7 @@ def default_compute_objective(metrics: Dict[str, float]) -> float:
return loss if len(metrics) == 0 else sum(metrics.values())
def default_hp_space_optuna(trial) -> Dict[str, float]:
def default_hp_space_optuna(trial) -> dict[str, float]:
from .integrations import is_optuna_available
assert is_optuna_available(), "This function needs Optuna installed: `pip install optuna`"
@ -309,7 +308,7 @@ def default_hp_space_optuna(trial) -> Dict[str, float]:
}
def default_hp_space_ray(trial) -> Dict[str, float]:
def default_hp_space_ray(trial) -> dict[str, float]:
from .integrations import is_ray_tune_available
assert is_ray_tune_available(), "This function needs ray installed: `pip install ray[tune]`"
@ -336,7 +335,7 @@ def default_hp_space_sigopt(trial):
]
def default_hp_space_wandb(trial) -> Dict[str, float]:
def default_hp_space_wandb(trial) -> dict[str, float]:
from .integrations import is_wandb_available
if not is_wandb_available():
@ -867,7 +866,7 @@ class RemoveColumnsCollator:
self.message_logged = True
return {k: v for k, v in feature.items() if k in self.signature_columns}
def __call__(self, features: List[dict]):
def __call__(self, features: list[dict]):
features = [self._remove_columns(feature) for feature in features]
return self.data_collator(features)

View File

@ -14,7 +14,7 @@
import warnings
from dataclasses import dataclass, field
from typing import Optional, Tuple
from typing import Optional
from .training_args import TrainingArguments
from .utils import cached_property, is_tf_available, logging, requires_backends
@ -189,7 +189,7 @@ class TFTrainingArguments(TrainingArguments):
xla: bool = field(default=False, metadata={"help": "Whether to activate the XLA compilation or not"})
@cached_property
def _setup_strategy(self) -> Tuple["tf.distribute.Strategy", int]:
def _setup_strategy(self) -> tuple["tf.distribute.Strategy", int]:
requires_backends(self, ["tf"])
logger.info("Tensorflow: setting up strategy")