开源 AI 食谱文档

带有签名感知的模型服务,基于 MLflow 和 Ray Serve

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Open In Colab

带有签名感知的模型服务,基于 MLflow 和 Ray Serve

作者:Jonathan Jin

简介

本笔记本探讨了简化从模型注册表中部署模型的解决方案。对于希望随着时间推移将许多模型投入生产的团队来说,在 AI/ML 项目生命周期的这个“过渡点”进行投资可以显著缩短从开发到生产的时间。这对于规模较小的新团队来说尤其重要,因为他们可能没有现有基础设施来形成在线模型生产服务的“黄金路径”。

动机

优化模型生命周期的这个阶段尤为重要,因为最终结果是面向生产的。在这个阶段,您的模型实际上变成了一个微服务。这意味着您现在需要处理服务所有权的所有要素,包括

  • 标准化和强制执行 API 向后兼容性;
  • 日志记录、指标和通用可观察性问题;
  • 等等。

每次想要部署新模型时都需要重复相同的通用设置,这将导致您和您的团队的开发成本随着时间的推移而显著增加。另一方面,考虑到生产模型所有权的“长尾效应”(假设生产模型不太可能在短期内退役),简化此处的投资可以在长期内获得丰厚的回报。

鉴于以上所有内容,我们在此用以下用户故事来阐述我们的探索动机

我想仅使用模型名称从模型注册表(例如 MLflow)部署模型。每次我想部署新模型时,我需要复制的样板代码和脚手架越少越好。我希望能够动态选择模型的不同版本,而无需设置全新的部署来适应这些新版本。

组件

在此探索中,我们将使用以下最小堆栈

  • MLflow 用于模型注册表;
  • Ray Serve 用于模型服务。

为了演示目的,我们将专门使用来自 Hugging Face Hub 的现成开源模型。

我们不会使用 GPU 进行推理,因为推理性能与我们今天的重点无关。毋庸置疑,在“现实生活中”,您可能无法仅使用 CPU 计算来服务您的模型。

现在让我们安装依赖项。

!pip install "transformers" "mlflow-skinny" "ray[serve]" "torch"

注册模型

首先,让我们定义我们今天探索中将使用的模型。为了简单起见,我们将使用一个简单的文本翻译模型,其中源语言和目标语言在注册时可配置。实际上,这意味着可以注册模型的不同“版本”来翻译不同的语言,但底层模型架构和权重可以保持不变。

import mlflow
from transformers import pipeline


class MyTranslationModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.lang_from = context.model_config.get("lang_from", "en")
        self.lang_to = context.model_config.get("lang_to", "de")

        self.input_label: str = context.model_config.get("input_label", "prompt")

        self.model_ref: str = context.model_config.get("hfhub_name", "google-t5/t5-base")

        self.pipeline = pipeline(
            f"translation_{self.lang_from}_to_{self.lang_to}",
            self.model_ref,
        )

    def predict(self, context, model_input, params=None):
        prompt = model_input[self.input_label].tolist()

        return self.pipeline(prompt)

(您可能想知道我们为什么还要费心使输入标签可配置。这将在稍后对我们有用。)

现在我们的模型已定义,让我们注册它的实际版本。这个特定版本将使用 Google 的 T5 Base 模型,并配置为从英语翻译成德语。

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": ["Hello my name is Jonathan."],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "de",
        },
    )

让我们跟踪这个确切的版本。这将在稍后有用。

en_to_de_version: str = str(model_info.registered_model_version)

注册的模型元数据包含一些对我们有用的信息。最值得注意的是,注册的模型版本与严格的签名相关联,该签名表示其输入和输出的预期形状。这将在稍后对我们有用。

>>> print(model_info.signature)
inputs: 
  ['prompt': string (required)]
outputs: 
  ['translation_text': string (required)]
params: 
  None

服务模型

现在我们的模型已在 MLflow 中注册,让我们使用 Ray Serve 设置我们的服务脚手架。目前,我们将“部署”限制为以下行为

  • 从 MLflow 获取选定的模型和版本;
  • 通过简单的 REST API 接收推理请求并返回推理响应。
import mlflow
import pandas as pd

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self, model_name: str = "translation_model", default_version: str = "1"):
        self.model_name = model_name
        self.default_version = default_version

        self.model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{self.default_version}")

    @app.post("/serve")
    async def serve(self, input_string: str):
        return self.model.predict(pd.DataFrame({"prompt": [input_string]}))


