构建您的第一个 Kubeflow 流水线:一份全面的指南

社区文章 发布于 2023 年 10 月 15 日

Kubeflow 是一个开源平台,旨在实现端到端,简化机器学习 (ML) 工作流的每一步。它的目标是使 ML 工作流在 Kubernetes 上的部署变得简单、可移植和可扩展。其最强大的功能之一是 Kubeflow Pipelines,这是一个用于构建、部署和管理基于 Docker 容器的 ML 工作流的平台。

为什么您应该关注它?Kubeflow Pipelines 的强大之处在于它们能够自动化和简化整个机器学习过程,从数据摄取到模型部署。这不仅节省了时间,还有助于保持项目的一致性和质量。

在这篇文章中,我们将探讨如何从头开始构建您的第一个 Kubeflow 流水线。最后,您将对 Kubeflow 是什么以及如何使用它来构建 ML 工作流有扎实的理解。

image/png

Kubeflow 和机器学习工作流

Kubeflow 是一个为数据科学家和机器学习工程师设计的平台,融合了两类功能。数据科学家可以使用 Kubeflow 来试验 ML 模型,并以最有效的方式在 Kubernetes 上编排他们的实验。机器学习工程师可以使用 Kubeflow 将 ML 系统部署到不同的环境进行开发、测试和生产服务。下图展示了机器学习项目中的两个不同阶段:(i) 实验阶段和 (ii) 生产阶段。

drawing
ML 工作流中的 Kubeflow 组件

Kubeflow 拥有许多不同的组件,几乎可以支持流水线中的所有步骤。例如,为了调整模型的超参数,Kubeflow 有一个名为“Katib”的组件。

Kubeflow 还与 MLOps 原则高度契合,旨在弥合机器学习和运营之间的鸿沟。通过提供统一的工作流,Kubeflow 更易于管理从实验到生产的 ML 项目,整合了 DevOps 的各个方面,并促进了数据科学家和运营人员之间的协作。

Kubeflow 的三大原则

  • 可组合性:Kubeflow 具有高度可组合性,允许您根据需要,在机器学习流水线的不同部分使用不同版本的 TensorFlow 或任何其他 ML 库。

  • 可移植性:使用 Kubeflow,您可以在任何运行 Kubernetes 的地方运行您的整个机器学习项目。它抽象了平台特定的细节,使您能够编写一次代码,然后在笔记本电脑或基于云的集群上运行。

  • 可伸缩性:Kubeflow 旨在实现伸缩性,提供灵活的资源分配,按需分配更多资源,并在不再需要时释放资源。无论您使用的是 CPU、GPU 还是 TPU,Kubeflow 都能帮助您充分利用硬件资源。

drawing
Kubeflow 概念实体

安装 Kubeflow

有两种方法可以开始使用 Kubeflow

  1. 使用打包分发版进行安装,这既简单又直接。您可以在此处找到更多关于使用打包分发版安装 Kubeflow 的信息。
  2. 使用清单进行安装,这更高级。详细说明可以在此处找到。

打包分发版由各自的维护者开发和支持。例如,微软维护 Azure 上的 Kubeflow。您可以在下表中找到完整的发行版列表

drawing
Kubeflow 打包分发版

您还可以参考名为《Kubeflow:如何在本地机器上安装和启动 Kubeflow》的博客,获取更详细的安装说明。

安装后,您将可以访问 Kubeflow 仪表板,其外观如下图所示。

drawing
Kubeflow 仪表板 UI 视图

一个简单的 Python 演示脚本

在深入了解 Kubeflow Pipelines 之前,让我们创建一个简单的 Python 脚本,以了解我们旨在将其转换为流水线的目标。该脚本将模拟一个非常基本的 ML 工作流,其中我们读取一些数据,执行一个简单的计算,并保存结果。

您可以这样做

# my_script.py

def read_data():
    data = [1, 2, 3, 4, 5]
    return data

def compute_average(data):
    return sum(data) / len(data)

def save_result(value, filename='result.txt'):
    with open(filename, 'w') as f:
        f.write(str(value))

if __name__ == "__main__":
    data = read_data()
    avg = compute_average(data)
    save_result(avg)

这个 Python 脚本包含三个函数:一个用于读取数据(read_data),一个用于计算平均值(compute_average),以及一个用于保存结果(save_result)。我们的目标是将这些函数中的每一个都转换为一个 Kubeflow 流水线组件。

在下一节中,我们将深入研究 Kubeflow 组件,并向您展示如何从这个简单的 Python 脚本构建一个组件。

理解 Kubeflow 组件

