用 Rust 从零开始构建张量(第 1.3 部分):数据操作

社区文章 发布于 2025 年 7 月 31 日

上一部分,我们为张量库构建了视图操作。现在我们准备实现数据操作,这些操作实际转换内存中的值,并构成任何张量库的计算骨干。

像映射(map)、降维(reduce)和广播(broadcast)这样的数据操作是真正的工作发生的地方。它们是 GPU 加速的操作,也是任何张量库中最重要的部分。

理解张量维度

在实现数据操作之前,我们需要清楚地理解零维张量,因为它们将在我们的操作中发挥关键作用。

将张量维度视为一个层次结构:

维度 名称 描述 结构
0D 标量 数字 单个值
1D 向量 数字列表 标量列表
2D 矩阵 数字列表的列表 向量列表
3D 张量 数字列表的列表的列表 矩阵列表

遵循此模式,零维张量就是一个简单的标量,一个单一的数字。这种理解在我们构建操作时至关重要,因为我们的许多函数将使用或产生标量。

映射(Map)

第一个、最直接的操作是映射。

映射将函数独立地应用于张量的每个元素。在机器学习框架中,可以将其视为对张量应用逐元素函数,如 ReLU、tanh 或 sigmoid。

表示映射的一种好方法是使用 einop 符号:

b h w c -func> b h w c

应用于每个元素的函数必须接收一个标量(零维张量)并输出一个标量。

func(标量) -> 标量

这在我们的张量库中很容易实现。我们只需传入一个函数并将其应用于我们数据中的每个元素。

我们将映射函数添加到 TensorStorage 中:

impl<T> TensorStorage<T> {
    fn map<F, U>(&self, f: F) -> TensorStorage<U>
    where
        F: Fn(&T) -> U,
    {
        TensorStorage {
            data: self.data.iter().map(f).collect(),
        }
    }
}

这有一些新的类型约束。首先,请注意映射函数不必返回与输入向量相同的类型;输出张量的类型取决于我们传入的函数的输出类型。

当然,还要添加到我们的 Tensor 实现中:

impl<T> Tensor<T> {
    fn map<F, U>(&self, f: F) -> Tensor<U>
    where
        F: Fn(&T) -> U,
    {
        Tensor {
            shape: self.shape.clone(),
            storage: self.storage.map(f),
        }
    }
}

请注意,我们正在将大部分计算推入 TensorStorage。当我们探索在 CPU 以外的设备上运行操作时,这将会有所回报。但那是另一部分的内容。我们现在只关注 CPU。

生产环境:Candle

Candle 没有像我们上面那样可以接受任意函数的映射操作,因为它很难将任意函数转换为其他设备。因此,它有一个名为 UnaryOp 的结构体,它描述了可以以这种方式应用于张量的所有不同函数。

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOp {
    Exp,
    Log,
    Sin,
    Cos,
    Abs,
    Neg,
    Recip,
    Sqr,
    Sqrt,
    Gelu,
    GeluErf,
    Erf,
    Relu,
    Silu,
    Tanh,
    Floor,
    Ceil,
    Round,
    Sign,
}

这是实际执行计算的 unary_map 函数

pub fn unary_map<T: Copy, U: Copy, F: FnMut(T) -> U>(
    vs: &[T],
    layout: &Layout,
    mut f: F,
) -> Vec<U> {
    match layout.strided_blocks() {
        crate::StridedBlocks::SingleBlock { start_offset, len } => vs
            [start_offset..start_offset + len]
            .iter()
            .map(|&v| f(v))
            .collect(),
        crate::StridedBlocks::MultipleBlocks {
            block_start_index,
            block_len,
        } => {
            let mut result = Vec::with_capacity(layout.shape().elem_count());
            // Specialize the case where block_len is one to avoid the second loop.
            if block_len == 1 {
                for index in block_start_index {
                    let v = unsafe { vs.get_unchecked(index) };
                    result.push(f(*v))
                }
            } else {
                for index in block_start_index {
                    for offset in 0..block_len {
                        let v = unsafe { vs.get_unchecked(index + offset) };
                        result.push(f(*v))
                    }
                }
            }
            result
        }
    }
}

