开源 AI 食谱文档

从 MLflow 到 Ray Serve 的签名感知模型服务

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Open In Colab

从 MLflow 到 Ray Serve 的签名感知模型服务

作者: Jonathan Jin

简介

本笔记本探讨了简化模型从模型注册表部署的解决方案。对于希望随着时间推移生产化许多模型的团队来说,在 AI/ML 项目生命周期中这个“转换点”上的投资可以显著缩短上市时间。这对于可能没有现有基础设施来形成在线模型在生产中提供“黄金路径”的年轻小型团队来说可能很重要。

动机

优化模型生命周期的这一阶段特别重要,因为最终结果面向生产。在此阶段,您的模型实际上成为一个微服务。这意味着您现在需要处理服务所有权的各个要素,这可能包括:

  • 标准化和强制 API 向后兼容性;
  • 日志记录、指标和一般可观测性问题;
  • 等等。

每次您想部署新模型时都需要重复相同的通用设置,这将导致您和您的团队的开发成本随着时间的推移显著增加。另一方面,考虑到生产模型所有权的“长尾”(假设生产化的模型不太可能很快退役),在这里精简投资可以随着时间的推移带来丰厚的回报。

鉴于以上所有内容,我们在这里的探索动机是以下用户故事:

我想使用只提供模型名称的方式从模型注册表(例如 MLflow)部署模型。每次我想部署新模型时,需要复制的样板和脚手架越少越好。我希望能够动态地选择不同版本的模型,而无需为适应这些新版本设置全新的部署。

组件

为此处的探索,我们将使用以下最小堆栈:

  • MLflow 用于模型注册;
  • Ray Serve 用于模型服务。

出于演示目的,我们将只使用 Hugging Face Hub 中的现成开源模型。

我们今天将使用 GPU 进行推理,因为推理性能与我们今天的重点无关。毋庸置疑,在“实际生活中”,您可能无法仅凭 CPU 计算来提供模型服务。

现在让我们安装依赖项。

!pip install "transformers" "mlflow-skinny" "ray[serve]" "torch"

注册模型

首先,让我们定义今天探索要使用的模型。为简单起见,我们将使用一个简单的文本翻译模型,其中源语言和目标语言在注册时是可配置的。实际上,这意味着不同“版本”的模型可以注册以翻译不同的语言,但底层模型架构和权重可以保持不变。

import mlflow
from transformers import pipeline


class MyTranslationModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.lang_from = context.model_config.get("lang_from", "en")
        self.lang_to = context.model_config.get("lang_to", "de")

        self.input_label: str = context.model_config.get("input_label", "prompt")

        self.model_ref: str = context.model_config.get("hfhub_name", "google-t5/t5-base")

        self.pipeline = pipeline(
            f"translation_{self.lang_from}_to_{self.lang_to}",
            self.model_ref,
        )

    def predict(self, context, model_input, params=None):
        prompt = model_input[self.input_label].tolist()

        return self.pipeline(prompt)

(您可能想知道为什么我们甚至费心使输入标签可配置。这将在稍后对我们有用。)

现在模型已经定义,让我们注册它的一个实际版本。这个特定版本将使用 Google 的 T5 Base 模型,并配置为从英语翻译到德语

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": ["Hello my name is Jonathan."],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "de",
        },
    )

让我们记录这个确切的版本。这将在稍后有用。

en_to_de_version: str = str(model_info.registered_model_version)

注册的模型元数据包含一些对我们有用的信息。最值得注意的是,注册的模型版本与一个严格的签名相关联,该签名表示其输入和输出的预期形状。这将在稍后对我们有用。

>>> print(model_info.signature)
inputs: 
  ['prompt': string (required)]
outputs: 
  ['translation_text': string (required)]
params: 
  None

提供模型服务

现在模型已在 MLflow 中注册,让我们使用 Ray Serve 设置我们的服务脚手架。目前,我们将“部署”限制为以下行为:

  • 从 MLflow 获取选定的模型和版本;
  • 通过简单的 REST API 接收推理请求并返回推理响应。
import mlflow
import pandas as pd

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self, model_name: str = "translation_model", default_version: str = "1"):
        self.model_name = model_name
        self.default_version = default_version

        self.model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{self.default_version}")

    @app.post("/serve")
    async def serve(self, input_string: str):
        return self.model.predict(pd.DataFrame({"prompt": [input_string]}))