deployment = ModelDeployment.bind(default_version=en_to_de_version)

您可能已经注意到,在此处将 “prompt” 硬编码为输入标签会在注册模型的签名和部署实现之间引入隐藏的耦合。我们稍后会回到这一点。

现在,让我们运行部署并试用一下。

serve.run(deployment, blocking=False)
>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
... )

>>> print(response.json())
[{'translation_text': 'Das Wetter ist heute nett.'}]

这工作正常,但您可能已经注意到 REST API 与模型签名不一致。也就是说,它使用标签 “input_string”,而服务模型版本本身使用输入标签 “prompt”。同样,模型可以接受多个输入值,但 API 只接受一个。

如果您觉得这有点 “味道”,请继续阅读;我们稍后会回到这一点。

多版本,单端点

现在我们已经为我们的模型设置了一个基本端点。太棒了!但是,请注意,此部署严格绑定到此模型的单个版本——具体来说,是注册的 translation_model 的版本 1。

现在想象一下,您的团队想要返回并改进此模型——可能是在新数据上重新训练它,或者将其配置为翻译成新语言,例如法语而不是德语。两者都会导致注册 translation_model 的新版本。但是,使用我们当前的部署实现,我们需要为 translation_model/2 设置一个全新的端点,要求我们的用户记住哪个地址和端口对应于哪个版本的模型,等等。换句话说:非常繁琐、非常容易出错、非常费力。

相反,想象一下这样一种情况,我们可以重用完全相同的端点——相同的签名、相同的地址和端口、相同的查询约定等——来服务于此模型的两个版本。我们的用户只需指定他们想要使用的模型版本,并且在用户没有明确请求的情况下,我们可以将其中一个版本视为“默认”版本。

这是 Ray Serve 凭借其称为模型多路复用 (model multiplexing) 的功能而脱颖而出的一个领域。实际上,这允许您加载模型的多个“版本”,根据需要动态热插拔它们,以及卸载一段时间后未使用的版本。换句话说,非常节省空间。

让我们尝试注册模型的另一个版本——这次是翻译英语到法语的版本。我们将将其注册为版本 “2”;模型服务器将以这种方式检索模型版本。

但首先,让我们使用多路复用支持扩展模型服务器。

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MultiplexedModelDeployment:

    @serve.multiplexed(max_num_models_per_replica=2)
    async def get_model(self, version: str):
        return mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}")

    def __init__(
        self,
        model_name: str = "translation_model",
        default_version: str = en_to_de_version,
    ):
        self.model_name = model_name
        self.default_version = default_version

    @app.post("/serve")
    async def serve(self, input_string: str):
        model = await self.get_model(serve.get_multiplexed_model_id())
        return model.predict(pd.DataFrame({"prompt": [input_string]}))
multiplexed_deployment = MultiplexedModelDeployment.bind(model_name="translation_model")
serve.run(multiplexed_deployment, blocking=False)

现在让我们实际注册新的模型版本。

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": [
                    "Hello my name is Jon.",
                ],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "fr",
        },
    )

en_to_fr_version: str = str(model_info.registered_model_version)

现在它已注册,我们可以像这样通过模型服务器查询它…

>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": en_to_fr_version},
... )