降维(Reduce)

降维操作接收一个张量,并使用一个(通常是关联的)函数,沿着指定的轴组合所有子张量。机器学习框架中的一个例子是沿轴求和或求积。

表示降维的一种好方法是使用 einop 符号:

b h w c -func> b w c

在这种情况下,我们有一个形状为 [b, h, w, c] 的四维张量,我们希望提取每个形状为 [b, w, c] 的子张量并逐元素组合在一起。

输出张量中的每个条目都是一个函数的输出:

func(张量) -> 标量

在这种情况下,对于输出张量中的每个条目,该函数应用于形状为 [h] 的相应子张量。

就像求和会把张量中的所有元素加起来一样。

您也可以使用任何遵循以下规则的关联函数:

func(标量, 标量) -> 标量

通常,这种类型的函数最好是二元结合的,这样我们就可以更有效地并行执行它,但现在这不是一个大问题。

结合性是指操作的顺序无关紧要的属性。

例如,加法:

a + (b + c) = (a + b) + c

拥有一个结合函数使我们能够轻松地并行化计算。

例如,如果我们要对数字列表 [a, b, c, d, e, f, g, h] 求和,我们可以分 3 步并行执行:

步骤 1:a + b = abc + d = cde + f = efg + h = gh

步骤 2:ab + cd = abcdef + gh = efgh

步骤 3:abcd + efgh = abcdefgh = a + b + c + d + e + f + g + h

我们将为我们的 TensorStorage 实现这个功能。我们只实现接受两个标量并返回一个标量(所有类型相同)的函数。

impl<T: Clone> TensorStorage<T> {
    fn reduce<F>(&self, shape: &TensorShape, dim: usize, f: F) -> TensorStorage<T>
    where
        F: Fn(&T, &T) -> T,
        T: Zero,
    {
        // Calculate result shape by removing the reduction dimension
        let mut result_shape_vec = shape.shape.clone();
        result_shape_vec.remove(dim);

        if result_shape_vec.is_empty() {
            // Reducing to scalar
            let v = self
                .data
                .iter()
                .cloned()
                .reduce(|a, b| f(&a, &b))
                .expect("Cannot reduce an empty tensor storage");
            return TensorStorage { data: vec![v] };
        }

        let result_shape = TensorShape::new(result_shape_vec);
        let mut result_storage = TensorStorage::zeros(result_shape.size());

        // Iterate through all positions in the result tensor
        for result_flat_idx in 0..result_shape.size() {
            let result_multi_idx = result_shape.unravel_index(result_flat_idx);

            // For each result position, reduce along the specified dimension
            let mut accumulated_value: Option<T> = None;

            for dim_idx in 0..shape.shape[dim] {
                // Reconstruct the full multi-index by inserting the dimension index
                let mut full_multi_idx = Vec::with_capacity(shape.shape.len());
                let mut result_idx_pos = 0;

                for d in 0..shape.shape.len() {
                    if d == dim {
                        full_multi_idx.push(dim_idx);
                    } else {
                        full_multi_idx.push(result_multi_idx[result_idx_pos]);
                        result_idx_pos += 1;
                    }
                }

                let source_flat_idx = shape.ravel_index(&full_multi_idx);
                let value = &self[source_flat_idx];

                accumulated_value = match accumulated_value {
                    None => Some(value.clone()),
                    Some(acc) => Some(f(&acc, value)),
                };
            }

            result_storage[result_flat_idx] = accumulated_value.unwrap();
        }

        result_storage
    }
}

然后也将其添加到我们的 Tensor 实现中:

impl<T: Zero + Clone> Tensor<T> {

    // Rest of implementation

    fn reduce<F>(&self, dim: usize, f: F) -> Tensor<T>
    where
        F: Fn(&T, &T) -> T,
    {
        if dim >= self.shape.shape.len() {
            panic!(
                "dim {} out of bounds for tensor with {} dimensions",
                dim,
                self.shape.shape.len()
            );
        }

        let result_storage = self.storage.reduce(&self.shape, dim, f);

        // Calculate result shape by removing the reduction dimension
        let mut result_shape_vec = self.shape.shape.clone();
        result_shape_vec.remove(dim);
        let result_tensor_shape = TensorShape::new(result_shape_vec);

        Tensor {
            shape: result_tensor_shape,
            storage: result_storage,
        }
    }
}