deployment = ModelDeployment.bind(default_version=en_to_de_version)

您可能已经注意到,这里将 "prompt" 硬编码为输入标签,这在注册模型的签名和部署实现之间引入了隐藏的耦合。我们稍后会再讨论这个问题。

现在,让我们运行部署并试用它。

serve.run(deployment, blocking=False)
>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
... )

>>> print(response.json())
[{'translation_text': 'Das Wetter ist heute nett.'}]

这很好用,但您可能已经注意到 REST API 与模型签名不一致。也就是说,它使用标签 "input_string",而服务模型版本本身使用输入标签 "prompt"。同样,模型可以接受多个输入值,但 API 只接受一个。

如果这让您感到不适,请继续阅读;我们稍后会再讨论这个问题。

多个版本,一个端点

现在我们已经为模型建立了一个基本的端点。太棒了!但是,请注意,此部署严格绑定到此模型的单个版本——特别是已注册的 translation_model 的版本 1

现在想象一下,您的团队希望回来改进此模型——也许用新数据重新训练它,或者将其配置为翻译成新语言,例如法语而不是德语。这两种情况都会导致 translation_model 的新版本注册。但是,使用我们当前的部署实现,我们需要为 translation_model/2 设置一个全新的端点,要求我们的用户记住哪个地址和端口对应哪个版本的模型,等等。换句话说:非常麻烦,非常容易出错,非常 费力

相反,想象一种情况,我们可以重复使用完全相同的端点——相同的签名、相同的地址和端口、相同的查询约定等——来提供这两个版本的模型服务。我们的用户可以简单地指定他们想要使用的模型版本,并且在用户没有明确请求某个版本的情况下,我们可以将其中一个视为“默认”版本。

这是 Ray Serve 凭借其称为模型多路复用的功能大放异彩的领域之一。实际上,这允许您加载模型的多个“版本”,根据需要动态地热插拔它们,并在一段时间后卸载不使用的版本。换句话说,非常节省空间。

让我们尝试注册模型的另一个版本——这次是一个从英语翻译到法语的版本。我们将此版本注册为 "2";模型服务器将以这种方式检索模型版本。

但首先,让我们扩展模型服务器以支持多路复用。

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MultiplexedModelDeployment:

    @serve.multiplexed(max_num_models_per_replica=2)
    async def get_model(self, version: str):
        return mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}")

    def __init__(
        self,
        model_name: str = "translation_model",
        default_version: str = en_to_de_version,
    ):
        self.model_name = model_name
        self.default_version = default_version

    @app.post("/serve")
    async def serve(self, input_string: str):
        model = await self.get_model(serve.get_multiplexed_model_id())
        return model.predict(pd.DataFrame({"prompt": [input_string]}))
multiplexed_deployment = MultiplexedModelDeployment.bind(model_name="translation_model")
serve.run(multiplexed_deployment, blocking=False)

现在让我们实际注册新的模型版本。

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": [
                    "Hello my name is Jon.",
                ],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "fr",
        },
    )

en_to_fr_version: str = str(model_info.registered_model_version)

注册完成后,我们可以通过模型服务器查询它,如下所示:

>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": en_to_fr_version},
... )

