mirror of
https://github.com/huggingface/transformers.git
synced 2025-07-31 02:02:21 +06:00
Adding support for raw python generator
in addition to Dataset
for pipelines (#14352)
* Adding support for raw python `generator` in addition to `Dataset` The main goal is to ease the create of streaming data to the pipe. `Dataset` is more involved and pytorch specific. This PR, provides a way to use a python iterator too. This enabled #14250 but can be proposed as a standalone PR. ```python from transformers import pipeline def read_data(filename): with open(filename, 'r') as f: for line in f: yield f pipe = pipeline("text-classification") for classified in pipe(read_data("large_file.txt")): print("Success ! ", classified) ``` The main caveat of this, is the interaction with `DataLoader` with `num_workers>1`. When you have multiple workers, each receive a copy of the generator (like `IterableDataset`). That means the naive Iterator will fail since all workers iterate on all items of the generator. There are ways to do clever "skipping", but it could be bad still because all workers still do have to pass through all items of the generator (they just ignore items they don't handle), depending on the case it might be bad. Using `num_workers=1` is the simplest fix and if the cost of loading your data is small enough should be good enough. In the above example trying to do smart tricks to skip some lines is unlikely to be a net positive for instance. If there are better ways to do "jumps" on some data, then using `Dataset` is more advised (since then differents workers can just jump themselves). * Adding iterator support for `tf` too.
This commit is contained in:
parent
77262ef750
commit
ed5d15518b
@ -12,12 +12,14 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import collections
|
||||
import csv
|
||||
import importlib
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
import types
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import UserDict
|
||||
@ -1035,10 +1037,20 @@ class Pipeline(_ScikitCompat):
|
||||
def get_iterator(
|
||||
self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params
|
||||
):
|
||||
if isinstance(inputs, collections.abc.Sized):
|
||||
dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
|
||||
else:
|
||||
if num_workers > 1:
|
||||
logger.warning(
|
||||
"For iterable dataset using num_workers>1 is likely to result"
|
||||
" in errors since everything is iterable, setting `num_workers=1`"
|
||||
" to guarantee correctness."
|
||||
)
|
||||
num_workers = 1
|
||||
dataset = PipelineIterator(inputs, self.preprocess, preprocess_params)
|
||||
if "TOKENIZERS_PARALLELISM" not in os.environ:
|
||||
logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already")
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||
dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)
|
||||
collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, self.feature_extractor)
|
||||
dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn)
|
||||
model_iterator = PipelineIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size)
|
||||
@ -1074,6 +1086,14 @@ class Pipeline(_ScikitCompat):
|
||||
return self.get_iterator(
|
||||
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params
|
||||
)
|
||||
elif isinstance(inputs, types.GeneratorType):
|
||||
if self.framework == "pt":
|
||||
return self.get_iterator(
|
||||
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params
|
||||
)
|
||||
else:
|
||||
# TODO make the get_iterator work also for `tf` (and `flax`).
|
||||
return self.iterate(inputs, preprocess_params, forward_params, postprocess_params)
|
||||
else:
|
||||
return self.run_single(inputs, preprocess_params, forward_params, postprocess_params)
|
||||
|
||||
@ -1085,3 +1105,9 @@ class Pipeline(_ScikitCompat):
|
||||
model_outputs = self.forward(model_inputs, **forward_params)
|
||||
outputs = self.postprocess(model_outputs, **postprocess_params)
|
||||
return outputs
|
||||
|
||||
def iterate(self, inputs, preprocess_params, forward_params, postprocess_params):
|
||||
# This function should become `get_iterator` again, this is a temporary
|
||||
# easy solution.
|
||||
for input_ in inputs:
|
||||
yield self.run_single(input_, preprocess_params, forward_params, postprocess_params)
|
||||
|
@ -123,7 +123,12 @@ class TextClassificationPipeline(Pipeline):
|
||||
|
||||
If ``self.return_all_scores=True``, one such dictionary is returned per label.
|
||||
"""
|
||||
return super().__call__(*args, **kwargs)
|
||||
result = super().__call__(*args, **kwargs)
|
||||
if isinstance(args[0], str):
|
||||
# This pipeline is odd, and return a list when single item is run
|
||||
return [result]
|
||||
else:
|
||||
return result
|
||||
|
||||
def preprocess(self, inputs, **tokenizer_kwargs) -> Dict[str, GenericTensor]:
|
||||
return_tensors = self.framework
|
||||
@ -160,10 +165,3 @@ class TextClassificationPipeline(Pipeline):
|
||||
return [{"label": self.model.config.id2label[i], "score": score.item()} for i, score in enumerate(scores)]
|
||||
else:
|
||||
return {"label": self.model.config.id2label[scores.argmax().item()], "score": scores.max().item()}
|
||||
|
||||
def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params):
|
||||
return [self.run_single(item, preprocess_params, forward_params, postprocess_params)[0] for item in inputs]
|
||||
|
||||
def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
|
||||
"This pipeline is odd, and return a list when single item is run"
|
||||
return [super().run_single(inputs, preprocess_params, forward_params, postprocess_params)]
|
||||
|
@ -34,7 +34,7 @@ from transformers import (
|
||||
)
|
||||
from transformers.pipelines import get_task
|
||||
from transformers.pipelines.base import _pad
|
||||
from transformers.testing_utils import is_pipeline_test, require_torch
|
||||
from transformers.testing_utils import is_pipeline_test, nested_simplify, require_tf, require_torch
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -286,6 +286,42 @@ class CommonPipelineTest(unittest.TestCase):
|
||||
# Wrong framework
|
||||
get_task("espnet/siddhana_slurp_entity_asr_train_asr_conformer_raw_en_word_valid.acc.ave_10best")
|
||||
|
||||
@require_torch
|
||||
def test_iterator_data(self):
|
||||
def data(n: int):
|
||||
for _ in range(n):
|
||||
yield "This is a test"
|
||||
|
||||
pipe = pipeline(model="Narsil/tiny-distilbert-sequence-classification")
|
||||
|
||||
results = []
|
||||
for out in pipe(data(10)):
|
||||
self.assertEqual(nested_simplify(out), {"label": "LABEL_1", "score": 0.502})
|
||||
results.append(out)
|
||||
self.assertEqual(len(results), 10)
|
||||
|
||||
# When using multiple workers on streamable data it should still work
|
||||
# This will force using `num_workers=1` with a warning for now.
|
||||
results = []
|
||||
for out in pipe(data(10), num_workers=2):
|
||||
self.assertEqual(nested_simplify(out), {"label": "LABEL_1", "score": 0.502})
|
||||
results.append(out)
|
||||
self.assertEqual(len(results), 10)
|
||||
|
||||
@require_tf
|
||||
def test_iterator_data_tf(self):
|
||||
def data(n: int):
|
||||
for _ in range(n):
|
||||
yield "This is a test"
|
||||
|
||||
pipe = pipeline(model="Narsil/tiny-distilbert-sequence-classification", framework="tf")
|
||||
out = pipe("This is a test")
|
||||
results = []
|
||||
for out in pipe(data(10)):
|
||||
self.assertEqual(nested_simplify(out), {"label": "LABEL_1", "score": 0.502})
|
||||
results.append(out)
|
||||
self.assertEqual(len(results), 10)
|
||||
|
||||
|
||||
@is_pipeline_test
|
||||
class PipelinePadTest(unittest.TestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user