所以现在我们可以沿任意维度对任何张量进行降维。

生产环境:Candle

Candle 的 CPU 降维实现仅允许有限数量的不同降维,基于 ReduceOp 枚举,原因与映射类似,因为函数很难在设备之间转换。

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReduceOp {
    Sum,
    Min,
    Max,
    ArgMin,
    ArgMax,
}
fn reduce_op(&self, op: ReduceOp, layout: &Layout, reduce_dims: &[usize]) -> Result<Self> {
    match op {
        ReduceOp::Sum => {
            let src_dims = layout.dims();
            let mut dst_dims = src_dims.to_vec();
            for &dim in reduce_dims.iter() {
                dst_dims[dim] = 1;
            }
            let dst_shape = Shape::from(dst_dims);
            let mut reduce_dims = reduce_dims.to_vec();
            // Sort the reduce_dims as they have to be processed from left to right when converting the
            // indexes.
            reduce_dims.sort();
            let reduce_dims_and_stride: Vec<_> = reduce_dims
                .iter()
                .map(|&d| (src_dims[d], src_dims[d + 1..].iter().product::<usize>()))
                .collect();
            ReduceSum {
                dst_shape: &dst_shape,
                reduce_dims: &reduce_dims,
                reduce_dims_and_stride,
            }
            .map(self, layout)
        }
        ReduceOp::Min | ReduceOp::ArgMin | ReduceOp::Max | ReduceOp::ArgMax => {
            let reduce_dim_index = match reduce_dims {
                [reduce_dim_index] => *reduce_dim_index,
                _ => {
                    let op = match op {
                        ReduceOp::Min => "min",
                        ReduceOp::ArgMin => "argmin",
                        ReduceOp::Max => "max",
                        ReduceOp::ArgMax => "argmax",
                        _ => unreachable!(),
                    };
                    let dims = reduce_dims.to_vec();
                    Err(Error::OnlySingleDimension { op, dims })?
                }
            };
            let (use_min, return_index) = match op {
                ReduceOp::Min => (true, false),
                ReduceOp::ArgMin => (true, true),
                ReduceOp::Max => (false, false),
                ReduceOp::ArgMax => (false, true),
                _ => unreachable!(),
            };
            ReduceIndex {
                reduce_dim_index,
                use_min,
                return_index,
            }
            .map(self, layout)
        }
    }
}

广播(Broadcast)

广播(或 Amplect)操作是第一个对多个张量进行操作的操作。它将二元标量函数逐元素应用于给定的一组对应维度。这在机器学习框架中就是乘法或加法。

表示广播的一种好方法是使用 einop 符号:

b h w c, c -func> b h w c

在这种情况下,我们有一个形状为 [b, h, w, c] 的四维张量,我们希望使用 func 将其与 c 组合。

输出张量中的每个条目都是一个类似于降维函数的输出。

func(标量, 标量) -> 标量

将 einop 符号中的每个形状维度视为计算索引的公式是很有帮助的。

因此,如果 T1 的形状为 [b, h, w, c]T2 的形状为 [c](其中 T1c 维度和 T2c 维度是对应维度),我们明确声明如下:

输出[i, j, k, l] = func(T1[i, j, k, l], T2[l])

对于实现,首先我们需要实现一种方法来找到两个张量的输出形状。我们在 TensorShape 上实现了 broadcast_shape 函数。

我们不再像上面那样使用 einop 符号手动标记每个维度,而是简单地提供对应的维度,所有维度将折叠到最左侧的对应维度。

如果你熟悉大多数框架(包括 Candle)中的广播,你可能会注意到我们采用了与通常的广播规则不同的策略。我在我关于广播的文章中讨论了这一点。我的主要观点之一是,广播规则不允许表达性和有用的外积。以下是主要观点的摘录:

