# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image processor class for DETR."""

import io
import pathlib
import warnings
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import numpy as np

from transformers.image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict
from transformers.image_transforms import (
    PaddingMode,
    center_to_corners_format,
    corners_to_center_format,
    id_to_rgb,
    normalize,
    pad,
    rescale,
    resize,
    rgb_to_id,
    to_channel_dimension_format,
)
from transformers.image_utils import (
    IMAGENET_DEFAULT_MEAN,
    IMAGENET_DEFAULT_STD,
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    get_image_size,
    infer_channel_dimension_format,
    make_list_of_images,
    to_numpy_array,
    valid_coco_detection_annotations,
    valid_coco_panoptic_annotations,
    valid_images,
)
from transformers.utils import (
    is_flax_available,
    is_jax_tensor,
    is_scipy_available,
    is_tf_available,
    is_tf_tensor,
    is_torch_available,
    is_torch_tensor,
    is_vision_available,
)
from transformers.utils.generic import ExplicitEnum, TensorType


if is_torch_available():
    import torch
    from torch import nn


if is_vision_available():
    import PIL


if is_scipy_available():
    import scipy.special
    import scipy.stats


AnnotationType = Dict[str, Union[int, str, List[Dict]]]

class AnnotionFormat(ExplicitEnum):
    COCO_DETECTION = "coco_detection"
    COCO_PANOPTIC = "coco_panoptic"


SUPPORTED_ANNOTATION_FORMATS = (AnnotionFormat.COCO_DETECTION, AnnotionFormat.COCO_PANOPTIC)


def get_size_with_aspect_ratio(image_size, size, max_size=None) -> Tuple[int, int]:
    """
    Computes the output image size given the input image size and the desired output size.

    Args:
        image_size (`Tuple[int, int]`):
            The input image size.
        size (`int`):
            The desired output size.
        max_size (`int`, *optional*):
            The maximum allowed output size.
    """
    height, width = image_size
    if max_size is not None:
        min_original_size = float(min((height, width)))
        max_original_size = float(max((height, width)))
        if max_original_size / min_original_size * size > max_size:
            size = int(round(max_size * min_original_size / max_original_size))

    if (height <= width and height == size) or (width <= height and width == size):
        return height, width

    if width < height:
        ow = size
        oh = int(size * height / width)
    else:
        oh = size
        ow = int(size * width / height)
    return (oh, ow)

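# Worked example (editor's sketch, values not from the original module): for a
# 480x640 input with `size=800`, the shorter edge (height) is matched to 800 and
# the longer edge scales proportionally to 1066; with `max_size=1000`, `size` is
# first shrunk so the longer edge never exceeds 1000.
#
# >>> get_size_with_aspect_ratio((480, 640), 800, max_size=1333)
# (800, 1066)
# >>> get_size_with_aspect_ratio((480, 640), 800, max_size=1000)
# (750, 1000)
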
def get_resize_output_image_size(
    input_image: np.ndarray, size: Union[int, Tuple[int, int], List[int]], max_size: Optional[int] = None
) -> Tuple[int, int]:
    """
    Computes the output image size given the input image size and the desired output size. If the desired output size
    is a tuple or list, the output image size is returned as is. If the desired output size is an integer, the output
    image size is computed by keeping the aspect ratio of the input image size.

    Args:
        input_image (`np.ndarray`):
            The input image.
        size (`int` or `Tuple[int, int]` or `List[int]`):
            The desired output size.
        max_size (`int`, *optional*):
            The maximum allowed output size.
    """
    image_size = get_image_size(input_image)
    if isinstance(size, (list, tuple)):
        return size

    return get_size_with_aspect_ratio(image_size, size, max_size)


def get_numpy_to_framework_fn(arr) -> Callable:
    """
    Returns a function that converts a numpy array to the framework of the input array.

    Args:
        arr (`np.ndarray`): The array to convert.
    """
    if isinstance(arr, np.ndarray):
        return np.array
    if is_tf_available() and is_tf_tensor(arr):
        import tensorflow as tf

        return tf.convert_to_tensor
    if is_torch_available() and is_torch_tensor(arr):
        import torch

        return torch.tensor
    if is_flax_available() and is_jax_tensor(arr):
        import jax.numpy as jnp

        return jnp.array
    raise ValueError(f"Cannot convert arrays of type {type(arr)}")


def safe_squeeze(arr: np.ndarray, axis: Optional[int] = None) -> np.ndarray:
    """
    Squeezes an array, but only if the axis specified has dim 1.
    """
    if axis is None:
        return arr.squeeze()

    try:
        return arr.squeeze(axis=axis)
    except ValueError:
        return arr


def normalize_annotation(annotation: Dict, image_size: Tuple[int, int]) -> Dict:
    image_height, image_width = image_size
    norm_annotation = {}
    for key, value in annotation.items():
        if key == "boxes":
            boxes = value
            boxes = corners_to_center_format(boxes)
            boxes /= np.asarray([image_width, image_height, image_width, image_height], dtype=np.float32)
            norm_annotation[key] = boxes
        else:
            norm_annotation[key] = value
    return norm_annotation

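# Worked example (editor's sketch): a single corner-format box on a 400x500
# (height x width) image is converted to center format and scaled into [0, 1],
# giving approximately [cx, cy, w, h] = [0.12, 0.3, 0.2, 0.5].
#
# >>> ann = {"boxes": np.asarray([[10.0, 20.0, 110.0, 220.0]], dtype=np.float32)}
# >>> normalize_annotation(ann, image_size=(400, 500))["boxes"]
# array([[0.12, 0.3 , 0.2 , 0.5 ]], dtype=float32)
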
# Copied from transformers.models.vilt.image_processing_vilt.max_across_indices
def max_across_indices(values: Iterable[Any]) -> List[Any]:
    """
    Return the maximum value across all indices of an iterable of values.
    """
    return [max(values_i) for values_i in zip(*values)]


# Copied from transformers.models.vilt.image_processing_vilt.get_max_height_width
def get_max_height_width(images: List[np.ndarray]) -> List[int]:
    """
    Get the maximum height and width across all images in a batch.
    """
    input_channel_dimension = infer_channel_dimension_format(images[0])

    if input_channel_dimension == ChannelDimension.FIRST:
        _, max_height, max_width = max_across_indices([img.shape for img in images])
    elif input_channel_dimension == ChannelDimension.LAST:
        max_height, max_width, _ = max_across_indices([img.shape for img in images])
    else:
        raise ValueError(f"Invalid channel dimension format: {input_channel_dimension}")
    return (max_height, max_width)


# Copied from transformers.models.vilt.image_processing_vilt.make_pixel_mask
def make_pixel_mask(image: np.ndarray, output_size: Tuple[int, int]) -> np.ndarray:
    """
    Make a pixel mask for the image, where 1 indicates a valid pixel and 0 indicates padding.

    Args:
        image (`np.ndarray`):
            Image to make the pixel mask for.
        output_size (`Tuple[int, int]`):
            Output size of the mask.
    """
    input_height, input_width = get_image_size(image)
    mask = np.zeros(output_size, dtype=np.int64)
    mask[:input_height, :input_width] = 1
    return mask

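# Worked example (editor's sketch): a 2x3 channels-last image placed into a 4x4
# canvas yields a mask with ones over the valid 2x3 region and zeros elsewhere.
#
# >>> img = np.zeros((2, 3, 3), dtype=np.float32)  # (height, width, channels)
# >>> make_pixel_mask(img, output_size=(4, 4))
# array([[1, 1, 1, 0],
#        [1, 1, 1, 0],
#        [0, 0, 0, 0],
#        [0, 0, 0, 0]])
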
# inspired by https://github.com/facebookresearch/detr/blob/master/datasets/coco.py#L33
def convert_coco_poly_to_mask(segmentations, height: int, width: int) -> np.ndarray:
    """
    Convert a COCO polygon annotation to a mask.

    Args:
        segmentations (`List[List[float]]`):
            List of polygons, each polygon represented by a list of x-y coordinates.
        height (`int`):
            Height of the mask.
        width (`int`):
            Width of the mask.
    """
    try:
        from pycocotools import mask as coco_mask
    except ImportError:
        raise ImportError("Pycocotools is not installed in your environment.")

    masks = []
    for polygons in segmentations:
        rles = coco_mask.frPyObjects(polygons, height, width)
        mask = coco_mask.decode(rles)
        if len(mask.shape) < 3:
            mask = mask[..., None]
        mask = np.asarray(mask, dtype=np.uint8)
        mask = np.any(mask, axis=2)
        masks.append(mask)
    if masks:
        masks = np.stack(masks, axis=0)
    else:
        masks = np.zeros((0, height, width), dtype=np.uint8)

    return masks

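# Usage sketch (editor's addition, requires `pip install pycocotools`): one
# annotation consisting of a single square polygon, given as a flat
# [x0, y0, x1, y1, ...] list, produces a stack of shape (num_annotations, height, width).
#
# >>> polys = [[[2.0, 2.0, 6.0, 2.0, 6.0, 6.0, 2.0, 6.0]]]  # one annotation, one polygon
# >>> convert_coco_poly_to_mask(polys, height=8, width=8).shape
# (1, 8, 8)
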
# inspired by https://github.com/facebookresearch/detr/blob/master/datasets/coco.py#L50
def prepare_coco_detection_annotation(image, target, return_segmentation_masks: bool = False):
    """
    Convert the target in COCO format into the format expected by DETR.
    """
    image_height, image_width = get_image_size(image)

    image_id = target["image_id"]
    image_id = np.asarray([image_id], dtype=np.int64)

    # Get all COCO annotations for the given image.
    annotations = target["annotations"]
    annotations = [obj for obj in annotations if "iscrowd" not in obj or obj["iscrowd"] == 0]

    classes = [obj["category_id"] for obj in annotations]
    classes = np.asarray(classes, dtype=np.int64)

    # for conversion to coco api
    area = np.asarray([obj["area"] for obj in annotations], dtype=np.float32)
    iscrowd = np.asarray([obj["iscrowd"] if "iscrowd" in obj else 0 for obj in annotations], dtype=np.int64)

    boxes = [obj["bbox"] for obj in annotations]
    # guard against no boxes via resizing
    boxes = np.asarray(boxes, dtype=np.float32).reshape(-1, 4)
    boxes[:, 2:] += boxes[:, :2]
    boxes[:, 0::2] = boxes[:, 0::2].clip(min=0, max=image_width)
    boxes[:, 1::2] = boxes[:, 1::2].clip(min=0, max=image_height)

    keep = (boxes[:, 3] > boxes[:, 1]) & (boxes[:, 2] > boxes[:, 0])

    new_target = {}
    new_target["image_id"] = image_id
    new_target["class_labels"] = classes[keep]
    new_target["boxes"] = boxes[keep]
    new_target["area"] = area[keep]
    new_target["iscrowd"] = iscrowd[keep]
    new_target["orig_size"] = np.asarray([int(image_height), int(image_width)], dtype=np.int64)

    if annotations and "keypoints" in annotations[0]:
        keypoints = [obj["keypoints"] for obj in annotations]
        keypoints = np.asarray(keypoints, dtype=np.float32)
        num_keypoints = keypoints.shape[0]
        keypoints = keypoints.reshape((-1, 3)) if num_keypoints else keypoints
        new_target["keypoints"] = keypoints[keep]

    if return_segmentation_masks:
        segmentation_masks = [obj["segmentation"] for obj in annotations]
        masks = convert_coco_poly_to_mask(segmentation_masks, image_height, image_width)
        new_target["masks"] = masks[keep]

    return new_target

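# Worked example (editor's sketch): COCO stores boxes as [x, y, width, height];
# the helper converts them to [x0, y0, x1, y1], clips to the image, and drops
# degenerate boxes.
#
# >>> img = np.zeros((100, 100, 3), dtype=np.uint8)
# >>> target = {"image_id": 1, "annotations": [{"category_id": 7, "bbox": [10.0, 20.0, 30.0, 40.0], "area": 1200.0}]}
# >>> prepare_coco_detection_annotation(img, target)["boxes"]
# array([[10., 20., 40., 60.]], dtype=float32)
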
def masks_to_boxes(masks: np.ndarray) -> np.ndarray:
    """
    Compute the bounding boxes around the provided panoptic segmentation masks.

    Args:
        masks: masks in format `[number_masks, height, width]`, where `number_masks` is the number of masks

    Returns:
        boxes: bounding boxes in format `[number_masks, 4]` in xyxy format
    """
    if masks.size == 0:
        return np.zeros((0, 4))

    h, w = masks.shape[-2:]
    y = np.arange(0, h, dtype=np.float32)
    x = np.arange(0, w, dtype=np.float32)
    # see https://github.com/pytorch/pytorch/issues/50276
    y, x = np.meshgrid(y, x, indexing="ij")

    x_mask = masks * np.expand_dims(x, axis=0)
    x_max = x_mask.reshape(x_mask.shape[0], -1).max(-1)
    x = np.ma.array(x_mask, mask=~(np.array(masks, dtype=bool)))
    x_min = x.filled(fill_value=1e8)
    x_min = x_min.reshape(x_min.shape[0], -1).min(-1)

    y_mask = masks * np.expand_dims(y, axis=0)
    y_max = y_mask.reshape(x_mask.shape[0], -1).max(-1)
    y = np.ma.array(y_mask, mask=~(np.array(masks, dtype=bool)))
    y_min = y.filled(fill_value=1e8)
    y_min = y_min.reshape(y_min.shape[0], -1).min(-1)

    return np.stack([x_min, y_min, x_max, y_max], 1)

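# Worked example (editor's sketch): a single mask covering rows 2-4 and columns
# 3-6 yields [x_min, y_min, x_max, y_max] = [3, 2, 6, 4], i.e. the pixel-index
# coordinates of the extreme mask pixels.
#
# >>> m = np.zeros((1, 8, 8), dtype=np.uint8)
# >>> m[0, 2:5, 3:7] = 1
# >>> masks_to_boxes(m)
# array([[3., 2., 6., 4.]], dtype=float32)
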
def prepare_coco_panoptic_annotation(
    image: np.ndarray, target: Dict, masks_path: Union[str, pathlib.Path], return_masks: bool = True
) -> Dict:
    """
    Prepare a coco panoptic annotation for DETR.
    """
    image_height, image_width = get_image_size(image)
    annotation_path = pathlib.Path(masks_path) / target["file_name"]

    new_target = {}
    new_target["image_id"] = np.asarray([target["image_id"] if "image_id" in target else target["id"]], dtype=np.int64)
    new_target["size"] = np.asarray([image_height, image_width], dtype=np.int64)
    new_target["orig_size"] = np.asarray([image_height, image_width], dtype=np.int64)

    if "segments_info" in target:
        masks = np.asarray(PIL.Image.open(annotation_path), dtype=np.uint32)
        masks = rgb_to_id(masks)

        ids = np.array([segment_info["id"] for segment_info in target["segments_info"]])
        masks = masks == ids[:, None, None]
        masks = masks.astype(np.uint8)
        if return_masks:
            new_target["masks"] = masks
        new_target["boxes"] = masks_to_boxes(masks)
        new_target["class_labels"] = np.array(
            [segment_info["category_id"] for segment_info in target["segments_info"]], dtype=np.int64
        )
        new_target["iscrowd"] = np.asarray(
            [segment_info["iscrowd"] for segment_info in target["segments_info"]], dtype=np.int64
        )
        new_target["area"] = np.asarray(
            [segment_info["area"] for segment_info in target["segments_info"]], dtype=np.float32
        )

    return new_target

def get_segmentation_image(
    masks: np.ndarray, input_size: Tuple, target_size: Tuple, stuff_equiv_classes, deduplicate=False
):
    h, w = input_size
    final_h, final_w = target_size

    m_id = scipy.special.softmax(masks.transpose(0, 1), -1)

    if m_id.shape[-1] == 0:
        # We didn't detect any mask :(
        m_id = np.zeros((h, w), dtype=np.int64)
    else:
        m_id = m_id.argmax(-1).reshape(h, w)

    if deduplicate:
        # Merge the masks corresponding to the same stuff class
        for equiv in stuff_equiv_classes.values():
            for eq_id in equiv:
                m_id[m_id == eq_id] = equiv[0]

    seg_img = id_to_rgb(m_id)
    seg_img = resize(seg_img, (final_w, final_h), resample=PILImageResampling.NEAREST)
    return seg_img


def get_mask_area(seg_img: np.ndarray, target_size: Tuple[int, int], n_classes: int) -> np.ndarray:
    final_h, final_w = target_size
    np_seg_img = seg_img.astype(np.uint8)
    np_seg_img = np_seg_img.reshape(final_h, final_w, 3)
    m_id = rgb_to_id(np_seg_img)
    area = [(m_id == i).sum() for i in range(n_classes)]
    return area


def score_labels_from_class_probabilities(logits: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    probs = scipy.special.softmax(logits, axis=-1)
    labels = probs.argmax(-1, keepdims=True)
    scores = np.take_along_axis(probs, labels, axis=-1)
    scores, labels = scores.squeeze(-1), labels.squeeze(-1)
    return scores, labels

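# Worked example (editor's sketch): softmax over the last axis, then the argmax
# label and its probability are returned per query.
#
# >>> logits = np.array([[1.0, 3.0, 2.0]])
# >>> scores, labels = score_labels_from_class_probabilities(logits)
# >>> labels
# array([1])
# >>> np.round(scores, 3)
# array([0.665])
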
def post_process_panoptic_sample(
    out_logits: np.ndarray,
    masks: np.ndarray,
    boxes: np.ndarray,
    processed_size: Tuple[int, int],
    target_size: Tuple[int, int],
    is_thing_map: Dict,
    threshold=0.85,
) -> Dict:
    """
    Converts the output of [`DetrForSegmentation`] into panoptic segmentation predictions for a single sample.

    Args:
        out_logits (`torch.Tensor`):
            The logits for this sample.
        masks (`torch.Tensor`):
            The predicted segmentation masks for this sample.
        boxes (`torch.Tensor`):
            The predicted bounding boxes for this sample. The boxes are in the normalized format `(center_x, center_y,
            width, height)` and values between `[0, 1]`, relative to the size of the image (disregarding padding).
        processed_size (`Tuple[int, int]`):
            The processed size of the image `(height, width)`, as returned by the preprocessing step i.e. the size
            after data augmentation but before batching.
        target_size (`Tuple[int, int]`):
            The target size of the image, `(height, width)` corresponding to the requested final size of the
            prediction.
        is_thing_map (`Dict`):
            A dictionary mapping class indices to a boolean value indicating whether the class is a thing or not.
        threshold (`float`, *optional*, defaults to 0.85):
            The threshold used to binarize the segmentation masks.
    """
    # we filter empty queries and detection below threshold
    scores, labels = score_labels_from_class_probabilities(out_logits)
    keep = (labels != out_logits.shape[-1] - 1) & (scores > threshold)

    cur_scores = scores[keep]
    cur_classes = labels[keep]
    cur_boxes = center_to_corners_format(boxes[keep])

    if len(cur_boxes) != len(cur_classes):
        raise ValueError("Not as many boxes as there are classes")

    cur_masks = masks[keep]
    cur_masks = resize(cur_masks[:, None], processed_size, resample=PILImageResampling.BILINEAR)
    cur_masks = safe_squeeze(cur_masks, 1)
    b, h, w = cur_masks.shape

    # It may be that we have several predicted masks for the same stuff class.
    # In the following, we track the list of masks ids for each stuff class (they are merged later on)
    cur_masks = cur_masks.reshape(b, -1)
    stuff_equiv_classes = defaultdict(list)
    for k, label in enumerate(cur_classes):
        if not is_thing_map[label]:
            stuff_equiv_classes[label].append(k)

    seg_img = get_segmentation_image(cur_masks, processed_size, target_size, stuff_equiv_classes, deduplicate=True)
    area = get_mask_area(seg_img, target_size, n_classes=len(cur_scores))

    # We filter out any mask that is too small
    if cur_classes.size > 0:
        # We now filter empty masks as long as we find some
        filtered_small = np.array([a <= 4 for a in area], dtype=bool)
        while filtered_small.any():
            cur_masks = cur_masks[~filtered_small]
            cur_scores = cur_scores[~filtered_small]
            cur_classes = cur_classes[~filtered_small]
            seg_img = get_segmentation_image(cur_masks, (h, w), target_size, stuff_equiv_classes, deduplicate=True)
            area = get_mask_area(seg_img, target_size, n_classes=len(cur_scores))
            filtered_small = np.array([a <= 4 for a in area], dtype=bool)
    else:
        cur_classes = np.ones((1, 1), dtype=np.int64)

    segments_info = [
        {"id": i, "isthing": is_thing_map[cat], "category_id": int(cat), "area": a}
        for i, (cat, a) in enumerate(zip(cur_classes, area))
    ]
    del cur_classes

    with io.BytesIO() as out:
        PIL.Image.fromarray(seg_img).save(out, format="PNG")
        predictions = {"png_string": out.getvalue(), "segments_info": segments_info}

    return predictions

def resize_annotation(
    annotation: Dict[str, Any],
    orig_size: Tuple[int, int],
    target_size: Tuple[int, int],
    threshold: float = 0.5,
    resample: PILImageResampling = PILImageResampling.NEAREST,
):
    """
    Resizes an annotation to a target size.

    Args:
        annotation (`Dict[str, Any]`):
            The annotation dictionary.
        orig_size (`Tuple[int, int]`):
            The original size of the input image.
        target_size (`Tuple[int, int]`):
            The target size of the image, as returned by the preprocessing `resize` step.
        threshold (`float`, *optional*, defaults to 0.5):
            The threshold used to binarize the segmentation masks.
        resample (`PILImageResampling`, defaults to `PILImageResampling.NEAREST`):
            The resampling filter to use when resizing the masks.
    """
    ratios = tuple(float(s) / float(s_orig) for s, s_orig in zip(target_size, orig_size))
    ratio_height, ratio_width = ratios

    new_annotation = {}
    new_annotation["size"] = target_size

    for key, value in annotation.items():
        if key == "boxes":
            boxes = value
            scaled_boxes = boxes * np.asarray([ratio_width, ratio_height, ratio_width, ratio_height], dtype=np.float32)
            new_annotation["boxes"] = scaled_boxes
        elif key == "area":
            area = value
            scaled_area = area * (ratio_width * ratio_height)
            new_annotation["area"] = scaled_area
        elif key == "masks":
            masks = value[:, None]
            masks = np.array([resize(mask, target_size, resample=resample) for mask in masks])
            masks = masks.astype(np.float32)
            masks = masks[:, 0] > threshold
            new_annotation["masks"] = masks
        elif key == "size":
            new_annotation["size"] = target_size
        else:
            new_annotation[key] = value

    return new_annotation

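# Worked example (editor's sketch): doubling both image dimensions scales box
# coordinates by 2 in x and y, and areas by 4.
#
# >>> ann = {"boxes": np.asarray([[10.0, 10.0, 50.0, 50.0]]), "area": np.asarray([1600.0])}
# >>> out = resize_annotation(ann, orig_size=(100, 200), target_size=(200, 400))
# >>> out["boxes"], out["area"]
# (array([[ 20.,  20., 100., 100.]]), array([6400.]))
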
# TODO - (Amy) make compatible with other frameworks
def binary_mask_to_rle(mask):
    """
    Converts given binary mask of shape `(height, width)` to the run-length encoding (RLE) format.

    Args:
        mask (`torch.Tensor` or `numpy.array`):
            A binary mask tensor of shape `(height, width)` where 0 denotes background and 1 denotes the target
            segment_id or class_id.
    Returns:
        `List`: Run-length encoded list of the binary mask. Refer to COCO API for more information about the RLE
        format.
    """
    if is_torch_tensor(mask):
        mask = mask.numpy()

    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return list(runs)

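# Worked example (editor's sketch): the mask below flattens to [0, 1, 1, 1, 1, 0],
# i.e. one run of four foreground pixels starting at (1-indexed) position 2.
#
# >>> binary_mask_to_rle(np.array([[0, 1, 1], [1, 1, 0]]))
# [2, 4]
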
# TODO - (Amy) make compatible with other frameworks
def convert_segmentation_to_rle(segmentation):
    """
    Converts given segmentation map of shape `(height, width)` to the run-length encoding (RLE) format.

    Args:
        segmentation (`torch.Tensor` or `numpy.array`):
            A segmentation map of shape `(height, width)` where each value denotes a segment or class id.
    Returns:
        `List[List]`: A list of lists, where each list is the run-length encoding of a segment / class id.
    """
    segment_ids = torch.unique(segmentation)

    run_length_encodings = []
    for idx in segment_ids:
        mask = torch.where(segmentation == idx, 1, 0)
        rle = binary_mask_to_rle(mask)
        run_length_encodings.append(rle)

    return run_length_encodings


def remove_low_and_no_objects(masks, scores, labels, object_mask_threshold, num_labels):
    """
    Binarize the given masks using `object_mask_threshold` and return the kept entries of `masks`, `scores` and
    `labels`.

    Args:
        masks (`torch.Tensor`):
            A tensor of shape `(num_queries, height, width)`.
        scores (`torch.Tensor`):
            A tensor of shape `(num_queries)`.
        labels (`torch.Tensor`):
            A tensor of shape `(num_queries)`.
        object_mask_threshold (`float`):
            A number between 0 and 1 used to binarize the masks.
    Raises:
        `ValueError`: Raised when the first dimension doesn't match in all input tensors.
    Returns:
        `Tuple[torch.Tensor, torch.Tensor, torch.Tensor]`: The `masks`, `scores` and `labels` restricted to the
        queries whose score is above `object_mask_threshold` and whose label is not the "no object" class.
    """
    if not (masks.shape[0] == scores.shape[0] == labels.shape[0]):
        raise ValueError("mask, scores and labels must have the same shape!")

    to_keep = labels.ne(num_labels) & (scores > object_mask_threshold)

    return masks[to_keep], scores[to_keep], labels[to_keep]

def check_segment_validity(mask_labels, mask_probs, k, mask_threshold=0.5, overlap_mask_area_threshold=0.8):
    # Get the mask associated with the k class
    mask_k = mask_labels == k
    mask_k_area = mask_k.sum()

    # Compute the area of all the stuff in query k
    original_area = (mask_probs[k] >= mask_threshold).sum()
    mask_exists = mask_k_area > 0 and original_area > 0

    # Eliminate disconnected tiny segments
    if mask_exists:
        area_ratio = mask_k_area / original_area
        if not area_ratio.item() > overlap_mask_area_threshold:
            mask_exists = False

    return mask_exists, mask_k


def compute_segments(
    mask_probs,
    pred_scores,
    pred_labels,
    mask_threshold: float = 0.5,
    overlap_mask_area_threshold: float = 0.8,
    label_ids_to_fuse: Optional[Set[int]] = None,
    target_size: Tuple[int, int] = None,
):
    height = mask_probs.shape[1] if target_size is None else target_size[0]
    width = mask_probs.shape[2] if target_size is None else target_size[1]

    segmentation = torch.zeros((height, width), dtype=torch.int32, device=mask_probs.device)
    segments: List[Dict] = []

    if target_size is not None:
        mask_probs = nn.functional.interpolate(
            mask_probs.unsqueeze(0), size=target_size, mode="bilinear", align_corners=False
        )[0]

    current_segment_id = 0

    # Weigh each mask by its prediction score
    mask_probs *= pred_scores.view(-1, 1, 1)
    mask_labels = mask_probs.argmax(0)  # [height, width]

    # Keep track of instances of each class
    stuff_memory_list: Dict[str, int] = {}
    for k in range(pred_labels.shape[0]):
        pred_class = pred_labels[k].item()
        should_fuse = pred_class in label_ids_to_fuse

        # Check if the mask exists and is large enough to be a segment
        mask_exists, mask_k = check_segment_validity(
            mask_labels, mask_probs, k, mask_threshold, overlap_mask_area_threshold
        )

        if mask_exists:
            if pred_class in stuff_memory_list:
                current_segment_id = stuff_memory_list[pred_class]
            else:
                current_segment_id += 1

            # Add current object segment to final segmentation map
            segmentation[mask_k] = current_segment_id
            segment_score = round(pred_scores[k].item(), 6)
            segments.append(
                {
                    "id": current_segment_id,
                    "label_id": pred_class,
                    "was_fused": should_fuse,
                    "score": segment_score,
                }
            )
            if should_fuse:
                stuff_memory_list[pred_class] = current_segment_id

    return segmentation, segments

class DetrImageProcessor(BaseImageProcessor):
    r"""
    Constructs a Detr image processor.

    Args:
        format (`str`, *optional*, defaults to `"coco_detection"`):
            Data format of the annotations. One of "coco_detection" or "coco_panoptic".
        do_resize (`bool`, *optional*, defaults to `True`):
            Controls whether to resize the image's `(height, width)` dimensions to the specified `size`. Can be
            overridden by the `do_resize` parameter in the `preprocess` method.
        size (`Dict[str, int]`, *optional*, defaults to `{"shortest_edge": 800, "longest_edge": 1333}`):
            Size of the image's `(height, width)` dimensions after resizing. Can be overridden by the `size` parameter
            in the `preprocess` method.
        resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BILINEAR`):
            Resampling filter to use if resizing the image.
        do_rescale (`bool`, *optional*, defaults to `True`):
            Controls whether to rescale the image by the specified scale `rescale_factor`. Can be overridden by the
            `do_rescale` parameter in the `preprocess` method.
        rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
            Scale factor to use if rescaling the image. Can be overridden by the `rescale_factor` parameter in the
            `preprocess` method.
        do_normalize (`bool`, *optional*, defaults to `True`):
            Controls whether to normalize the image. Can be overridden by the `do_normalize` parameter in the
            `preprocess` method.
        image_mean (`float` or `List[float]`, *optional*, defaults to `IMAGENET_DEFAULT_MEAN`):
            Mean values to use when normalizing the image. Can be a single value or a list of values, one for each
            channel. Can be overridden by the `image_mean` parameter in the `preprocess` method.
        image_std (`float` or `List[float]`, *optional*, defaults to `IMAGENET_DEFAULT_STD`):
            Standard deviation values to use when normalizing the image. Can be a single value or a list of values, one
            for each channel. Can be overridden by the `image_std` parameter in the `preprocess` method.
        do_pad (`bool`, *optional*, defaults to `True`):
            Controls whether to pad the image to the largest image in a batch and create a pixel mask. Can be
            overridden by the `do_pad` parameter in the `preprocess` method.
    """

    model_input_names = ["pixel_values", "pixel_mask"]

    def __init__(
        self,
        format: Union[str, AnnotionFormat] = AnnotionFormat.COCO_DETECTION,
        do_resize: bool = True,
        size: Dict[str, int] = None,
        resample: PILImageResampling = PILImageResampling.BILINEAR,
        do_rescale: bool = True,
        rescale_factor: Union[int, float] = 1 / 255,
        do_normalize: bool = True,
        image_mean: Union[float, List[float]] = None,
        image_std: Union[float, List[float]] = None,
        do_pad: bool = True,
        **kwargs,
    ) -> None:
        if "pad_and_return_pixel_mask" in kwargs:
            do_pad = kwargs.pop("pad_and_return_pixel_mask")

        if "max_size" in kwargs:
            warnings.warn(
                "The `max_size` parameter is deprecated and will be removed in v4.26. "
                "Please specify in `size['longest_edge']` instead.",
                FutureWarning,
            )
            max_size = kwargs.pop("max_size")
        else:
            max_size = None if size is None else 1333

        size = size if size is not None else {"shortest_edge": 800, "longest_edge": 1333}
        size = get_size_dict(size, max_size=max_size, default_to_square=False)

        super().__init__(**kwargs)
        self.format = format
        self.do_resize = do_resize
        self.size = size
        self.resample = resample
        self.do_rescale = do_rescale
        self.rescale_factor = rescale_factor
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else IMAGENET_DEFAULT_MEAN
        self.image_std = image_std if image_std is not None else IMAGENET_DEFAULT_STD
        self.do_pad = do_pad

    @property
    def max_size(self):
        warnings.warn(
            "The `max_size` parameter is deprecated and will be removed in v4.27. "
            "Please specify in `size['longest_edge']` instead.",
            FutureWarning,
        )
        return self.size["longest_edge"]

    @classmethod
    def from_dict(cls, image_processor_dict: Dict[str, Any], **kwargs):
        """
        Overrides the `from_dict` method from the base class to make sure parameters are updated if the image
        processor is created using `from_dict` and kwargs, e.g. `DetrImageProcessor.from_pretrained(checkpoint,
        size=600, max_size=800)`.
        """
        image_processor_dict = image_processor_dict.copy()
        if "max_size" in kwargs:
            image_processor_dict["max_size"] = kwargs.pop("max_size")
        if "pad_and_return_pixel_mask" in kwargs:
            image_processor_dict["pad_and_return_pixel_mask"] = kwargs.pop("pad_and_return_pixel_mask")
        return super().from_dict(image_processor_dict, **kwargs)

    def prepare_annotation(
        self,
        image: np.ndarray,
        target: Dict,
        format: Optional[AnnotionFormat] = None,
        return_segmentation_masks: bool = None,
        masks_path: Optional[Union[str, pathlib.Path]] = None,
    ) -> Dict:
        """
        Prepare an annotation for feeding into DETR model.
        """
        format = format if format is not None else self.format

        if format == AnnotionFormat.COCO_DETECTION:
            return_segmentation_masks = False if return_segmentation_masks is None else return_segmentation_masks
            target = prepare_coco_detection_annotation(image, target, return_segmentation_masks)
        elif format == AnnotionFormat.COCO_PANOPTIC:
            return_segmentation_masks = True if return_segmentation_masks is None else return_segmentation_masks
            target = prepare_coco_panoptic_annotation(
                image, target, masks_path=masks_path, return_masks=return_segmentation_masks
            )
        else:
            raise ValueError(f"Format {format} is not supported.")
        return target

    def prepare(self, image, target, return_segmentation_masks=None, masks_path=None):
        warnings.warn(
            "The `prepare` method is deprecated and will be removed in a future version. "
            "Please use `prepare_annotation` instead. Note: the `prepare_annotation` method "
            "does not return the image anymore.",
        )
        target = self.prepare_annotation(image, target, self.format, return_segmentation_masks, masks_path)
        return image, target

    def convert_coco_poly_to_mask(self, *args, **kwargs):
        warnings.warn("The `convert_coco_poly_to_mask` method is deprecated and will be removed in a future version.")
        return convert_coco_poly_to_mask(*args, **kwargs)

    def prepare_coco_detection(self, *args, **kwargs):
        warnings.warn("The `prepare_coco_detection` method is deprecated and will be removed in a future version.")
        return prepare_coco_detection_annotation(*args, **kwargs)

    def prepare_coco_panoptic(self, *args, **kwargs):
        warnings.warn("The `prepare_coco_panoptic` method is deprecated and will be removed in a future version.")
        return prepare_coco_panoptic_annotation(*args, **kwargs)

    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BILINEAR,
        data_format: Optional[ChannelDimension] = None,
        **kwargs,
    ) -> np.ndarray:
        """
        Resize the image to the given size. Size can be `min_size` (scalar) or `(height, width)` tuple. If size is an
        int, the smaller edge of the image will be matched to this number.
        """
        if "max_size" in kwargs:
            warnings.warn(
                "The `max_size` parameter is deprecated and will be removed in v4.26. "
                "Please specify in `size['longest_edge']` instead.",
                FutureWarning,
            )
            max_size = kwargs.pop("max_size")
        else:
            max_size = None
        size = get_size_dict(size, max_size=max_size, default_to_square=False)
        if "shortest_edge" in size and "longest_edge" in size:
            size = get_resize_output_image_size(image, size["shortest_edge"], size["longest_edge"])
        elif "height" in size and "width" in size:
            size = (size["height"], size["width"])
        else:
            raise ValueError(
                "Size must contain 'height' and 'width' keys or 'shortest_edge' and 'longest_edge' keys. Got"
                f" {size.keys()}."
            )
        image = resize(image, size=size, resample=resample, data_format=data_format)
        return image

    def resize_annotation(
        self,
        annotation,
        orig_size,
        size,
        resample: PILImageResampling = PILImageResampling.NEAREST,
    ) -> Dict:
        """
        Resize the annotation to match the resized image. If size is an int, the smaller edge of the mask will be
        matched to this number.
        """
        return resize_annotation(annotation, orig_size=orig_size, target_size=size, resample=resample)

    def rescale(
        self, image: np.ndarray, rescale_factor: Union[float, int], data_format: Optional[ChannelDimension] = None
    ) -> np.ndarray:
        """
        Rescale the image by the given factor.
        """
        return rescale(image, rescale_factor, data_format=data_format)

    def normalize(
        self,
        image: np.ndarray,
        mean: Union[float, Iterable[float]],
        std: Union[float, Iterable[float]],
        data_format: Optional[ChannelDimension] = None,
    ) -> np.ndarray:
        """
        Normalize the image with the given mean and standard deviation.
        """
        return normalize(image, mean=mean, std=std, data_format=data_format)

    def normalize_annotation(self, annotation: Dict, image_size: Tuple[int, int]) -> Dict:
        """
        Normalize the boxes in the annotation from `[top_left_x, top_left_y, bottom_right_x, bottom_right_y]` to
        `[center_x, center_y, width, height]` format.
        """
        return normalize_annotation(annotation, image_size=image_size)

    def pad_and_create_pixel_mask(
        self,
        pixel_values_list: List[ImageInput],
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: Optional[ChannelDimension] = None,
    ) -> BatchFeature:
        """
        Pads a batch of images with zeros to the size of largest height and width in the batch and returns their
        corresponding pixel mask.

        Args:
            pixel_values_list (`List[ImageInput]`):
                Batch of images to pad.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensors to return. Can be one of:
                    - Unset: Return a list of `np.ndarray`.
                    - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
                    - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
                    - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
                    - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        """
        warnings.warn(
            "This method is deprecated and will be removed in v4.27.0. Please use `pad` instead.", FutureWarning
        )
        # pad expects a list of np.ndarray, but the previous feature extractors expected torch tensors
        images = [to_numpy_array(image) for image in pixel_values_list]
        return self.pad(
            images=images,
            return_pixel_mask=True,
            return_tensors=return_tensors,
            data_format=data_format,
        )

    def _pad_image(
        self,
        image: np.ndarray,
        output_size: Tuple[int, int],
        constant_values: Union[float, Iterable[float]] = 0,
        data_format: Optional[ChannelDimension] = None,
    ) -> np.ndarray:
        """
        Pad an image with zeros to the given size.
        """
        input_height, input_width = get_image_size(image)
        output_height, output_width = output_size

        pad_bottom = output_height - input_height
        pad_right = output_width - input_width
        padding = ((0, pad_bottom), (0, pad_right))
        padded_image = pad(
            image, padding, mode=PaddingMode.CONSTANT, constant_values=constant_values, data_format=data_format
        )
        return padded_image

    def pad(
        self,
        images: List[np.ndarray],
        constant_values: Union[float, Iterable[float]] = 0,
        return_pixel_mask: bool = True,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: Optional[ChannelDimension] = None,
    ) -> BatchFeature:
        """
        Pads a batch of images to the bottom and right of the image with zeros to the size of largest height and width
        in the batch and optionally returns their corresponding pixel mask.

        Args:
            images (`List[np.ndarray]`):
                Images to pad.
            constant_values (`float` or `Iterable[float]`, *optional*):
                The value to use for the padding if `mode` is `"constant"`.
            return_pixel_mask (`bool`, *optional*, defaults to `True`):
                Whether to return a pixel mask.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensors to return. If unset, returns lists of `np.ndarray`.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        """
        pad_size = get_max_height_width(images)

        padded_images = [
            self._pad_image(image, pad_size, constant_values=constant_values, data_format=data_format)
            for image in images
        ]
        data = {"pixel_values": padded_images}

        if return_pixel_mask:
            masks = [make_pixel_mask(image=image, output_size=pad_size) for image in images]
            data["pixel_mask"] = masks

        return BatchFeature(data=data, tensor_type=return_tensors)

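    # Usage sketch (editor's addition): two differently sized channels-last images
    # are padded to the common (max) height and width, and the mask flags the real
    # pixels of each image.
    #
    # >>> proc = DetrImageProcessor()
    # >>> batch = proc.pad([np.zeros((2, 3, 3)), np.zeros((4, 2, 3))])
    # >>> batch["pixel_values"][0].shape, batch["pixel_mask"][0].shape
    # ((4, 3, 3), (4, 3))
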
    def preprocess(
        self,
        images: ImageInput,
        annotations: Optional[Union[AnnotationType, List[AnnotationType]]] = None,
        return_segmentation_masks: bool = None,
        masks_path: Optional[Union[str, pathlib.Path]] = None,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        resample=None,  # PILImageResampling
        do_rescale: Optional[bool] = None,
        rescale_factor: Optional[Union[int, float]] = None,
        do_normalize: Optional[bool] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_pad: Optional[bool] = None,
        format: Optional[Union[str, AnnotionFormat]] = None,
        return_tensors: Optional[Union[TensorType, str]] = None,
        data_format: Union[str, ChannelDimension] = ChannelDimension.FIRST,
        **kwargs,
    ) -> BatchFeature:
        """
        Preprocess an image or a batch of images so that it can be used by the model.

        Args:
            images (`ImageInput`):
                Image or batch of images to preprocess.
            annotations (`AnnotationType` or `List[AnnotationType]`, *optional*):
                List of annotations associated with the image or batch of images. If the annotation is for object
                detection, the annotations should be a dictionary with the following keys:
                - "image_id" (`int`): The image id.
                - "annotations" (`List[Dict]`): List of annotations for an image. Each annotation should be a
                  dictionary. An image can have no annotations, in which case the list should be empty.
                If the annotation is for segmentation, the annotations should be a dictionary with the following keys:
                - "image_id" (`int`): The image id.
                - "segments_info" (`List[Dict]`): List of segments for an image. Each segment should be a dictionary.
                  An image can have no segments, in which case the list should be empty.
                - "file_name" (`str`): The file name of the image.
            return_segmentation_masks (`bool`, *optional*, defaults to self.return_segmentation_masks):
                Whether to return segmentation masks.
            masks_path (`str` or `pathlib.Path`, *optional*):
                Path to the directory containing the segmentation masks.
            do_resize (`bool`, *optional*, defaults to self.do_resize):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*, defaults to self.size):
                Size of the image after resizing.
            resample (`PILImageResampling`, *optional*, defaults to self.resample):
                Resampling filter to use when resizing the image.
            do_rescale (`bool`, *optional*, defaults to self.do_rescale):
                Whether to rescale the image.
            rescale_factor (`float`, *optional*, defaults to self.rescale_factor):
                Rescale factor to use when rescaling the image.
            do_normalize (`bool`, *optional*, defaults to self.do_normalize):
                Whether to normalize the image.
            image_mean (`float` or `List[float]`, *optional*, defaults to self.image_mean):
                Mean to use when normalizing the image.
            image_std (`float` or `List[float]`, *optional*, defaults to self.image_std):
                Standard deviation to use when normalizing the image.
            do_pad (`bool`, *optional*, defaults to self.do_pad):
                Whether to pad the image.
            format (`str` or `AnnotionFormat`, *optional*, defaults to self.format):
                Format of the annotations.
            return_tensors (`str` or `TensorType`, *optional*, defaults to self.return_tensors):
                Type of tensors to return. If `None`, will return the list of images.
            data_format (`str` or `ChannelDimension`, *optional*, defaults to self.data_format):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
        """
        if "pad_and_return_pixel_mask" in kwargs:
            warnings.warn(
                "The `pad_and_return_pixel_mask` argument is deprecated and will be removed in a future version, "
                "use `do_pad` instead.",
                FutureWarning,
            )
            do_pad = kwargs.pop("pad_and_return_pixel_mask")

        max_size = None
        if "max_size" in kwargs:
            warnings.warn(
                "The `max_size` argument is deprecated and will be removed in a future version, use"
                " `size['longest_edge']` instead.",
                FutureWarning,
            )
            max_size = kwargs.pop("max_size")

        do_resize = self.do_resize if do_resize is None else do_resize
        size = self.size if size is None else size
        size = get_size_dict(size=size, max_size=max_size, default_to_square=False)
        resample = self.resample if resample is None else resample
        do_rescale = self.do_rescale if do_rescale is None else do_rescale
        rescale_factor = self.rescale_factor if rescale_factor is None else rescale_factor
        do_normalize = self.do_normalize if do_normalize is None else do_normalize
        image_mean = self.image_mean if image_mean is None else image_mean
        image_std = self.image_std if image_std is None else image_std
        do_pad = self.do_pad if do_pad is None else do_pad
        format = self.format if format is None else format

        if do_resize is not None and size is None:
            raise ValueError("Size and max_size must be specified if do_resize is True.")

        if do_rescale is not None and rescale_factor is None:
            raise ValueError("Rescale factor must be specified if do_rescale is True.")

        if do_normalize is not None and (image_mean is None or image_std is None):
            raise ValueError("Image mean and std must be specified if do_normalize is True.")

        images = make_list_of_images(images)
        if annotations is not None and isinstance(annotations, dict):
            annotations = [annotations]

        if annotations is not None and len(images) != len(annotations):
            raise ValueError(
                f"The number of images ({len(images)}) and annotations ({len(annotations)}) do not match."
            )

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        format = AnnotionFormat(format)
        if annotations is not None:
            if format == AnnotionFormat.COCO_DETECTION and not valid_coco_detection_annotations(annotations):
                raise ValueError(
                    "Invalid COCO detection annotations. Annotations must be a dict (single image) or a list of "
                    "dicts (batch of images) with the following keys: `image_id` and `annotations`, with the latter "
                    "being a list of annotations in the COCO format."
                )
            elif format == AnnotionFormat.COCO_PANOPTIC and not valid_coco_panoptic_annotations(annotations):
                raise ValueError(
                    "Invalid COCO panoptic annotations. Annotations must be a dict (single image) or a list of "
                    "dicts (batch of images) with the following keys: `image_id`, `file_name` and `segments_info`, "
                    "with the latter being a list of annotations in the COCO format."
                )
            elif format not in SUPPORTED_ANNOTATION_FORMATS:
                raise ValueError(
                    f"Unsupported annotation format: {format} must be one of {SUPPORTED_ANNOTATION_FORMATS}"
                )

        if (
            masks_path is not None
            and format == AnnotionFormat.COCO_PANOPTIC
            and not isinstance(masks_path, (pathlib.Path, str))
        ):
            raise ValueError(
                "The path to the directory containing the mask PNG files should be provided as a"
                f" `pathlib.Path` or string object, but is {type(masks_path)} instead."
            )

        # All transformations expect numpy arrays
        images = [to_numpy_array(image) for image in images]

        # prepare (COCO annotations as a list of Dict -> DETR target as a single Dict per image)
        if annotations is not None:
            prepared_images = []
            prepared_annotations = []
            for image, target in zip(images, annotations):
                target = self.prepare_annotation(
                    image, target, format, return_segmentation_masks=return_segmentation_masks, masks_path=masks_path
                )
                prepared_images.append(image)
                prepared_annotations.append(target)
            images = prepared_images
            annotations = prepared_annotations
            del prepared_images, prepared_annotations

        # transformations
        if do_resize:
            if annotations is not None:
                resized_images, resized_annotations = [], []
                for image, target in zip(images, annotations):
                    orig_size = get_image_size(image)
                    resized_image = self.resize(image, size=size, max_size=max_size, resample=resample)
                    resized_annotation = self.resize_annotation(target, orig_size, get_image_size(resized_image))
                    resized_images.append(resized_image)
                    resized_annotations.append(resized_annotation)
                images = resized_images
                annotations = resized_annotations
                del resized_images, resized_annotations
            else:
                images = [self.resize(image, size=size, resample=resample) for image in images]

        if do_rescale:
            images = [self.rescale(image, rescale_factor) for image in images]

        if do_normalize:
            images = [self.normalize(image, image_mean, image_std) for image in images]
            if annotations is not None:
                annotations = [
                    self.normalize_annotation(annotation, get_image_size(image))
                    for annotation, image in zip(annotations, images)
                ]

        if do_pad:
            # Pads images and returns their mask: {'pixel_values': ..., 'pixel_mask': ...}
            data = self.pad(images, return_pixel_mask=True, data_format=data_format)
        else:
            images = [to_channel_dimension_format(image, data_format) for image in images]
            data = {"pixel_values": images}

        encoded_inputs = BatchFeature(data=data, tensor_type=return_tensors)
        if annotations is not None:
            encoded_inputs["labels"] = [
                BatchFeature(annotation, tensor_type=return_tensors) for annotation in annotations
            ]

        return encoded_inputs

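    # Usage sketch (editor's addition, mirroring the standard transformers docs
    # example; the checkpoint name and image URL are illustrative):
    #
    # >>> from transformers import DetrImageProcessor
    # >>> from PIL import Image
    # >>> import requests
    # >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
    # >>> image = Image.open(requests.get(url, stream=True).raw)
    # >>> processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
    # >>> inputs = processor(images=image, return_tensors="pt")
    # >>> sorted(inputs.keys())
    # ['pixel_mask', 'pixel_values']
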
    # POSTPROCESSING METHODS - TODO: add support for other frameworks
    # inspired by https://github.com/facebookresearch/detr/blob/master/models/detr.py#L258
    def post_process(self, outputs, target_sizes):
        """
        Converts the raw output of [`DetrForObjectDetection`] into final bounding boxes in (top_left_x, top_left_y,
        bottom_right_x, bottom_right_y) format. Only supports PyTorch.

        Args:
            outputs ([`DetrObjectDetectionOutput`]):
                Raw outputs of the model.
            target_sizes (`torch.Tensor` of shape `(batch_size, 2)`):
                Tensor containing the size (height, width) of each image of the batch. For evaluation, this must be the
                original image size (before any data augmentation). For visualization, this should be the image size
                after data augmentation, but before padding.
        Returns:
            `List[Dict]`: A list of dictionaries, each dictionary containing the scores, labels and boxes for an image
            in the batch as predicted by the model.
        """
        warnings.warn(
            "`post_process` is deprecated and will be removed in v5 of Transformers, please use"
            " `post_process_object_detection`",
            FutureWarning,
        )

        out_logits, out_bbox = outputs.logits, outputs.pred_boxes

        if len(out_logits) != len(target_sizes):
            raise ValueError("Make sure that you pass in as many target sizes as the batch dimension of the logits")
        if target_sizes.shape[1] != 2:
            raise ValueError("Each element of target_sizes must contain the size (h, w) of each image of the batch")

        prob = nn.functional.softmax(out_logits, -1)
        scores, labels = prob[..., :-1].max(-1)

        # convert to [x0, y0, x1, y1] format
        boxes = center_to_corners_format(out_bbox)
        # and from relative [0, 1] to absolute [0, height] coordinates
        img_h, img_w = target_sizes.unbind(1)
        scale_fct = torch.stack([img_w, img_h, img_w, img_h], dim=1).to(boxes.device)
        boxes = boxes * scale_fct[:, None, :]

        results = [{"scores": s, "labels": l, "boxes": b} for s, l, b in zip(scores, labels, boxes)]
        return results

    def post_process_segmentation(self, outputs, target_sizes, threshold=0.9, mask_threshold=0.5):
        """
        Converts the output of [`DetrForSegmentation`] into image segmentation predictions. Only supports PyTorch.

        Args:
            outputs ([`DetrSegmentationOutput`]):
                Raw outputs of the model.
            target_sizes (`torch.Tensor` of shape `(batch_size, 2)` or `List[Tuple]` of length `batch_size`):
                Torch Tensor (or list) corresponding to the requested final size (h, w) of each prediction.
            threshold (`float`, *optional*, defaults to 0.9):
                Threshold to use to filter out queries.
            mask_threshold (`float`, *optional*, defaults to 0.5):
                Threshold to use when turning the predicted masks into binary values.
        Returns:
            `List[Dict]`: A list of dictionaries, each dictionary containing the scores, labels, and masks for an image
            in the batch as predicted by the model.
        """
        warnings.warn(
            "`post_process_segmentation` is deprecated and will be removed in v5 of Transformers, please use"
            " `post_process_semantic_segmentation`.",
            FutureWarning,
        )
        out_logits, raw_masks = outputs.logits, outputs.pred_masks
        empty_label = out_logits.shape[-1] - 1
        preds = []

        def to_tuple(tup):
            if isinstance(tup, tuple):
                return tup
            return tuple(tup.cpu().tolist())

        for cur_logits, cur_masks, size in zip(out_logits, raw_masks, target_sizes):
            # we filter empty queries and detection below threshold
            cur_scores, cur_labels = cur_logits.softmax(-1).max(-1)
            keep = cur_labels.ne(empty_label) & (cur_scores > threshold)
            cur_scores = cur_scores[keep]
            cur_labels = cur_labels[keep]
            cur_masks = cur_masks[keep]
            cur_masks = nn.functional.interpolate(cur_masks[:, None], to_tuple(size), mode="bilinear").squeeze(1)
            cur_masks = (cur_masks.sigmoid() > mask_threshold) * 1

            predictions = {"scores": cur_scores, "labels": cur_labels, "masks": cur_masks}
            preds.append(predictions)
        return preds

    # inspired by https://github.com/facebookresearch/detr/blob/master/models/segmentation.py#L218
    def post_process_instance(self, results, outputs, orig_target_sizes, max_target_sizes, threshold=0.5):
        """
        Converts the output of [`DetrForSegmentation`] into actual instance segmentation predictions. Only supports
        PyTorch.

        Args:
            results (`List[Dict]`):
                Results list obtained by [`~DetrFeatureExtractor.post_process`], to which "masks" results will be
                added.
            outputs ([`DetrSegmentationOutput`]):
                Raw outputs of the model.
            orig_target_sizes (`torch.Tensor` of shape `(batch_size, 2)`):
                Tensor containing the size (h, w) of each image of the batch. For evaluation, this must be the original
                image size (before any data augmentation).
            max_target_sizes (`torch.Tensor` of shape `(batch_size, 2)`):
                Tensor containing the maximum size (h, w) of each image of the batch. For evaluation, this must be the
                original image size (before any data augmentation).
            threshold (`float`, *optional*, defaults to 0.5):
                Threshold to use when turning the predicted masks into binary values.
        Returns:
            `List[Dict]`: A list of dictionaries, each dictionary containing the scores, labels, boxes and masks for an
            image in the batch as predicted by the model.
        """
        warnings.warn(
            "`post_process_instance` is deprecated and will be removed in v5 of Transformers, please use"
            " `post_process_instance_segmentation`.",
            FutureWarning,
        )

        if len(orig_target_sizes) != len(max_target_sizes):
            raise ValueError("Make sure to pass in as many orig_target_sizes as max_target_sizes")
        max_h, max_w = max_target_sizes.max(0)[0].tolist()
        outputs_masks = outputs.pred_masks.squeeze(2)
        outputs_masks = nn.functional.interpolate(
            outputs_masks, size=(max_h, max_w), mode="bilinear", align_corners=False
        )
        outputs_masks = (outputs_masks.sigmoid() > threshold).cpu()

        for i, (cur_mask, t, tt) in enumerate(zip(outputs_masks, max_target_sizes, orig_target_sizes)):
            img_h, img_w = t[0], t[1]
            results[i]["masks"] = cur_mask[:, :img_h, :img_w].unsqueeze(1)
            results[i]["masks"] = nn.functional.interpolate(
                results[i]["masks"].float(), size=tuple(tt.tolist()), mode="nearest"
            ).byte()

        return results

    # inspired by https://github.com/facebookresearch/detr/blob/master/models/segmentation.py#L241
    def post_process_panoptic(self, outputs, processed_sizes, target_sizes=None, is_thing_map=None, threshold=0.85):
        """
        Converts the output of [`DetrForSegmentation`] into actual panoptic predictions. Only supports PyTorch.

        Args:
            outputs ([`DetrSegmentationOutput`]):
                Raw outputs of the model.
            processed_sizes (`torch.Tensor` of shape `(batch_size, 2)` or `List[Tuple]` of length `batch_size`):
                Torch Tensor (or list) containing the size (h, w) of each image of the batch, i.e. the size after data
                augmentation but before batching.
            target_sizes (`torch.Tensor` of shape `(batch_size, 2)` or `List[Tuple]` of length `batch_size`, *optional*):
                Torch Tensor (or list) corresponding to the requested final size `(height, width)` of each prediction.
                If left to None, it will default to the `processed_sizes`.
            is_thing_map (`Dict`, *optional*):
                Dictionary mapping class indices to either True or False, depending on whether or not they are a thing.
                If not set, defaults to the `is_thing_map` of COCO panoptic.
            threshold (`float`, *optional*, defaults to 0.85):
                Threshold to use to filter out queries.
        Returns:
            `List[Dict]`: A list of dictionaries, each dictionary containing a PNG string and segments_info values for
            an image in the batch as predicted by the model.
        """
        warnings.warn(
            "`post_process_panoptic` is deprecated and will be removed in v5 of Transformers, please use"
            " `post_process_panoptic_segmentation`.",
            FutureWarning,
        )
        if target_sizes is None:
            target_sizes = processed_sizes
        if len(processed_sizes) != len(target_sizes):
            raise ValueError("Make sure to pass in as many processed_sizes as target_sizes")

        if is_thing_map is None:
            # default to is_thing_map of COCO panoptic
            is_thing_map = {i: i <= 90 for i in range(201)}

        out_logits, raw_masks, raw_boxes = outputs.logits, outputs.pred_masks, outputs.pred_boxes
        if not len(out_logits) == len(raw_masks) == len(target_sizes):
            raise ValueError(
                "Make sure that you pass in as many target sizes as the batch dimension of the logits and masks"
            )
        empty_label = out_logits.shape[-1] - 1
        preds = []

        def to_tuple(tup):
            if isinstance(tup, tuple):
                return tup
            return tuple(tup.cpu().tolist())

        for cur_logits, cur_masks, cur_boxes, size, target_size in zip(
            out_logits, raw_masks, raw_boxes, processed_sizes, target_sizes
        ):
            # we filter empty queries and detection below threshold
            cur_scores, cur_labels = cur_logits.softmax(-1).max(-1)
            keep = cur_labels.ne(empty_label) & (cur_scores > threshold)
            cur_scores = cur_scores[keep]
            cur_labels = cur_labels[keep]
            cur_masks = cur_masks[keep]
            cur_masks = nn.functional.interpolate(cur_masks[:, None], to_tuple(size), mode="bilinear").squeeze(1)
            cur_boxes = center_to_corners_format(cur_boxes[keep])

            h, w = cur_masks.shape[-2:]
            if len(cur_boxes) != len(cur_labels):
                raise ValueError("Not as many boxes as there are classes")

            # It may be that we have several predicted masks for the same stuff class.
            # In the following, we track the list of masks ids for each stuff class (they are merged later on)
            cur_masks = cur_masks.flatten(1)
            stuff_equiv_classes = defaultdict(lambda: [])
            for k, label in enumerate(cur_labels):
                if not is_thing_map[label.item()]:
                    stuff_equiv_classes[label.item()].append(k)

            def get_ids_area(masks, scores, dedup=False):
                # This helper function creates the final panoptic segmentation image
                # It also returns the area of the masks that appears on the image

                m_id = masks.transpose(0, 1).softmax(-1)

                if m_id.shape[-1] == 0:
                    # We didn't detect any mask :(
                    m_id = torch.zeros((h, w), dtype=torch.long, device=m_id.device)
                else:
                    m_id = m_id.argmax(-1).view(h, w)

                if dedup:
                    # Merge the masks corresponding to the same stuff class
                    for equiv in stuff_equiv_classes.values():
                        if len(equiv) > 1:
                            for eq_id in equiv:
                                m_id.masked_fill_(m_id.eq(eq_id), equiv[0])

                final_h, final_w = to_tuple(target_size)

                seg_img = PIL.Image.fromarray(id_to_rgb(m_id.view(h, w).cpu().numpy()))
                seg_img = seg_img.resize(size=(final_w, final_h), resample=PILImageResampling.NEAREST)

                np_seg_img = torch.ByteTensor(torch.ByteStorage.from_buffer(seg_img.tobytes()))
                np_seg_img = np_seg_img.view(final_h, final_w, 3)
                np_seg_img = np_seg_img.numpy()

                m_id = torch.from_numpy(rgb_to_id(np_seg_img))

                area = []
                for i in range(len(scores)):
                    area.append(m_id.eq(i).sum().item())
                return area, seg_img

            area, seg_img = get_ids_area(cur_masks, cur_scores, dedup=True)
            if cur_labels.numel() > 0:
                # We now filter empty masks as long as we find some
                while True:
                    filtered_small = torch.as_tensor(
                        [area[i] <= 4 for i, c in enumerate(cur_labels)], dtype=torch.bool, device=keep.device
                    )
                    if filtered_small.any().item():
                        cur_scores = cur_scores[~filtered_small]
                        cur_labels = cur_labels[~filtered_small]
                        cur_masks = cur_masks[~filtered_small]
                        area, seg_img = get_ids_area(cur_masks, cur_scores)
                    else:
                        break

            else:
                cur_labels = torch.ones(1, dtype=torch.long, device=cur_labels.device)
|
|
|
|
segments_info = []
|
|
for i, a in enumerate(area):
|
|
cat = cur_labels[i].item()
|
|
segments_info.append({"id": i, "isthing": is_thing_map[cat], "category_id": cat, "area": a})
|
|
del cur_labels
|
|
|
|
with io.BytesIO() as out:
|
|
seg_img.save(out, format="PNG")
|
|
predictions = {"png_string": out.getvalue(), "segments_info": segments_info}
|
|
preds.append(predictions)
|
|
return preds
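
    # A minimal sketch of how a caller might decode the `png_string` produced above
    # back into an id map, using the `rgb_to_id` helper imported at the top of this
    # file; `pred` stands for one entry of the returned list and is assumed here,
    # not real output.
    #
    #     with PIL.Image.open(io.BytesIO(pred["png_string"])) as png:
    #         seg_rgb = np.array(png, dtype=np.uint8)  # (height, width, 3)
    #     segment_ids = rgb_to_id(seg_rgb)             # (height, width) id array
    #     # each value in `segment_ids` matches an "id" key in pred["segments_info"]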

    # inspired by https://github.com/facebookresearch/detr/blob/master/models/detr.py#L258
    def post_process_object_detection(
        self, outputs, threshold: float = 0.5, target_sizes: Union[TensorType, List[Tuple]] = None
    ):
        """
        Converts the raw output of [`DetrForObjectDetection`] into final bounding boxes in (top_left_x, top_left_y,
        bottom_right_x, bottom_right_y) format. Only supports PyTorch.

        Args:
            outputs ([`DetrObjectDetectionOutput`]):
                Raw outputs of the model.
            threshold (`float`, *optional*, defaults to 0.5):
                Score threshold to keep object detection predictions.
            target_sizes (`torch.Tensor` or `List[Tuple[int, int]]`, *optional*):
                Tensor of shape `(batch_size, 2)` or list of tuples (`Tuple[int, int]`) containing the target size
                `(height, width)` of each image in the batch. If unset, predictions will not be resized.

        Returns:
            `List[Dict]`: A list of dictionaries, each dictionary containing the scores, labels and boxes for an image
            in the batch as predicted by the model.
        """
        out_logits, out_bbox = outputs.logits, outputs.pred_boxes

        if target_sizes is not None:
            if len(out_logits) != len(target_sizes):
                raise ValueError(
                    "Make sure that you pass in as many target sizes as the batch dimension of the logits"
                )

        prob = nn.functional.softmax(out_logits, -1)
        scores, labels = prob[..., :-1].max(-1)

        # Convert to [x0, y0, x1, y1] format
        boxes = center_to_corners_format(out_bbox)

        # Convert from relative [0, 1] to absolute [0, height] coordinates
        if target_sizes is not None:
            if isinstance(target_sizes, List):
                img_h = torch.Tensor([i[0] for i in target_sizes])
                img_w = torch.Tensor([i[1] for i in target_sizes])
            else:
                img_h, img_w = target_sizes.unbind(1)

            scale_fct = torch.stack([img_w, img_h, img_w, img_h], dim=1)
            boxes = boxes * scale_fct[:, None, :]

        results = []
        for s, l, b in zip(scores, labels, boxes):
            score = s[s > threshold]
            label = l[s > threshold]
            box = b[s > threshold]
            results.append({"scores": score, "labels": label, "boxes": box})

        return results
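
    # A minimal usage sketch for the method above, assuming a `DetrForObjectDetection`
    # checkpoint and a PIL image `image`; the checkpoint name is illustrative.
    #
    #     from transformers import DetrImageProcessor, DetrForObjectDetection
    #
    #     processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
    #     model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50")
    #     inputs = processor(images=image, return_tensors="pt")
    #     outputs = model(**inputs)
    #     # pass the original (height, width) so boxes come back in absolute pixels
    #     detections = processor.post_process_object_detection(
    #         outputs, threshold=0.7, target_sizes=[image.size[::-1]]
    #     )[0]
    #     # detections["scores"], detections["labels"], detections["boxes"]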

    def post_process_semantic_segmentation(self, outputs, target_sizes: Optional[List[Tuple[int, int]]] = None):
        """
        Converts the output of [`DetrForSegmentation`] into semantic segmentation maps. Only supports PyTorch.

        Args:
            outputs ([`DetrForSegmentation`]):
                Raw outputs of the model.
            target_sizes (`List[Tuple[int, int]]`, *optional*):
                A list of tuples (`Tuple[int, int]`) containing the target size (height, width) of each image in the
                batch. If unset, predictions will not be resized.

        Returns:
            `List[torch.Tensor]`:
                A list of length `batch_size`, where each item is a semantic segmentation map of shape (height, width)
                corresponding to the target_sizes entry (if `target_sizes` is specified). Each entry of each
                `torch.Tensor` corresponds to a semantic class id.
        """
        class_queries_logits = outputs.logits  # [batch_size, num_queries, num_classes+1]
        masks_queries_logits = outputs.pred_masks  # [batch_size, num_queries, height, width]

        # Remove the null class `[..., :-1]`
        masks_classes = class_queries_logits.softmax(dim=-1)[..., :-1]
        masks_probs = masks_queries_logits.sigmoid()  # [batch_size, num_queries, height, width]

        # Semantic segmentation logits of shape (batch_size, num_classes, height, width)
        segmentation = torch.einsum("bqc, bqhw -> bchw", masks_classes, masks_probs)
        batch_size = class_queries_logits.shape[0]

        # Resize logits and compute semantic segmentation maps
        if target_sizes is not None:
            if batch_size != len(target_sizes):
                raise ValueError(
                    "Make sure that you pass in as many target sizes as the batch dimension of the logits"
                )

            semantic_segmentation = []
            for idx in range(batch_size):
                resized_logits = nn.functional.interpolate(
                    segmentation[idx].unsqueeze(dim=0), size=target_sizes[idx], mode="bilinear", align_corners=False
                )
                semantic_map = resized_logits[0].argmax(dim=0)
                semantic_segmentation.append(semantic_map)
        else:
            semantic_segmentation = segmentation.argmax(dim=1)
            semantic_segmentation = [semantic_segmentation[i] for i in range(semantic_segmentation.shape[0])]

        return semantic_segmentation
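
    # A minimal sketch of the einsum aggregation above on toy shapes, illustrating how
    # per-query class probabilities weight per-query mask probabilities into per-class
    # maps; all sizes here are made up for the example.
    #
    #     masks_classes = torch.rand(2, 100, 250)   # (batch, queries, classes)
    #     masks_probs = torch.rand(2, 100, 32, 32)  # (batch, queries, height, width)
    #     seg = torch.einsum("bqc, bqhw -> bchw", masks_classes, masks_probs)
    #     assert seg.shape == (2, 250, 32, 32)      # one map per class
    #     semantic_map = seg[0].argmax(dim=0)       # (32, 32) class-id map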

    # inspired by https://github.com/facebookresearch/detr/blob/master/models/segmentation.py#L218
    def post_process_instance_segmentation(
        self,
        outputs,
        threshold: float = 0.5,
        mask_threshold: float = 0.5,
        overlap_mask_area_threshold: float = 0.8,
        target_sizes: Optional[List[Tuple[int, int]]] = None,
        return_coco_annotation: Optional[bool] = False,
    ) -> List[Dict]:
        """
        Converts the output of [`DetrForSegmentation`] into instance segmentation predictions. Only supports PyTorch.

        Args:
            outputs ([`DetrForSegmentation`]):
                Raw outputs of the model.
            threshold (`float`, *optional*, defaults to 0.5):
                The probability score threshold to keep predicted instance masks.
            mask_threshold (`float`, *optional*, defaults to 0.5):
                Threshold to use when turning the predicted masks into binary values.
            overlap_mask_area_threshold (`float`, *optional*, defaults to 0.8):
                The overlap mask area threshold to merge or discard small disconnected parts within each binary
                instance mask.
            target_sizes (`List[Tuple]`, *optional*):
                List of length (batch_size), where each list item (`Tuple[int, int]`) corresponds to the requested
                final size (height, width) of each prediction. If unset, predictions will not be resized.
            return_coco_annotation (`bool`, *optional*, defaults to `False`):
                If set to `True`, segmentation maps are returned in COCO run-length encoding (RLE) format.

        Returns:
            `List[Dict]`: A list of dictionaries, one per image, each dictionary containing two keys:

            - **segmentation** -- A tensor of shape `(height, width)` where each pixel represents a `segment_id` or
              `List[List]` run-length encoding (RLE) of the segmentation map if return_coco_annotation is set to
              `True`. Set to `None` if no mask is found above `threshold`.
            - **segments_info** -- A dictionary that contains additional information on each segment.
                - **id** -- An integer representing the `segment_id`.
                - **label_id** -- An integer representing the label / semantic class id corresponding to `segment_id`.
                - **score** -- Prediction score of segment with `segment_id`.
        """
        class_queries_logits = outputs.logits  # [batch_size, num_queries, num_classes+1]
        masks_queries_logits = outputs.pred_masks  # [batch_size, num_queries, height, width]

        batch_size = class_queries_logits.shape[0]
        num_labels = class_queries_logits.shape[-1] - 1

        mask_probs = masks_queries_logits.sigmoid()  # [batch_size, num_queries, height, width]

        # Predicted label and score of each query (batch_size, num_queries)
        pred_scores, pred_labels = nn.functional.softmax(class_queries_logits, dim=-1).max(-1)

        # Loop over items in the batch
        results: List[Dict[str, TensorType]] = []

        for i in range(batch_size):
            mask_probs_item, pred_scores_item, pred_labels_item = remove_low_and_no_objects(
                mask_probs[i], pred_scores[i], pred_labels[i], threshold, num_labels
            )

            # No mask found
            if mask_probs_item.shape[0] <= 0:
                height, width = target_sizes[i] if target_sizes is not None else mask_probs_item.shape[1:]
                segmentation = torch.zeros((height, width)) - 1
                results.append({"segmentation": segmentation, "segments_info": []})
                continue

            # Get segmentation map and segment information of batch item
            target_size = target_sizes[i] if target_sizes is not None else None
            segmentation, segments = compute_segments(
                mask_probs=mask_probs_item,
                pred_scores=pred_scores_item,
                pred_labels=pred_labels_item,
                mask_threshold=mask_threshold,
                overlap_mask_area_threshold=overlap_mask_area_threshold,
                label_ids_to_fuse=[],
                target_size=target_size,
            )

            # Return segmentation map in run-length encoding (RLE) format
            if return_coco_annotation:
                segmentation = convert_segmentation_to_rle(segmentation)

            results.append({"segmentation": segmentation, "segments_info": segments})
        return results
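
    # A minimal usage sketch for the method above, assuming a `DetrForSegmentation`
    # checkpoint and a PIL image `image`; the checkpoint name is illustrative.
    #
    #     from transformers import DetrImageProcessor, DetrForSegmentation
    #
    #     processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50-panoptic")
    #     model = DetrForSegmentation.from_pretrained("facebook/detr-resnet-50-panoptic")
    #     inputs = processor(images=image, return_tensors="pt")
    #     outputs = model(**inputs)
    #     result = processor.post_process_instance_segmentation(
    #         outputs, target_sizes=[image.size[::-1]]
    #     )[0]
    #     # result["segmentation"] is a (height, width) tensor of segment ids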

    # inspired by https://github.com/facebookresearch/detr/blob/master/models/segmentation.py#L241
    def post_process_panoptic_segmentation(
        self,
        outputs,
        threshold: float = 0.5,
        mask_threshold: float = 0.5,
        overlap_mask_area_threshold: float = 0.8,
        label_ids_to_fuse: Optional[Set[int]] = None,
        target_sizes: Optional[List[Tuple[int, int]]] = None,
    ) -> List[Dict]:
        """
        Converts the output of [`DetrForSegmentation`] into image panoptic segmentation predictions. Only supports
        PyTorch.

        Args:
            outputs ([`DetrForSegmentation`]):
                The outputs from [`DetrForSegmentation`].
            threshold (`float`, *optional*, defaults to 0.5):
                The probability score threshold to keep predicted instance masks.
            mask_threshold (`float`, *optional*, defaults to 0.5):
                Threshold to use when turning the predicted masks into binary values.
            overlap_mask_area_threshold (`float`, *optional*, defaults to 0.8):
                The overlap mask area threshold to merge or discard small disconnected parts within each binary
                instance mask.
            label_ids_to_fuse (`Set[int]`, *optional*):
                The labels in this set will have all their instances fused together. For instance, we could say
                there can only be one sky in an image, but several persons, so the label ID for sky would be in that
                set, but not the one for person.
            target_sizes (`List[Tuple]`, *optional*):
                List of length (batch_size), where each list item (`Tuple[int, int]`) corresponds to the requested
                final size (height, width) of each prediction in batch. If unset, predictions will not be resized.

        Returns:
            `List[Dict]`: A list of dictionaries, one per image, each dictionary containing two keys:

            - **segmentation** -- a tensor of shape `(height, width)` where each pixel represents a `segment_id` or
              `None` if no mask is found above `threshold`. If `target_sizes` is specified, segmentation is resized to
              the corresponding `target_sizes` entry.
            - **segments_info** -- A dictionary that contains additional information on each segment.
                - **id** -- an integer representing the `segment_id`.
                - **label_id** -- An integer representing the label / semantic class id corresponding to `segment_id`.
                - **was_fused** -- a boolean, `True` if `label_id` was in `label_ids_to_fuse`, `False` otherwise.
                  Multiple instances of the same class / label were fused and assigned a single `segment_id`.
                - **score** -- Prediction score of segment with `segment_id`.
        """

        if label_ids_to_fuse is None:
            warnings.warn("`label_ids_to_fuse` unset. No instance will be fused.")
            label_ids_to_fuse = set()

        class_queries_logits = outputs.logits  # [batch_size, num_queries, num_classes+1]
        masks_queries_logits = outputs.pred_masks  # [batch_size, num_queries, height, width]

        batch_size = class_queries_logits.shape[0]
        num_labels = class_queries_logits.shape[-1] - 1

        mask_probs = masks_queries_logits.sigmoid()  # [batch_size, num_queries, height, width]

        # Predicted label and score of each query (batch_size, num_queries)
        pred_scores, pred_labels = nn.functional.softmax(class_queries_logits, dim=-1).max(-1)

        # Loop over items in the batch
        results: List[Dict[str, TensorType]] = []

        for i in range(batch_size):
            mask_probs_item, pred_scores_item, pred_labels_item = remove_low_and_no_objects(
                mask_probs[i], pred_scores[i], pred_labels[i], threshold, num_labels
            )

            # No mask found
            if mask_probs_item.shape[0] <= 0:
                height, width = target_sizes[i] if target_sizes is not None else mask_probs_item.shape[1:]
                segmentation = torch.zeros((height, width)) - 1
                results.append({"segmentation": segmentation, "segments_info": []})
                continue

            # Get segmentation map and segment information of batch item
            target_size = target_sizes[i] if target_sizes is not None else None
            segmentation, segments = compute_segments(
                mask_probs=mask_probs_item,
                pred_scores=pred_scores_item,
                pred_labels=pred_labels_item,
                mask_threshold=mask_threshold,
                overlap_mask_area_threshold=overlap_mask_area_threshold,
                label_ids_to_fuse=label_ids_to_fuse,
                target_size=target_size,
            )

            results.append({"segmentation": segmentation, "segments_info": segments})
        return results
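
    # A minimal usage sketch for the method above, reusing the panoptic `processor`,
    # `outputs`, and `image` from the earlier instance segmentation sketch; the fused
    # label id {0} is illustrative, not a recommendation.
    #
    #     result = processor.post_process_panoptic_segmentation(
    #         outputs, label_ids_to_fuse={0}, target_sizes=[image.size[::-1]]
    #     )[0]
    #     for segment in result["segments_info"]:
    #         print(segment["id"], segment["label_id"], segment["was_fused"], segment["score"])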