Transformers 文档

视频分类

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

视频分类

视频分类的任务是将标签或类别分配给整个视频。每个视频应只有一个类别。视频分类模型将视频作为输入,并返回关于视频属于哪个类别的预测。这些模型可用于分类视频的内容。视频分类的一个实际应用是动作/活动识别,这对于健身应用非常有用。它对有视觉障碍的人也很有帮助,尤其是在他们通勤时。

本指南将向您展示如何

  1. UCF101 数据集的子集上微调 VideoMAE
  2. 使用您微调的模型进行推理。

要查看与此任务兼容的所有架构和检查点,我们建议查看任务页面

在开始之前,请确保您已安装所有必要的库

pip install -q pytorchvideo transformers evaluate

您将使用 PyTorchVideo (别名为 pytorchvideo) 来处理和准备视频。

我们鼓励您登录您的 Hugging Face 帐户,以便您可以上传您的模型并与社区分享。出现提示时,输入您的令牌以登录

>>> from huggingface_hub import notebook_login

>>> notebook_login()

加载 UCF101 数据集

首先加载 UCF-101 数据集 的子集。这将使您有机会进行实验,并在花费更多时间在完整数据集上进行训练之前确保一切正常。

>>> from huggingface_hub import hf_hub_download

>>> hf_dataset_identifier = "sayakpaul/ucf101-subset"
>>> filename = "UCF101_subset.tar.gz"
>>> file_path = hf_hub_download(repo_id=hf_dataset_identifier, filename=filename, repo_type="dataset")

下载子集后,您需要解压缩压缩包

>>> import tarfile

>>> with tarfile.open(file_path) as t:
...      t.extractall(".")

从高层次来看,数据集的组织结构如下

UCF101_subset/
    train/
        BandMarching/
            video_1.mp4
            video_2.mp4
            ...
        Archery
            video_1.mp4
            video_2.mp4
            ...
        ...
    val/
        BandMarching/
            video_1.mp4
            video_2.mp4
            ...
        Archery
            video_1.mp4
            video_2.mp4
            ...
        ...
    test/
        BandMarching/
            video_1.mp4
            video_2.mp4
            ...
        Archery
            video_1.mp4
            video_2.mp4
            ...
        ...

然后您可以计算视频总数。

>>> import pathlib
>>> dataset_root_path = "UCF101_subset"
>>> dataset_root_path = pathlib.Path(dataset_root_path)
>>> video_count_train = len(list(dataset_root_path.glob("train/*/*.avi")))
>>> video_count_val = len(list(dataset_root_path.glob("val/*/*.avi")))
>>> video_count_test = len(list(dataset_root_path.glob("test/*/*.avi")))
>>> video_total = video_count_train + video_count_val + video_count_test
>>> print(f"Total videos: {video_total}")
>>> all_video_file_paths = (
...     list(dataset_root_path.glob("train/*/*.avi"))
...     + list(dataset_root_path.glob("val/*/*.avi"))
...     + list(dataset_root_path.glob("test/*/*.avi"))
...  )
>>> all_video_file_paths[:5]

sorted)视频路径如下所示

...
'UCF101_subset/train/ApplyEyeMakeup/v_ApplyEyeMakeup_g07_c04.avi',
'UCF101_subset/train/ApplyEyeMakeup/v_ApplyEyeMakeup_g07_c06.avi',
'UCF101_subset/train/ApplyEyeMakeup/v_ApplyEyeMakeup_g08_c01.avi',
'UCF101_subset/train/ApplyEyeMakeup/v_ApplyEyeMakeup_g09_c02.avi',
'UCF101_subset/train/ApplyEyeMakeup/v_ApplyEyeMakeup_g09_c06.avi'
...

您会注意到,有些视频片段属于同一组/场景,其中组由视频文件路径中的 g 表示。例如,v_ApplyEyeMakeup_g07_c04.aviv_ApplyEyeMakeup_g07_c06.avi

对于验证和评估拆分,您不希望有来自同一组/场景的视频片段,以防止 数据泄露。您在本教程中使用的子集考虑了此信息。

接下来,您将导出数据集中存在的标签集。此外,创建两个字典,它们在初始化模型时会很有用

  • label2id:将类名映射到整数。
  • id2label:将整数映射到类名。
>>> class_labels = sorted({str(path).split("/")[2] for path in all_video_file_paths})
>>> label2id = {label: i for i, label in enumerate(class_labels)}
>>> id2label = {i: label for label, i in label2id.items()}