广播逐维度比较张量,从最右边开始:

  1. 如果两个维度大小相同 -> 继续
  2. 如果任一维度大小为 1 -> 扩展以匹配另一个 -> 继续
  3. 如果张量耗尽维度 -> 将缺失维度视为大小为 1
  4. 否则 -> 广播失败

例如:T1 [a b c] 和 T2 [b]

  • 比较 cb -> 大小不同,都不是 1 -> 广播失败

...

为什么 [a b c][c] 兼容,而 [a b c][b] 不兼容?这两种操作在数学上都是有意义的。

事实上,任何两个张量都可以通过它们的外部积组合。

取形状为 [a b c] 的张量 A 和形状为 [d e f] 的张量 B

广播称这些是“不可广播的”,但是如果我们向 A 添加足够的单例维度,使其形状为 [a b c 1 1 1],它们就可以突然广播到形状 [a b c d e f],这就是外部积。

impl TensorShape {

    // Rest of implementation

    fn broadcast_shape(
        &self,
        other: &TensorShape,
        corresponding_dimensions: &[(usize, usize)],
    ) -> TensorShape {
        // Create mapping for corresponding dimensions
        let mut dim_correspondence = std::collections::HashMap::new();
        let mut other_dim_used = vec![false; other.shape.len()];

        for &(self_dim, other_dim) in corresponding_dimensions {
            if self_dim >= self.shape.len() || other_dim >= other.shape.len() {
                panic!("Dimension index out of bounds");
            }
            dim_correspondence.insert(self_dim, other_dim);
            other_dim_used[other_dim] = true;
        }

        // Build output shape: LHS dimensions (with broadcasting) + remaining RHS dimensions
        let mut output_shape = Vec::new();

        // Process LHS dimensions in order
        for (self_dim, &self_size) in self.shape.iter().enumerate() {
            if let Some(&other_dim) = dim_correspondence.get(&self_dim) {
                let other_size = other.shape[other_dim];

                if self_size == other_size {
                    output_shape.push(self_size);
                } else if self_size == 1 {
                    output_shape.push(other_size);
                } else if other_size == 1 {
                    output_shape.push(self_size);
                } else {
                    panic!(
                        "Cannot broadcast dimensions: {} and {}",
                        self_size, other_size
                    );
                }
            } else {
                output_shape.push(self_size);
            }
        }

        // Add remaining RHS dimensions that weren't used in correspondence
        for (other_dim, &other_size) in other.shape.iter().enumerate() {
            if !other_dim_used[other_dim] {
                output_shape.push(other_size);
            }
        }

        TensorShape::new(output_shape)
    }
}

然后对 TensorStorage 实现广播操作:

impl<T: Clone> TensorStorage<T> {

    // Rest of the implementation

    fn broadcast_op<F, U, T2>(
        &self,
        self_shape: &TensorShape,
        other: &TensorStorage<T2>,
        other_shape: &TensorShape,
        corresponding_dimensions: &[(usize, usize)],
        f: F,
    ) -> TensorStorage<U>
    where
        F: Fn(&T, &T2) -> U,
        U: Zero + Clone,
    {
        let result_shape = self_shape.broadcast_shape(other_shape, corresponding_dimensions);
        let mut result = TensorStorage::<U>::zeros(result_shape.size());

        // Create mapping for corresponding dimensions
        let mut dim_correspondence = std::collections::HashMap::new();
        let mut other_dim_used = vec![false; other_shape.shape.len()];

        for &(self_dim, other_dim) in corresponding_dimensions {
            dim_correspondence.insert(self_dim, other_dim);
            other_dim_used[other_dim] = true;
        }

        // Perform the element-wise operation
        for flat_idx in 0..result_shape.size() {
            let output_multi_idx = result_shape.unravel_index(flat_idx);

            // Map output indices to input tensor indices
            let mut self_idx = vec![0; self_shape.shape.len()];
            let mut other_idx = vec![0; other_shape.shape.len()];

            // Map LHS dimensions
            for (self_dim, &self_size) in self_shape.shape.iter().enumerate() {
                let output_val = output_multi_idx[self_dim];

                if let Some(&other_dim) = dim_correspondence.get(&self_dim) {
                    if self_size == 1 {
                        self_idx[self_dim] = 0;
                    } else {
                        self_idx[self_dim] = output_val;
                    }

                    if other_shape.shape[other_dim] == 1 {
                        other_idx[other_dim] = 0;
                    } else {
                        other_idx[other_dim] = output_val;
                    }
                } else {
                    self_idx[self_dim] = output_val;
                }
            }

            // Map remaining RHS dimensions
            let mut rhs_output_offset = self_shape.shape.len();
            for (other_dim, _) in other_shape.shape.iter().enumerate() {
                if !other_dim_used[other_dim] {
                    other_idx[other_dim] = output_multi_idx[rhs_output_offset];
                    rhs_output_offset += 1;
                }
            }

            // Get values from input tensors using proper indexing
            let self_flat = self_shape.ravel_index(&self_idx);
            let other_flat = other_shape.ravel_index(&other_idx);

            let self_val = &self[self_flat];
            let other_val = &other[other_flat];

            // Apply operation and store result
            result[flat_idx] = f(self_val, other_val);
        }

        result
    }
}