Kubeflow 组件是流水线的构建块。本质上,每个组件都是一个独立的、执行 ML 工作流中一个步骤的代码片段。它独立运行,并且只做好一件事,例如读取数据、转换特征、训练模型或提供端点。

让我们将简单的 Python 脚本转换为 Kubeflow 组件。我们将使用 Kubeflow Pipelines SDK 的编译器模块来完成此操作。

创建 Kubeflow 组件

首先,将 Python 脚本中的每个函数转换为一个单独的组件。

from kfp import components

def read_data() -> list:
    data = [1, 2, 3, 4, 5]
    return data

def compute_average(data: list) -> float:
    return sum(data) / len(data)

def save_result(value: float, filename: str = 'result.txt'):
    with open(filename, 'w') as f:
        f.write(str(value))

# Compile the component
read_data_op = components.func_to_container_op(func=read_data,
                                output_component_file='read_data_component.yaml',
                                base_image='python:3.7',  # You can specify the base image here
                                packages_to_install=[])  # Any packages that need to be installed can be added here

compute_average_op = components.func_to_container_op(func=compute_average,
                                output_component_file='compute_average_component.yaml',
                                base_image='python:3.7',
                                packages_to_install=[])

components.func_to_container_op(save_result,
                                output_component_file='save_result_component.yaml')

组件是 Kubeflow 流水线的构建块。在我们的示例中,我们使用 func_to_container_op 函数将 Python 函数转换为组件。在转换过程中,您可能注意到了两个参数:base_imagepackages_to_install

base_image 参数指定了将用作组件执行环境的 Docker 镜像。简单来说,它就像组件的操作系统。这个镜像应该包含运行代码所需的所有必要软件。

  • 为什么重要?:不同的代码库可能需要不同的运行时环境。例如,如果您正在进行一个 TensorFlow 项目,您可以选择一个预安装了 TensorFlow 的基础镜像。

  • 使用示例:

    base_image='tensorflow/tensorflow:2.4.0'
    
  • 默认值:如果您没有指定 base_image,则使用默认的 Python 镜像,这是一个安装了 Python 的最小镜像。

packages_to_install 参数允许您安装代码运行所需的额外 Python 包。这是一个软件包名称数组,将使用 pip 进行安装。

  • 为什么重要?:有时您的代码依赖于基础镜像中不存在的第三方库。在这种情况下,您可以提供这些软件包的名称,它们将在组件运行时安装。

  • 使用示例:

    packages_to_install=['numpy', 'pandas']
    
  • 注意:指定软件包时要小心,因为安装不兼容的版本可能会导致组件崩溃。

创建 Kubeflow 流水线

定义好组件后,下一步是将它们组织成一个流水线。为此,您将使用 Kubeflow 的领域特定语言 (DSL) 将组件连接起来。一旦您将组件编译并保存为 .yaml 文件,就可以将它们组装成一个流水线了。为此,我们将编写一个 Python 函数,使用 Kubeflow Pipelines SDK 来定义流水线的结构。

在 Kubeflow Pipelines 中,流水线本质上是一个用 @dsl.pipeline 装饰的 Python 函数。在这个函数中,您可以将组件作为构建块使用。以下是如何使用我们编译的组件定义 Kubeflow 流水线的方法。

导入编译后的组件

您可以这样导入编译后的组件

import kfp
from kfp import dsl

read_data_op = kfp.components.load_component_from_file('read_data_component.yaml')
compute_average_op = kfp.components.load_component_from_file('compute_average_component.yaml')
save_result_op = kfp.components.load_component_from_file('save_result_component.yaml')

组装流水线

加载组件后,我们将其连接起来,形成一个流水线。

@dsl.pipeline(
    name='My first pipeline',
    description='A simple pipeline that computes the average of an array.'
)
def my_pipeline():
    read_data_task = read_data_op()
    compute_average_task = compute_average_op(read_data_task.output)
    save_result_task = save_result_op(compute_average_task.output)

# Compile the pipeline
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.yaml')

这个流水线首先使用 read_data_op 读取数据,然后使用 compute_average_op 计算平均值,最后使用 save_result_op 保存结果。

此外,这里有一个片段展示了如何在组件之间传递参数

@dsl.pipeline(
    name='My parameterized pipeline',
    description='A simple pipeline that reads data and takes a parameter.'
)
def my_pipeline(my_param: int):
    read_data_task = read_data_op()
    another_task = another_component_op(my_param)

这允许您创建更具动态性和可配置性的流水线。

在接下来的部分中,我们将探讨如何部署此流水线以及使用 Kubeflow Pipelines 时应遵循的最佳实践。