>>> print(f"Unique classes: {list(label2id.keys())}.")

# Unique classes: ['ApplyEyeMakeup', 'ApplyLipstick', 'Archery', 'BabyCrawling', 'BalanceBeam', 'BandMarching', 'BaseballPitch', 'Basketball', 'BasketballDunk', 'BenchPress'].

有 10 个独特的类别。对于每个类别,训练集中有 30 个视频。

加载模型以进行微调

从预训练的检查点及其关联的图像处理器实例化视频分类模型。该模型的编码器带有预训练的参数,而分类头是随机初始化的。图像处理器将在为我们的数据集编写预处理管道时派上用场。

>>> from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification

>>> model_ckpt = "MCG-NJU/videomae-base"
>>> image_processor = VideoMAEImageProcessor.from_pretrained(model_ckpt)
>>> model = VideoMAEForVideoClassification.from_pretrained(
...     model_ckpt,
...     label2id=label2id,
...     id2label=id2label,
...     ignore_mismatched_sizes=True,  # provide this in case you're planning to fine-tune an already fine-tuned checkpoint
... )

在加载模型时,您可能会注意到以下警告

Some weights of the model checkpoint at MCG-NJU/videomae-base were not used when initializing VideoMAEForVideoClassification: [..., 'decoder.decoder_layers.1.attention.output.dense.bias', 'decoder.decoder_layers.2.attention.attention.key.weight']
- This IS expected if you are initializing VideoMAEForVideoClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing VideoMAEForVideoClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of VideoMAEForVideoClassification were not initialized from the model checkpoint at MCG-NJU/videomae-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

该警告告诉我们,我们正在丢弃一些权重(例如,classifier 层的权重和偏差),并随机初始化其他一些权重(新 classifier 层的权重和偏差)。这在此情况下是预期的,因为我们正在添加一个新的头,我们没有该头的预训练权重,因此库会警告我们,我们应该先微调此模型,然后再将其用于推理,而这正是我们要做的事情。

注意此检查点 在此任务上表现更好,因为该检查点是通过在具有相当大领域重叠的类似下游任务上进行微调而获得的。您可以查看 此检查点,该检查点是通过微调 MCG-NJU/videomae-base-finetuned-kinetics 获得的。

准备用于训练的数据集

为了预处理视频,您将利用 PyTorchVideo 库。首先导入我们需要的依赖项。

>>> import pytorchvideo.data

>>> from pytorchvideo.transforms import (
...     ApplyTransformToKey,
...     Normalize,
...     RandomShortSideScale,
...     RemoveKey,
...     ShortSideScale,
...     UniformTemporalSubsample,
... )

>>> from torchvision.transforms import (
...     Compose,
...     Lambda,
...     RandomCrop,
...     RandomHorizontalFlip,
...     Resize,
... )

对于训练数据集转换,请使用均匀时间子采样、像素归一化、随机裁剪和随机水平翻转的组合。对于验证和评估数据集转换,请保留相同的转换链,但随机裁剪和水平翻转除外。要了解有关这些转换的详细信息,请查看 PyTorchVideo 的官方文档

使用与预训练模型关联的 image_processor 来获取以下信息

  • 视频帧像素将被归一化的图像均值和标准差。
  • 视频帧将被调整大小到的空间分辨率。

首先定义一些常量。

>>> mean = image_processor.image_mean
>>> std = image_processor.image_std
>>> if "shortest_edge" in image_processor.size:
...     height = width = image_processor.size["shortest_edge"]
>>> else:
...     height = image_processor.size["height"]
...     width = image_processor.size["width"]
>>> resize_to = (height, width)

>>> num_frames_to_sample = model.config.num_frames
>>> sample_rate = 4
>>> fps = 30
>>> clip_duration = num_frames_to_sample * sample_rate / fps

现在,分别定义数据集特定的转换和数据集。从训练集开始

>>> train_transform = Compose(
...     [
...         ApplyTransformToKey(
...             key="video",
...             transform=Compose(
...                 [
...                     UniformTemporalSubsample(num_frames_to_sample),
...                     Lambda(lambda x: x / 255.0),
...                     Normalize(mean, std),
...                     RandomShortSideScale(min_size=256, max_size=320),
...                     RandomCrop(resize_to),
...                     RandomHorizontalFlip(p=0.5),
...                 ]
...             ),
...         ),
...     ]
... )