最后将其添加到我们的 Tensor 实现中:

impl<T: Zero + Clone> Tensor<T> {

    // Rest of the implementation

    fn broadcast_op<F, U, T2>(
        &self,
        other: &Tensor<T2>,
        corresponding_dimensions: &[(usize, usize)],
        f: F,
    ) -> Tensor<U>
    where
        F: Fn(&T, &T2) -> U,
        U: Zero + Clone,
    {
        let result_shape = self
            .shape
            .broadcast_shape(&other.shape, corresponding_dimensions);

        let result_storage = self.storage.broadcast_op(
            &self.shape,
            &other.storage,
            &other.shape,
            corresponding_dimensions,
            f,
        );

        Tensor {
            shape: result_shape,
            storage: result_storage,
        }
    }
}

生产环境:Candle

对于广播操作,Candle 会针对给定的内部函数运行这个宏:首先 Candle 找到一个对应的形状来广播每个张量,然后运行内部函数。

macro_rules! broadcast_binary_op {
    ($fn_name:ident, $inner_fn_name:ident) => {
        pub fn $fn_name(&self, rhs: &Self) -> Result<Self> {
            let lhs = self;
            let shape = lhs
                .shape()
                .broadcast_shape_binary_op(rhs.shape(), stringify!($fn_name))?;
            let l_broadcast = shape != *lhs.shape();
            let r_broadcast = shape != *rhs.shape();
            match (l_broadcast, r_broadcast) {
                (true, true) => lhs
                    .broadcast_as(&shape)?
                    .$inner_fn_name(&rhs.broadcast_as(&shape)?),
                (false, true) => lhs.$inner_fn_name(&rhs.broadcast_as(&shape)?),
                (true, false) => lhs.broadcast_as(&shape)?.$inner_fn_name(rhs),
                (false, false) => lhs.$inner_fn_name(rhs),
            }
        }
    };
}

像映射和降维一样,这意味着只有少数预先选择的函数可以以这种方式应用。出于与映射和降维类似的原因,在任意设备上运行任意函数是很困难的。

矩阵乘法

现在我们有了 3 个核心数据操作,我们可以构建一些复合操作。

我们可以使用广播和降维操作创建矩阵乘法。

对于形状为 [a b] 的矩阵 A 和形状为 [b c] 的矩阵 B,我们有:

a b, b c -multiply> a b c -sum> a c

第一个箭头表示广播乘法;第二个箭头表示降维操作,它对 b 维度(中间 [a b c] 结果的中间维度)执行求和。

在我们的张量框架中:

let intermediate = matrix_a.broadcast_op(&matrix_b, &[(1, 0)], |a, b| a * b);
let result = intermediate.reduce(1, |a, b| a + b);

批处理矩阵乘法

批处理矩阵乘法执行完全相同的 2 个操作。

对于形状为 [n a b] 的矩阵 A 和形状为 [n b c] 的矩阵 B,我们有:

n a b, n b c -multiply> n a b c -sum> n a c

在我们的张量格式中:

let batch_intermediate = batch_a.broadcast_op(&batch_b, &[(0, 0), (2, 1)], |a, b| a * b);
let batch_result = batch_intermediate.reduce(2, |a, b| a + b);

生产环境:Candle

Candle 有一个单独的 matmul 操作,因为 matmul 非常普遍且经过优化。它可以处理任意数量的批处理维度。

这在张量中:

pub fn matmul(&self, rhs: &Self) -> Result<Self> {
    let a_dims = self.shape().dims();
    let b_dims = rhs.shape().dims();

    let dim = a_dims.len();

    if dim < 2 || b_dims.len() != dim {
        Err(Error::ShapeMismatchBinaryOp {
            lhs: self.shape().clone(),
            rhs: rhs.shape().clone(),
            op: "matmul",
        }
        .bt())?
    }

    let m = a_dims[dim - 2];
    let k = a_dims[dim - 1];
    let k2 = b_dims[dim - 2];
    let n = b_dims[dim - 1];

    let c_shape = Shape::from(&a_dims[..dim - 2]).extend(&[m, n]);
    if c_shape.elem_count() == 0 || k == 0 {
        return Tensor::zeros(c_shape, self.dtype(), self.device());
    }
    let batching: usize = a_dims[..dim - 2].iter().product();
    let batching_b: usize = b_dims[..dim - 2].iter().product();
    if k != k2 || batching != batching_b {
        Err(Error::ShapeMismatchBinaryOp {
            lhs: self.shape().clone(),
            rhs: rhs.shape().clone(),
            op: "matmul",
        }
        .bt())?
    }

    let storage = self.storage().matmul(
        &rhs.storage(),
        (batching, m, n, k),
        self.layout(),
        rhs.layout(),
    )?;
    let op = BackpropOp::new2(self, rhs, Op::Matmul);
    Ok(from_storage(storage, c_shape, op, false))
}

它调用了 CPU 存储 matmul

fn matmul(
    &self,
    rhs: &Self,
    bmnk: (usize, usize, usize, usize),
    lhs_l: &Layout,
    rhs_l: &Layout,
) -> Result<Self> {
    MatMul(bmnk).map(self, lhs_l, rhs, rhs_l)
}

Softmax

我们可以通过以下方式在最后一个维度上创建 softmax:

映射 exp

A: a b c -exp> a b c

降维求和

B: a b c -sum> a b

广播除法

C: a b c, a b -div> a b c

所以对于形状为 [a, b, c] 的张量 T1,我们有:

softmax(T1) = C(A(T1), B(A(T1)))

let exp_3d = tensor_3d.map(|x| x.exp());
let sum_3d = exp_3d.reduce(2, |a, b| a + b);
let softmax_3d = exp_3d.broadcast_op(&sum_3d, &[(0, 0), (1, 1)], |a, b| a / b);

生产环境:Candle

Candle 在 candle-nn 的 softmax 函数中做了类似的事情。

pub fn softmax<D: candle::shape::Dim>(xs: &Tensor, dim: D) -> Result<Tensor> {
    let dim = dim.to_index(xs.shape(), "softmax")?;
    let max = xs.max_keepdim(dim)?;
    let diff = xs.broadcast_sub(&max)?;
    let num = diff.exp()?;
    let den = num.sum_keepdim(dim)?;
    num.broadcast_div(&den)
}

代码

要运行此博客文章中的代码:

首先,克隆仓库

git clone git@github.com:greenrazer/easytensor.git

然后切换到此部分的标记提交:

git checkout part-3

然后运行测试:

cargo test

查看 src/ 中的代码。

后续步骤

我们现在已经实现了构成所有张量库基础的三个核心数据操作:映射、降维和广播。这些操作是计算主力,使得像 Candle 和 PyTorch 这样的库成为可能。

本系列的第一部分到此结束。在第二部分中,我们将把重点转向性能优化,使我们的操作足够快以适应实际使用。

在此期间,我鼓励您尝试这些操作。尝试实现其他复合操作,如注意力机制,或探索我们核心操作的不同组合如何创建复杂的行为。这种组合方法的美妙之处在于,一旦你理解了这三个构建块,你就可以构建几乎任何你需要的张量操作。

社区

注册登录评论