>>> print(response.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

请注意,我们是如何能够在不重新部署模型服务器的情况下立即访问模型版本的。Ray Serve 的多路复用功能允许它以即时方式动态获取模型权重;如果我从未请求版本 2,它永远不会被加载。这有助于为确实被查询的模型节省计算资源。更重要的是,如果加载的模型数量超过配置的最大值 (max_num_models_per_replica),则最近最少使用的模型版本将被驱逐。

鉴于我们在上面设置了 max_num_models_per_replica=2,模型的 “默认” 英德翻译版本应该仍然已加载并随时可用于服务请求,而无需任何冷启动时间。现在让我们确认一下

>>> print(
...     requests.post(
...         "http://127.0.0.1:8000/serve/",
...         params={"input_string": "The weather is lovely today"},
...         headers={"serve_multiplexed_model_id": en_to_de_version},
...     ).json()
... )
[{'translation_text': 'Das Wetter ist heute nett.'}]

自动签名

这一切都很好。但是,请注意,以下摩擦点仍然存在:在定义服务器时,我们需要为 API 本身定义一个全新的签名。往好了说,这只是模型签名本身(已在 MLflow 中注册)的一些代码重复。往坏了说,这可能会导致您的团队或组织拥有的所有模型之间的 API 不一致,这可能会给您的下游依赖项带来困惑和挫败感。

在这种特殊情况下,这意味着 MultiplexedModelDeployment 实际上秘密地与 translation_model 的用例紧密耦合。如果我们想要部署另一组与语言翻译无关的模型怎么办?定义的 /serve API 返回一个类似 {"translated_text": "foo"} 的 JSON 对象,将不再有意义。

为了解决这个问题,如果 MultiplexedModelDeployment 的 API 签名可以自动镜像它所服务的底层模型的签名,会怎么样?

值得庆幸的是,借助 MLflow 模型注册表元数据和一些 Python 动态类创建的技巧,这完全是可能的。

让我们进行设置,以便从注册的模型本身推断模型服务器签名。由于 MLflow 的不同版本可以具有不同的签名,我们将使用 “默认版本” 来 “固定” 签名;任何尝试多路复用不兼容签名模型版本的操作都将抛出错误。

由于 Ray Serve 在类定义时绑定请求和响应签名,我们将使用 Python 元类将其设置为指定模型名称和默认模型版本的函数。

import mlflow
import pydantic


def schema_to_pydantic(schema: mlflow.types.schema.Schema, *, name: str) -> pydantic.BaseModel:
    return pydantic.create_model(
        name, **{k: (v.type.to_python(), pydantic.Field(required=True)) for k, v in schema.input_dict().items()}
    )


def get_req_resp_signatures(
    model_signature: mlflow.models.ModelSignature,
) -> tuple[pydantic.BaseModel, pydantic.BaseModel]:
    inputs: mlflow.types.schema.Schema = model_signature.inputs
    outputs: mlflow.types.schema.Schema = model_signature.outputs

    return (schema_to_pydantic(inputs, name="InputModel"), schema_to_pydantic(outputs, name="OutputModel"))
import mlflow

from fastapi import FastAPI, Response, status
from ray import serve
from typing import List


def deployment_from_model_name(model_name: str, default_version: str = "1"):
    app = FastAPI()
    model_info = mlflow.models.get_model_info(f"models:/{model_name}/{default_version}")
    input_datamodel, output_datamodel = get_req_resp_signatures(model_info.signature)

    @serve.deployment
    @serve.ingress(app)
    class DynamicallyDefinedDeployment:

        MODEL_NAME: str = model_name
        DEFAULT_VERSION: str = default_version

        @serve.multiplexed(max_num_models_per_replica=2)
        async def get_model(self, model_version: str):
            model = mlflow.pyfunc.load_model(f"models:/{self.MODEL_NAME}/{model_version}")

            if model.metadata.get_model_info().signature != model_info.signature:
                raise ValueError(
                    f"Requested version {model_version} has signature incompatible with that of default version {self.DEFAULT_VERSION}"
                )
            return model

        # TODO: Extend this to support batching (lists of inputs and outputs)
        @app.post("/serve", response_model=List[output_datamodel])
        async def serve(self, model_input: input_datamodel, response: Response):
            model_id = serve.get_multiplexed_model_id()
            if model_id == "":
                model_id = self.DEFAULT_VERSION

            try:
                model = await self.get_model(model_id)
            except ValueError:
                response.status_code = status.HTTP_409_CONFLICT
                return [{"translation_text": "FAILED"}]

            return model.predict(model_input.dict())

    return DynamicallyDefinedDeployment


deployment = deployment_from_model_name("translation_model", default_version=en_to_fr_version)

serve.run(deployment.bind(), blocking=False)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": str(en_to_fr_version)},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

现在让我们确认我们实施的签名检查规定实际上有效。为此,让我们注册具有稍微不同的签名的相同模型。这应该足以触发故障保护。

(还记得我们在本练习开始时使输入标签可配置吗?这就是它最终发挥作用的地方。😎)

import pandas as pd

with mlflow.start_run():
    incompatible_version = str(
        mlflow.pyfunc.log_model(
            "translation_model",
            registered_model_name="translation_model",
            python_model=MyTranslationModel(),
            pip_requirements=["transformers"],
            input_example=pd.DataFrame(
                {
                    "text_to_translate": [
                        "Hello my name is Jon.",
                    ],
                }
            ),
            model_config={
                "input_label": "text_to_translate",
                "hfhub_name": "google-t5/t5-base",
                "lang_from": "en",
                "lang_to": "de",
            },
        ).registered_model_version
    )
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": incompatible_version},
)
assert not resp.ok
resp.status_code == 409

