如何创建自定义管道?
在本指南中,我们将了解如何创建自定义管道并将其分享到 Hub 或将其添加到 🤗 Transformers 库中。
首先,你需要确定管道可以接受的原始输入。它可以是字符串、原始字节、字典或任何看起来最有可能的输入。尝试将这些输入保持为纯 Python,这样可以更方便地兼容(即使通过 JSON 也可以通过其他语言进行兼容)。这些将是管道的`输入`( `preprocess` )。
然后定义`输出`。与`输入`相同的策略。越简单越好。这些将是`postprocess`方法的输出。
从继承基类`Pipeline`开始,该基类需要实现`preprocess`、`_forward`、`postprocess`和`_sanitize_parameters`的四个方法。
from transformers import Pipeline
class MyPipeline(Pipeline):
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "maybe_arg" in kwargs:
preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
return preprocess_kwargs, {}, {}
def preprocess(self, inputs, maybe_arg=2):
model_input = Tensor(inputs["input_ids"])
return {"model_input": model_input}
def _forward(self, model_inputs):
# model_inputs == {"model_input": model_input}
outputs = self.model(**model_inputs)
# Maybe {"logits": Tensor(...)}
return outputs
def postprocess(self, model_outputs):
best_class = model_outputs["logits"].softmax(-1)
return best_class
这种分解的结构是为了相对无缝地支持 CPU/GPU,同时支持在 CPU 上的不同线程上进行预/后处理
`preprocess`将接受最初定义的输入,并将它们转换为模型可以接受的格式。它可能包含更多信息,通常是一个`Dict`。
`_forward`是实现细节,不应直接调用。 `forward`是首选的调用方法,因为它包含安全措施,以确保所有内容都在预期的设备上运行。如果任何内容与实际模型相关联,则应将其放在`_forward`方法中,其他任何内容都应放在preprocess/postprocess中。
`postprocess`方法将接受`_forward`的输出,并将其转换为之前确定的最终输出。
`_sanitize_parameters`的存在是为了允许用户在任何他们想要的时候传递任何参数,无论是初始化时`pipeline(...., maybe_arg=4)`还是调用时`pipe = pipeline(...); output = pipe(...., maybe_arg=4)`。
`_sanitize_parameters`的返回值是3个`kwargs`字典,它们将直接传递给`preprocess`、`_forward`和`postprocess`。如果调用者没有传递任何额外参数,则不要填充任何内容。这允许将默认参数保存在函数定义中,这始终更“自然”。
一个典型的例子是在分类任务的后期处理中使用`top_k`参数。
>>> pipe = pipeline("my-new-task")
>>> pipe("This is a test")
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}, {"label": "3-star", "score": 0.05}
{"label": "4-star", "score": 0.025}, {"label": "5-star", "score": 0.025}]
>>> pipe("This is a test", top_k=2)
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}]
为了实现这一点,我们将更新我们的`postprocess`方法,将其默认参数设置为`5`,并编辑`_sanitize_parameters`以允许使用这个新参数。
def postprocess(self, model_outputs, top_k=5):
best_class = model_outputs["logits"].softmax(-1)
# Add logic to handle top_k
return best_class
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "maybe_arg" in kwargs:
preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
postprocess_kwargs = {}
if "top_k" in kwargs:
postprocess_kwargs["top_k"] = kwargs["top_k"]
return preprocess_kwargs, {}, postprocess_kwargs
尝试保持输入/输出非常简单,理想情况下是 JSON 可序列化的,因为这样可以使管道使用起来非常容易,而不必让用户理解新类型的对象。支持多种不同类型的参数以方便使用(音频文件,可以是文件名、URL 或纯字节)也是比较常见的。
添加到支持的任务列表中
要将你的`new-task`注册到支持的任务列表中,你需要将其添加到`PIPELINE_REGISTRY`中。
from transformers.pipelines import PIPELINE_REGISTRY
PIPELINE_REGISTRY.register_pipeline(
"new-task",
pipeline_class=MyPipeline,
pt_model=AutoModelForSequenceClassification,
)
如果需要,可以指定一个默认模型,在这种情况下,它应该附带一个特定的版本(可以是分支的名称或提交哈希值,这里我们使用的是`“abcdef”`)以及类型。
PIPELINE_REGISTRY.register_pipeline(
"new-task",
pipeline_class=MyPipeline,
pt_model=AutoModelForSequenceClassification,
default={"pt": ("user/awesome_model", "abcdef")},
type="text", # current support type: text, audio, image, multimodal
)
在 Hub 上分享你的管道
要将你的自定义管道分享到 Hub 上,你只需要将你`Pipeline`子类的自定义代码保存在一个 python 文件中。例如,假设我们想要使用一个自定义管道来进行句子对分类,如下所示
import numpy as np
from transformers import Pipeline
def softmax(outputs):
maxes = np.max(outputs, axis=-1, keepdims=True)
shifted_exp = np.exp(outputs - maxes)
return shifted_exp / shifted_exp.sum(axis=-1, keepdims=True)
class PairClassificationPipeline(Pipeline):
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "second_text" in kwargs:
preprocess_kwargs["second_text"] = kwargs["second_text"]
return preprocess_kwargs, {}, {}
def preprocess(self, text, second_text=None):
return self.tokenizer(text, text_pair=second_text, return_tensors=self.framework)
def _forward(self, model_inputs):
return self.model(**model_inputs)
def postprocess(self, model_outputs):
logits = model_outputs.logits[0].numpy()
probabilities = softmax(logits)
best_class = np.argmax(probabilities)
label = self.model.config.id2label[best_class]
score = probabilities[best_class].item()
logits = logits.tolist()
return {"label": label, "score": score, "logits": logits}
实现与框架无关,适用于 PyTorch 和 TensorFlow 模型。如果我们将其保存在名为`pair_classification.py`的文件中,我们就可以导入它并注册它,如下所示
from pair_classification import PairClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY
from transformers import AutoModelForSequenceClassification, TFAutoModelForSequenceClassification
PIPELINE_REGISTRY.register_pipeline(
"pair-classification",
pipeline_class=PairClassificationPipeline,
pt_model=AutoModelForSequenceClassification,
tf_model=TFAutoModelForSequenceClassification,
)
完成之后,我们可以使用预训练的模型来使用它。例如,`sgugger/finetuned-bert-mrpc`是在 MRPC 数据集上微调的,该数据集将句子对分类为释义或非释义。
from transformers import pipeline
classifier = pipeline("pair-classification", model="sgugger/finetuned-bert-mrpc")
然后,我们可以使用`push_to_hub`方法将其分享到 Hub 上。
classifier.push_to_hub("test-dynamic-pipeline")
这将复制你在其中定义`PairClassificationPipeline`的文件到`“test-dynamic-pipeline”`文件夹中,并将管道模型和分词器一起保存,然后将所有内容推送到`{your_username}/test-dynamic-pipeline`仓库中。此后,任何人都可以使用它,只要他们提供`trust_remote_code=True`选项。
from transformers import pipeline
classifier = pipeline(model="{your_username}/test-dynamic-pipeline", trust_remote_code=True)
将管道添加到 🤗 Transformers
如果你想将你的管道贡献给 🤗 Transformers,你需要在`pipelines`子模块中添加一个新的模块,其中包含你的管道代码,然后将其添加到`pipelines/__init__.py`中定义的任务列表中。
然后你需要添加测试。创建一个新的文件`tests/test_pipelines_MY_PIPELINE.py`,其中包含其他测试的示例。
`run_pipeline_test`函数将非常通用,并在`model_mapping`和`tf_model_mapping`定义的所有可能的架构上运行小的随机模型。
这对测试未来的兼容性非常重要,这意味着如果有人为`XXXForQuestionAnswering`添加了一个新模型,那么管道测试将尝试在该模型上运行。由于模型是随机的,因此无法检查实际值,这就是为什么有一个辅助`ANY`会简单地尝试匹配管道输出的类型。
你还*需要*实现2个(理想情况下是4个)测试。
- `test_small_model_pt`:为这个管道定义1个小型模型(结果是否合理无关紧要),并测试管道输出。结果应该与`test_small_model_tf`相同。
- `test_small_model_tf`:为这个管道定义1个小型模型(结果是否合理无关紧要),并测试管道输出。结果应该与`test_small_model_pt`相同。
- `test_large_model_pt`(`可选`):在真正的管道上测试管道,其中结果应该是有意义的。这些测试很慢,应该标记为慢速测试。这里的目标是展示管道,并确保未来的版本没有偏差。
- `test_large_model_tf`(`可选`):在真正的管道上测试管道,其中结果应该是有意义的。这些测试很慢,应该标记为慢速测试。这里的目标是展示管道,并确保未来的版本没有偏差。