>>> train_dataset = pytorchvideo.data.Ucf101(
...     data_path=os.path.join(dataset_root_path, "train"),
...     clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration),
...     decode_audio=False,
...     transform=train_transform,
... )

相同的工作流程顺序可以应用于验证和评估集

>>> val_transform = Compose(
...     [
...         ApplyTransformToKey(
...             key="video",
...             transform=Compose(
...                 [
...                     UniformTemporalSubsample(num_frames_to_sample),
...                     Lambda(lambda x: x / 255.0),
...                     Normalize(mean, std),
...                     Resize(resize_to),
...                 ]
...             ),
...         ),
...     ]
... )

>>> val_dataset = pytorchvideo.data.Ucf101(
...     data_path=os.path.join(dataset_root_path, "val"),
...     clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
...     decode_audio=False,
...     transform=val_transform,
... )

>>> test_dataset = pytorchvideo.data.Ucf101(
...     data_path=os.path.join(dataset_root_path, "test"),
...     clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
...     decode_audio=False,
...     transform=val_transform,
... )

注意:以上数据集管道取自 PyTorchVideo 官方示例。我们使用 pytorchvideo.data.Ucf101() 函数是因为它是为 UCF-101 数据集量身定制的。在底层,它返回一个 pytorchvideo.data.labeled_video_dataset.LabeledVideoDataset 对象。LabeledVideoDataset 类是 PyTorchVideo 数据集中所有视频内容的基础类。因此,如果您想使用 PyTorchVideo 现成不支持的自定义数据集,您可以相应地扩展 LabeledVideoDataset 类。请参阅 data API 文档 以了解更多信息。此外,如果您的数据集遵循类似的结构(如上所示),那么使用 pytorchvideo.data.Ucf101() 应该可以正常工作。

您可以访问 num_videos 参数以了解数据集中的视频数量。

>>> print(train_dataset.num_videos, val_dataset.num_videos, test_dataset.num_videos)
# (300, 30, 75)

可视化预处理后的视频以进行更好的调试

>>> import imageio
>>> import numpy as np
>>> from IPython.display import Image

>>> def unnormalize_img(img):
...     """Un-normalizes the image pixels."""
...     img = (img * std) + mean
...     img = (img * 255).astype("uint8")
...     return img.clip(0, 255)

>>> def create_gif(video_tensor, filename="sample.gif"):
...     """Prepares a GIF from a video tensor.
...
...     The video tensor is expected to have the following shape:
...     (num_frames, num_channels, height, width).
...     """
...     frames = []
...     for video_frame in video_tensor:
...         frame_unnormalized = unnormalize_img(video_frame.permute(1, 2, 0).numpy())
...         frames.append(frame_unnormalized)
...     kargs = {"duration": 0.25}
...     imageio.mimsave(filename, frames, "GIF", **kargs)
...     return filename

>>> def display_gif(video_tensor, gif_name="sample.gif"):
...     """Prepares and displays a GIF from a video tensor."""
...     video_tensor = video_tensor.permute(1, 0, 2, 3)
...     gif_filename = create_gif(video_tensor, gif_name)
...     return Image(filename=gif_filename)

>>> sample_video = next(iter(train_dataset))
>>> video_tensor = sample_video["video"]
>>> display_gif(video_tensor)
Person playing basketball

训练模型

利用 🤗 Transformers 中的 Trainer 来训练模型。要实例化 Trainer,您需要定义训练配置和评估指标。最重要的是 TrainingArguments,它是一个包含配置训练的所有属性的类。它需要一个输出文件夹名称,该名称将用于保存模型的检查点。它还有助于同步 🤗 Hub 上模型存储库中的所有信息。

大多数训练参数都是不言自明的,但这里非常重要的一个是 remove_unused_columns=False。这将删除模型调用函数未使用的任何特征。默认情况下,它是 True,因为通常最好删除未使用的特征列,从而更容易将输入解包到模型的调用函数中。但是,在这种情况下,您需要未使用的特征(尤其是“video”),以便创建 pixel_values(这是我们的模型在其输入中期望的强制性键)。

>>> from transformers import TrainingArguments, Trainer

>>> model_name = model_ckpt.split("/")[-1]
>>> new_model_name = f"{model_name}-finetuned-ucf101-subset"
>>> num_epochs = 4