部署 Kubeflow 流水线

构建并编译好流水线后,下一步是将其部署到 Kubeflow 平台上。这包括上传编译后的 .yaml 文件,然后运行流水线。

上传流水线

  1. 导航到您的 Kubeflow 仪表板。
  2. 进入 Pipelines 部分。
  3. 点击 Upload Pipeline
  4. 浏览并选择 my_pipeline.yaml 文件。

上传后,您将看到您的流水线列在您可能已上传的其他流水线中。

运行流水线

  1. 点击您刚刚上传的流水线。
  2. 点击 Run 按钮。
  3. 为您的运行命名,然后点击 Start

现在,您可以监控流水线在每个阶段的进展。成功执行将表明您的流水线已正确部署。

drawing
Kubeflow 流水线视图示例

最佳实践

在使用 Kubeflow Pipelines 时,某些最佳实践可以帮助您充分利用该平台。以下是一些需要考虑的指南,以获得更顺畅的体验

版本控制组件

  • 确保您的组件的每个版本都得到充分文档化和版本控制。这将使您将来更容易调试和更新流水线。

错误处理

  • 确保在组件中包含错误处理机制。这可以通过在 Python 代码中捕获异常并记录有意义的消息来完成。
# Example of a Python function with error handling
def read_data() -> int:
    try:
        # code to read data
        data = [1, 2, 3, 4, 5]
        return data
    except Exception as e:
        print(f"An error occurred: {e}")
        return -1

依赖管理

  • 明确指定所有依赖项,无论是 Dockerfile 中还是作为组件元数据的一部分。

组件可重用性

  • 将组件设计为可重用,以便您可以根据需要在不同的流水线中插入它们。

监控资源

  • 密切关注流水线使用的资源(CPU、内存等)。优化组件以使其尽可能高效。

总而言之,Kubeflow Pipelines 提供了一种简化方式,可以将您的 ML 项目从简单的脚本转变为健壮的端到端工作流。我们涵盖了从环境设置到构建和部署您的第一个 Kubeflow 流水线的方方面面。

将基础知识扩展到实际机器学习项目

到目前为止,为了清晰起见,我们的示例都极其基础。然而,请不要低估 Kubeflow Pipelines 的强大功能;我们所涵盖的原则可以令人印象深刻地扩展到实际的机器学习项目。

实际用例

您定义的每个组件都可以代表典型机器学习工作流中的一个步骤。以下是如何将 Kubeflow 组件映射到您的机器学习项目

  1. 数据收集read_data 组件可以扩展为从各种来源(如数据库、Excel 文件或 API)收集数据。

  2. 预处理:您可以拥有另一个用于数据清洗和预处理的组件,将原始数据转换为可以输入到 ML 模型中的格式。

  3. 数据分割:一个组件可以用于将数据集分割为训练集、验证集和测试集。

  4. 模型训练:在这里,您可以使用一个组件来使用预处理过的训练集训练模型。

  5. 评估:最后,一个组件可以使用各种指标来评估模型,以了解其性能如何。

示例

假设您有用于以下每个步骤的 Python 函数

  • read_data()
  • preprocess_data()
  • train_test_split()
  • train_model()
  • evaluate_model()

这些函数中的每一个都可以使用 func_to_container_op 转换为 Kubeflow 组件。一旦它们成为组件,您就可以像我们对简单的 read_datacompute_average 组件所做的那样,将它们排列在流水线中。这使您能够自动化整个机器学习工作流!


到此为止!希望您现在已经奠定了坚实的基础,可以开始构建自己的 Kubeflow 流水线,无论是简单的任务还是复杂的机器学习工作流。请记住,使用 Kubeflow,可能性是无限的!

在这篇文章中,我们涵盖了很多内容——从环境设置到构建和部署您的第一个 Kubeflow 流水线。现在,您应该对 Kubeflow 是什么、Kubeflow 流水线是什么以及它们如何融入机器学习工作流的大局有了扎实的理解。

Kubeflow Pipelines 是自动化和扩展您的 ML 工作流的重要工具。随着您深入 ML 项目,创建健壮、可扩展流水线的能力将变得越来越有价值。

如需更多信息和进一步学习,请随时访问Kubeflow 官方文档

参考资料

如需进一步阅读和探索,您可能会发现以下资源很有用

感谢您的阅读!欢迎分享本指南,传播知识。

学习愉快!🚀


如果您有任何问题或想联系我,我的所有社交媒体账号都在这个链接上。

您也可以在我的网站上关注我的其他博客文章。

社区

注册登录以发表评论