>>> print(response.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

请注意,我们能够立即访问模型版本,而无需重新部署模型服务器。Ray Serve 的多路复用功能使其能够即时动态获取模型权重;如果我从未请求版本 2,它就永远不会被加载。这有助于为确实被查询的模型节省计算资源。更有用的是,如果加载的模型数量超过了配置的最大值 (max_num_models_per_replica),则最近最少使用的模型版本将被逐出

鉴于我们上面设置了 max_num_models_per_replica=2,默认的英译德模型版本应该仍然已加载并随时可用于服务请求,没有任何冷启动时间。现在让我们确认一下

>>> print(
...     requests.post(
...         "http://127.0.0.1:8000/serve/",
...         params={"input_string": "The weather is lovely today"},
...         headers={"serve_multiplexed_model_id": en_to_de_version},
...     ).json()
... )
[{'translation_text': 'Das Wetter ist heute nett.'}]

自动签名

这一切都很好。但是,请注意以下摩擦点仍然存在:在定义服务器时,我们需要为 API 本身定义一个全新的签名。最好情况下,这只是模型签名本身(已在 MLflow 中注册)的一些代码重复。最坏情况下,这可能导致您的团队或组织拥有的所有模型之间的 API 不一致,从而导致下游依赖项的混乱和挫败感。

在这种特殊情况下,这意味着 MultiplexedModelDeployment 实际上是translation_model 的用例紧密耦合的。如果我们想部署另一组与语言翻译无关的模型怎么办?已定义的 /serve API(返回一个类似 {"translated_text": "foo"} 的 JSON 对象)将不再有意义。

为了解决这个问题,如果 MultiplexedModelDeployment 的 API 签名可以自动镜像其所服务的基础模型的签名,那该怎么办?

幸运的是,借助 MLflow 模型注册表元数据和一些 Python 动态类创建的技巧,这完全可以实现。

让我们设置好,以便模型服务器签名可以从注册模型本身推断。由于 MLflow 的不同版本可以有不同的签名,我们将使用“默认版本”来“固定”签名;任何尝试多路复用不兼容签名模型版本的行为都将抛出错误。

由于 Ray Serve 在类定义时绑定请求和响应签名,我们将使用 Python 元类将其设置为指定模型名称和默认模型版本的函数。

import mlflow
import pydantic


def schema_to_pydantic(schema: mlflow.types.schema.Schema, *, name: str) -> pydantic.BaseModel:
    return pydantic.create_model(
        name, **{k: (v.type.to_python(), pydantic.Field(required=True)) for k, v in schema.input_dict().items()}
    )


def get_req_resp_signatures(
    model_signature: mlflow.models.ModelSignature,
) -> tuple[pydantic.BaseModel, pydantic.BaseModel]:
    inputs: mlflow.types.schema.Schema = model_signature.inputs
    outputs: mlflow.types.schema.Schema = model_signature.outputs

    return (schema_to_pydantic(inputs, name="InputModel"), schema_to_pydantic(outputs, name="OutputModel"))
import mlflow

from fastapi import FastAPI, Response, status
from ray import serve
from typing import List


def deployment_from_model_name(model_name: str, default_version: str = "1"):
    app = FastAPI()
    model_info = mlflow.models.get_model_info(f"models:/{model_name}/{default_version}")
    input_datamodel, output_datamodel = get_req_resp_signatures(model_info.signature)

    @serve.deployment
    @serve.ingress(app)
    class DynamicallyDefinedDeployment:

        MODEL_NAME: str = model_name
        DEFAULT_VERSION: str = default_version

        @serve.multiplexed(max_num_models_per_replica=2)
        async def get_model(self, model_version: str):
            model = mlflow.pyfunc.load_model(f"models:/{self.MODEL_NAME}/{model_version}")

            if model.metadata.get_model_info().signature != model_info.signature:
                raise ValueError(
                    f"Requested version {model_version} has signature incompatible with that of default version {self.DEFAULT_VERSION}"
                )
            return model

        # TODO: Extend this to support batching (lists of inputs and outputs)
        @app.post("/serve", response_model=List[output_datamodel])
        async def serve(self, model_input: input_datamodel, response: Response):
            model_id = serve.get_multiplexed_model_id()
            if model_id == "":
                model_id = self.DEFAULT_VERSION

            try:
                model = await self.get_model(model_id)
            except ValueError:
                response.status_code = status.HTTP_409_CONFLICT
                return [{"translation_text": "FAILED"}]

            return model.predict(model_input.dict())

    return DynamicallyDefinedDeployment


deployment = deployment_from_model_name("translation_model", default_version=en_to_fr_version)

serve.run(deployment.bind(), blocking=False)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": str(en_to_fr_version)},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

现在让我们确认一下我们设置的签名检查功能确实有效。为此,让我们注册一个具有略微不同签名的相同模型。这应该足以触发故障保护。

(还记得我们在练习开始时,将输入标签设置为可配置的吗?这正是它最终发挥作用的地方。😎)

import pandas as pd

with mlflow.start_run():
    incompatible_version = str(
        mlflow.pyfunc.log_model(
            "translation_model",
            registered_model_name="translation_model",
            python_model=MyTranslationModel(),
            pip_requirements=["transformers"],
            input_example=pd.DataFrame(
                {
                    "text_to_translate": [
                        "Hello my name is Jon.",
                    ],
                }
            ),
            model_config={
                "input_label": "text_to_translate",
                "hfhub_name": "google-t5/t5-base",
                "lang_from": "en",
                "lang_to": "de",
            },
        ).registered_model_version
    )
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": incompatible_version},
)
assert not resp.ok
resp.status_code == 409