>>> args = TrainingArguments(
...     new_model_name,
...     remove_unused_columns=False,
...     eval_strategy="epoch",
...     save_strategy="epoch",
...     learning_rate=5e-5,
...     per_device_train_batch_size=batch_size,
...     per_device_eval_batch_size=batch_size,
...     warmup_ratio=0.1,
...     logging_steps=10,
...     load_best_model_at_end=True,
...     metric_for_best_model="accuracy",
...     push_to_hub=True,
...     max_steps=(train_dataset.num_videos // batch_size) * num_epochs,
... )

pytorchvideo.data.Ucf101() 返回的数据集未实现 __len__ 方法。因此,在实例化 TrainingArguments 时,我们必须定义 max_steps

接下来,您需要定义一个函数来计算预测的指标,这将使用您现在加载的 metric。您必须做的唯一预处理是获取我们预测的 logits 的 argmax

import evaluate

metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)

关于评估的说明:

VideoMAE 论文 中,作者使用了以下评估策略。他们评估来自测试视频的多个剪辑上的模型,并将不同的裁剪应用于这些剪辑,并报告聚合分数。但是,为了简单和简洁起见,我们没有在本教程中考虑这一点。

另外,定义一个 collate_fn,它将用于将示例批量处理在一起。每个批次由 2 个键组成,即 pixel_valueslabels

>>> def collate_fn(examples):
...     # permute to (num_frames, num_channels, height, width)
...     pixel_values = torch.stack(
...         [example["video"].permute(1, 0, 2, 3) for example in examples]
...     )
...     labels = torch.tensor([example["label"] for example in examples])
...     return {"pixel_values": pixel_values, "labels": labels}

然后,您只需将所有这些以及数据集传递给 Trainer

>>> trainer = Trainer(
...     model,
...     args,
...     train_dataset=train_dataset,
...     eval_dataset=val_dataset,
...     processing_class=image_processor,
...     compute_metrics=compute_metrics,
...     data_collator=collate_fn,
... )

您可能想知道,当您已经预处理了数据时,为什么还要将 image_processor 作为 tokenizer 传递。这只是为了确保图像处理器配置文件(存储为 JSON)也将上传到 Hub 上的存储库。

现在通过调用 train 方法微调我们的模型

>>> train_results = trainer.train()

训练完成后,使用 push_to_hub() 方法将您的模型分享到 Hub,以便每个人都可以使用您的模型

>>> trainer.push_to_hub()

推理

太棒了,现在您已经微调了一个模型,您可以使用它进行推理了!

加载用于推理的视频

>>> sample_test_video = next(iter(test_dataset))
Teams playing basketball

尝试使用微调模型进行推理的最简单方法是在 pipeline 中使用它。使用您的模型实例化视频分类的 pipeline,并将您的视频传递给它

>>> from transformers import pipeline

>>> video_cls = pipeline(model="my_awesome_video_cls_model")
>>> video_cls("https://huggingface.co/datasets/sayakpaul/ucf101-subset/resolve/main/v_BasketballDunk_g14_c06.avi")
[{'score': 0.9272987842559814, 'label': 'BasketballDunk'},
 {'score': 0.017777055501937866, 'label': 'BabyCrawling'},
 {'score': 0.01663011871278286, 'label': 'BalanceBeam'},
 {'score': 0.009560945443809032, 'label': 'BandMarching'},
 {'score': 0.0068979403004050255, 'label': 'BaseballPitch'}]

如果您愿意,您也可以手动复制 pipeline 的结果。

>>> def run_inference(model, video):
...     # (num_frames, num_channels, height, width)
...     perumuted_sample_test_video = video.permute(1, 0, 2, 3)
...     inputs = {
...         "pixel_values": perumuted_sample_test_video.unsqueeze(0),
...         "labels": torch.tensor(
...             [sample_test_video["label"]]
...         ),  # this can be skipped if you don't have labels available.
...     }

...     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
...     inputs = {k: v.to(device) for k, v in inputs.items()}
...     model = model.to(device)

...     # forward pass
...     with torch.no_grad():
...         outputs = model(**inputs)
...         logits = outputs.logits

...     return logits

现在,将您的输入传递给模型并返回 logits

>>> logits = run_inference(trained_model, sample_test_video["video"])

解码 logits,我们得到

>>> predicted_class_idx = logits.argmax(-1).item()
>>> print("Predicted class:", model.config.id2label[predicted_class_idx])
# Predicted class: BasketballDunk
< > 在 GitHub 上更新