assert resp.json()[0]["translation_text"] == "FAILED"

(从技术上讲,“正确”的做法是实现一个响应容器,该容器允许将 “错误消息” 定义为实际响应的一部分,而不是像我们在此处所做的那样 “滥用” translation_text 字段。但是,为了演示目的,这样做就足够了。)

为了完全结束,让我们尝试注册一个完全不同的模型——具有完全不同的签名——并通过 deployment_from_model_name() 部署它。这将帮助我们确认整个签名是从加载的模型定义的。

import mlflow
from transformers import pipeline


class QuestionAnswererModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):

        self.model_context = context.model_config.get(
            "model_context",
            "My name is Hans and I live in Germany.",
        )
        self.model_name = context.model_config.get(
            "model_name",
            "deepset/roberta-base-squad2",
        )

        self.tokenizer_name = context.model_config.get(
            "tokenizer_name",
            "deepset/roberta-base-squad2",
        )

        self.pipeline = pipeline(
            "question-answering",
            model=self.model_name,
            tokenizer=self.tokenizer_name,
        )

    def predict(self, context, model_input, params=None):
        resp = self.pipeline(
            question=model_input["question"].tolist(),
            context=self.model_context,
        )

        return [resp] if type(resp) is not list else resp
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "question_answerer",
        registered_model_name="question_answerer",
        python_model=QuestionAnswererModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "question": [
                    "Where do you live?",
                    "What is your name?",
                ],
            }
        ),
        model_config={
            "model_context": "My name is Hans and I live in Germany.",
        },
    )
>>> print(model_info.signature)
inputs: 
  ['question': string (required)]
outputs: 
  ['score': double (required), 'start': long (required), 'end': long (required), 'answer': string (required)]
params: 
  None
from ray import serve

serve.run(
    deployment_from_model_name(
        "question_answerer",
        default_version=str(model_info.registered_model_version),
    ).bind(),
    blocking=False,
)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"question": "The weather is lovely today"},
... )
>>> print(resp.json())
[{'score': 3.255764386267401e-05, 'start': 30, 'end': 38, 'answer': 'Germany.'}]

结论

在本笔记本中,我们利用 MLflow 内置的对跟踪模型签名的支持,大大简化了部署 HTTP 服务器以在线方式服务该模型的过程。我们利用 Ray Serve 强大但繁琐的原语来增强自身能力,只需一行代码即可部署具有以下功能的模型服务器

  • 版本多路复用;
  • 自动 REST API 签名设置;
  • 防止使用具有不兼容签名的模型版本的安全措施。

通过这样做,我们证明了 Ray Serve 作为工具包的价值和潜力,您和您的团队可以在此基础上 “构建自己的 ML 平台”。

我们还演示了减少与组合使用多种工具相关的集成开销和繁琐工作的方法。无缝集成是支持自包含的、包罗万象的平台(如 AWS Sagemaker 或 GCP Vertex AI)的有力论据。我们已经证明,通过一些巧妙的工程设计和对用户(在本例中为 MLE)关心的摩擦点的原则性关注,我们可以获得类似的好处,而无需将我们自己和我们的团队束缚在昂贵的供应商合同中。

练习

  • 生成的 API 签名与模型签名非常相似,但仍然存在一些不匹配之处。您能找出它在哪里吗?尝试修复它。提示:当您尝试将多个问题传递到我们设置的问答端点时会发生什么?
  • MLflow 模型签名允许可选输入。我们当前的实现没有考虑这一点。我们如何扩展此处的实现以支持可选输入?
  • 同样,MLflow 模型签名允许非输入 “推理参数”,我们当前的实现也不支持。我们如何扩展此处的实现以支持推理参数?
  • 每次生成新部署时,我们都使用名称 DynamicallyDefinedDeployment,无论我们传入的模型名称和版本是什么。这是个问题吗?如果是,您预见到这种方法会产生什么样的问题?尝试调整 deployment_from_model_name() 以处理这些问题。
< > 在 GitHub 上更新