assert resp.json()[0]["translation_text"] == "FAILED"

(从技术上讲,“正确”的做法是实现一个响应容器,允许将“错误消息”定义为实际响应的一部分,而不是像我们在这里所做的那样“滥用” `translation_text` 字段。但是,出于演示目的,这样做就可以了。)

为了彻底完成,让我们尝试注册一个完全不同的模型——具有完全不同的签名——并通过 deployment_from_model_name() 部署它。这将帮助我们确认整个签名是从加载的模型中定义的。

import mlflow
from transformers import pipeline


class QuestionAnswererModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):

        self.model_context = context.model_config.get(
            "model_context",
            "My name is Hans and I live in Germany.",
        )
        self.model_name = context.model_config.get(
            "model_name",
            "deepset/roberta-base-squad2",
        )

        self.tokenizer_name = context.model_config.get(
            "tokenizer_name",
            "deepset/roberta-base-squad2",
        )

        self.pipeline = pipeline(
            "question-answering",
            model=self.model_name,
            tokenizer=self.tokenizer_name,
        )

    def predict(self, context, model_input, params=None):
        resp = self.pipeline(
            question=model_input["question"].tolist(),
            context=self.model_context,
        )

        return [resp] if type(resp) is not list else resp
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "question_answerer",
        registered_model_name="question_answerer",
        python_model=QuestionAnswererModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "question": [
                    "Where do you live?",
                    "What is your name?",
                ],
            }
        ),
        model_config={
            "model_context": "My name is Hans and I live in Germany.",
        },
    )
>>> print(model_info.signature)
inputs: 
  ['question': string (required)]
outputs: 
  ['score': double (required), 'start': long (required), 'end': long (required), 'answer': string (required)]
params: 
  None
from ray import serve

serve.run(
    deployment_from_model_name(
        "question_answerer",
        default_version=str(model_info.registered_model_version),
    ).bind(),
    blocking=False,
)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"question": "The weather is lovely today"},
... )
>>> print(resp.json())
[{'score': 3.255764386267401e-05, 'start': 30, 'end': 38, 'answer': 'Germany.'}]

结论

在本笔记本中,我们利用 MLflow 对模型签名的内置支持,极大地简化了部署 HTTP 服务器以在线提供模型服务的流程。我们利用 Ray Serve 强大但灵活的原始功能,使我们能够一键部署一个具有以下功能的模型服务器:

  • 版本多路复用;
  • 自动 REST API 签名设置;
  • 防止使用不兼容签名模型版本的安全措施。

通过这样做,我们展示了 Ray Serve 作为工具包的价值和潜力,您和您的团队可以基于此“构建自己的机器学习平台”。

我们还演示了如何减少同时使用多个工具所带来的集成开销和繁琐工作。无缝集成是支持自包含、一体化平台(如 AWS Sagemaker 或 GCP Vertex AI)的有力论据。我们已经证明,只要进行一些巧妙的工程设计,并以用户(在本例中是 MLE)关心的痛点为指导原则,我们就可以获得类似的好处,而无需将自己和团队与昂贵的供应商合同绑定。

练习

  • 生成的 API 签名与模型签名非常相似,但仍存在一些不匹配。您能找出它在哪里吗?尝试修复它。提示:当您尝试向我们设置的问答端点传递多个问题时会发生什么?
  • MLflow 模型签名允许可选输入。我们当前的实现没有考虑到这一点。我们如何扩展这里的实现来支持可选输入?
  • 同样,MLflow 模型签名允许非输入“推理参数”,而我们当前的实现也不支持。我们如何扩展我们的实现来支持推理参数?
  • 无论我们传入什么模型名称和版本,我们每次生成新部署时都使用名称 DynamicallyDefinedDeployment。这是一个问题吗?如果是,您认为这种方法会产生什么样的问题?尝试调整 deployment_from_model_name() 来处理这些问题。
< > 在 GitHub 上更新