NumPy 科学计算库
1. 简介
NumPy(Numerical Python)是 Python 生态中最基础的科学计算库,提供了高性能的多维数组对象(ndarray)和丰富的数学函数。NumPy 是几乎所有 Python 数据科学和机器学习库的底层依赖,包括 PyTorch、TensorFlow、Pandas、SciPy 等。
NumPy 在 LLM 开发中的核心角色
- 数据预处理:文本数据的数值化、分词后的索引操作、Batch 的拼接与填充
- 嵌入计算:词嵌入矩阵的构建与操作
- 注意力矩阵运算:缩放点积注意力中的矩阵乘法、Softmax 数值稳定计算
- 线性代数运算:SVD 降维、PCA 分析、特征值分解
- 数值验证:为 PyTorch 自定义算子提供参考实现,验证正确性
2. 安装
# 标准安装
pip install numpy
# 指定版本
pip install numpy==1.26.4
# 验证安装
python -c "import numpy as np; print(np.__version__)"
3. 核心模块详解
3.1 ndarray:多维数组
ndarray(N-dimensional array)是 NumPy 的核心数据结构,提供高效的同类数据多维容器。
3.1.1 创建数组
import numpy as np
# 从列表创建
a = np.array([1, 2, 3]) # 1D 数组
b = np.array([[1, 2], [3, 4]], dtype=np.float32) # 2D 数组,指定类型
# 工厂函数
zeros = np.zeros((3, 4)) # 全零数组,形状 (3, 4)
ones = np.ones((2, 3)) # 全一数组
empty = np.empty((2, 3)) # 未初始化数组(值不确定,但速度最快)
full = np.full((2, 3), 7.0) # 用指定值填充
# 序列生成
arange = np.arange(0, 10, 2) # 等差序列:[0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5) # 等间隔序列:[0, 0.25, 0.5, 0.75, 1.0]
logspace = np.logspace(0, 3, 4) # 对数间隔:[1, 10, 100, 1000]
# 随机数组
rng = np.random.default_rng(42) # 推荐方式:使用 Generator
rand = rng.random((2, 3)) # 均匀分布 [0, 1)
randn = rng.standard_normal((2, 3)) # 标准正态分布 N(0,1)
ints = rng.integers(0, 10, size=(2, 3)) # 整数 [0, 10)
# 特殊矩阵
eye = np.eye(3) # 3×3 单位矩阵
diag = np.diag([1, 2, 3]) # 对角矩阵
# 从 PyTorch 张量转换
import torch
t = torch.randn(2, 3)
arr = t.numpy() # CPU 张量 → NumPy(共享内存)
3.1.2 数组属性
a = np.random.randn(2, 3, 4)
a.ndim # 3 — 维度数(轴数)
a.shape # (2, 3, 4) — 各维度大小
a.size # 24 — 元素总数
a.dtype # float64 — 数据类型
a.itemsize # 8 — 每个元素的字节数
a.nbytes # 192 = 24 × 8 — 总字节数
a.strides # (96, 32, 8) — 各维度步长(字节)
3.1.3 数据类型
# NumPy 的数据类型体系
np.int8, np.int16, np.int32, np.int64 # 有符号整数
np.uint8, np.uint16, np.uint32, np.uint64 # 无符号整数
np.float16, np.float32, np.float64 # 浮点数
np.bool_ # 布尔型
np.complex64, np.complex128 # 复数
# 类型转换
a = np.array([1.5, 2.7, 3.9])
b = a.astype(np.int32) # [1, 2, 3] — 截断而非四舍五入
c = a.astype(np.float32) # 转为单精度
# LLM 中的常用类型
# 模型权重通常用 float32 或 float16
# Token ID 用 int32 或 int64
# 注意力掩码用 bool_
3.2 广播机制
广播(Broadcasting)是 NumPy 处理不同形状数组间运算的强大机制,无需显式复制数据。
3.2.1 广播规则
- 两个数组从最右边的维度开始对齐
- 两个维度大小相同,或其中一个为1,则兼容
- 兼容的维度中,大小为1的维度被”广播”(沿该维度复制)
- 不兼容的维度会导致错误
# 例1:(3, 1) + (1, 4) → (3, 4)
a = np.ones((3, 1)) # shape: (3, 1)
b = np.ones((1, 4)) # shape: (1, 4)
c = a + b # shape: (3, 4)
# 例2:(5, 3, 4) + (3, 4) → (5, 3, 4)
a = np.ones((5, 3, 4))
b = np.ones((3, 4))
c = a + b # b 被广播到 (5, 3, 4)
# 例3:(5, 3, 4) + (5, 1, 4) → (5, 3, 4)
a = np.ones((5, 3, 4))
b = np.ones((5, 1, 4))
c = a + b # b 的第1维从1广播到3
# 不兼容的情况
a = np.ones((3, 4))
b = np.ones((3, 5))
# a + b → ValueError: operands could not be broadcast together
3.2.2 LLM 中的广播应用
# 场景1:Layer Normalization
# hidden: (batch, seq_len, hidden_dim), gamma/beta: (hidden_dim,)
hidden = np.random.randn(4, 128, 768)
gamma = np.ones(768)
beta = np.zeros(768)
mean = hidden.mean(axis=-1, keepdims=True) # (4, 128, 1)
var = hidden.var(axis=-1, keepdims=True) # (4, 128, 1)
normalized = (hidden - mean) / np.sqrt(var + 1e-5) # 广播减法
output = gamma * normalized + beta # gamma/beta 广播到 (4, 128, 768)
# 场景2:RoPE 位置编码
# freqs: (seq_len, head_dim/2) → (1, seq_len, 1, head_dim/2)
freqs = np.random.randn(128, 64)
freqs = freqs[np.newaxis, :, np.newaxis, :] # 广播到 batch 和 num_heads 维度
3.3 矩阵运算
3.3.1 基本矩阵运算
A = np.random.randn(3, 4)
B = np.random.randn(4, 5)
# 矩阵乘法
C = np.dot(A, B) # (3, 4) @ (4, 5) → (3, 5)
C = A @ B # Python 3.5+ 推荐写法
C = np.matmul(A, B) # 与 @ 运算符等价
# 逐元素乘法(不是矩阵乘法!)
D = np.array([1, 2, 3])
E = np.array([4, 5, 6])
F = D * E # [4, 10, 18] — 逐元素相乘
# 转置
AT = A.T # (4, 3) — 转置
AT = np.transpose(A) # 等价写法
# 高维转置
X = np.random.randn(2, 3, 4, 5)
XT = np.transpose(X, (0, 2, 1, 3)) # 交换第1和第2维:(2, 4, 3, 5)
3.3.2 批量矩阵乘法(LLM 注意力中的核心运算)
# 批量矩阵乘法:支持同时计算多个矩阵乘法
# Q: (batch, heads, seq_len, head_dim)
# K: (batch, heads, head_dim, seq_len)
Q = np.random.randn(4, 8, 128, 64) # batch=4, heads=8
K = np.random.randn(4, 8, 128, 64)
# 注意力分数:Q @ K^T
attn_scores = Q @ np.transpose(K, (0, 1, 3, 2)) # (4, 8, 128, 128)
# 数值稳定的 Softmax
def stable_softmax(x, axis=-1):
"""避免数值溢出的 Softmax 实现"""
x_max = np.max(x, axis=axis, keepdims=True)
exp_x = np.exp(x - x_max) # 减去最大值防止溢出
return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
attn_weights = stable_softmax(attn_scores / np.sqrt(64)) # 缩放点积注意力
3.3.3 行列式与逆矩阵
A = np.random.randn(3, 3)
# 行列式
det = np.linalg.det(A) # 标量
# 逆矩阵
A_inv = np.linalg.inv(A) # (3, 3)
# 验证:A @ A_inv ≈ I
print(np.allclose(A @ A_inv, np.eye(3))) # True
# 伪逆(非方阵也可用)
B = np.random.randn(3, 4)
B_pinv = np.linalg.pinv(B) # (4, 3)
3.4 随机数生成
3.4.1 Generator(推荐方式)
# 使用 new Generator API(NumPy 1.17+)
rng = np.random.default_rng(seed=42) # 可选种子,确保可复现
# 基本分布
rng.random((2, 3)) # 均匀分布 [0, 1)
rng.standard_normal((2, 3)) # 标准正态 N(0, 1)
rng.integers(0, 100, size=10) # 整数 [0, 100)
# LLM 相关分布
rng.normal(loc=0, scale=1, size=(768, 768)) # 正态分布 N(μ, σ²)
rng.uniform(low=-1, high=1, size=1000) # 均匀分布 U(a, b)
rng.multivariate_normal(mean=[0, 0], cov=[[1, 0.5], [0.5, 1]], size=1000) # 多元正态
# Dirichlet 分布(用于主题模型、混合权重)
alpha = np.array([1.0, 2.0, 3.0])
samples = rng.dirichlet(alpha, size=5) # (5, 3),每行和为1
3.4.2 LLM 中的随机数应用
# 权重初始化(Kaiming/He 初始化)
def kaiming_init(fan_in, shape, rng=None):
"""适用于 ReLU 激活的权重初始化"""
if rng is None:
rng = np.random.default_rng()
std = np.sqrt(2.0 / fan_in)
return rng.normal(0, std, size=shape)
weights = kaiming_init(768, (768, 3072)) # Transformer FFN 层初始化
# Xavier/Glorot 初始化
def xavier_init(fan_in, fan_out, rng=None):
"""适用于 Sigmoid/Tanh 激活的权重初始化"""
if rng is None:
rng = np.random.default_rng()
std = np.sqrt(2.0 / (fan_in + fan_out))
return rng.normal(0, std, size=(fan_in, fan_out))
# Dropout 实现
def dropout(x, p=0.1, rng=None):
"""NumPy 实现 Dropout"""
if rng is None:
rng = np.random.default_rng()
mask = rng.random(x.shape) > p # 以概率 p 置零
return x * mask / (1 - p) # 缩放以保持期望值不变
3.5 线性代数 (numpy.linalg)
3.5.1 奇异值分解 (SVD)
A = np.random.randn(4, 3) # 4×3 矩阵
# 完整 SVD
U, S, Vt = np.linalg.svd(A, full_matrices=True)
# U: (4, 4), S: (3,), Vt: (3, 3)
# A = U @ diag(S) @ Vt
# 截断 SVD(节省计算)
U, S, Vt = np.linalg.svd(A, full_matrices=False)
# U: (4, 3), S: (3,), Vt: (3, 3)
# 验证重构
A_reconstructed = U @ np.diag(S) @ Vt
print(np.allclose(A, A_reconstructed)) # True
# LLM 应用:低秩近似与降维
# 嵌入矩阵降维
embeddings = np.random.randn(50000, 768) # 词表大小 50000,维度 768
U, S, Vt = np.linalg.svd(embeddings, full_matrices=False)
# 保留前 k 个奇异值,实现降维
k = 128
embeddings_low_rank = U[:, :k] @ np.diag(S[:k]) # (50000, 128)
print(f"原始大小: {embeddings.nbytes / 1e6:.1f} MB")
print(f"降维后: {embeddings_low_rank.nbytes / 1e6:.1f} MB")
# 计算有效秩(奇异值衰减分析)
total_energy = np.sum(S ** 2)
cumulative_energy = np.cumsum(S ** 2) / total_energy
effective_rank = np.searchsorted(cumulative_energy, 0.95) + 1 # 保留95%能量
print(f"有效秩(95%能量): {effective_rank}")
3.5.2 特征值分解
# 对称矩阵的特征值分解
A = np.random.randn(3, 3)
A_sym = A @ A.T # 构造对称矩阵
eigenvalues, eigenvectors = np.linalg.eigh(A_sym) # 对称矩阵推荐用 eigh
# eigenvalues: (3,) — 特征值(升序排列)
# eigenvectors: (3, 3) — 每列是对应的特征向量
# 验证:A @ v = λ * v
for i in range(3):
v = eigenvectors[:, i]
lambda_i = eigenvalues[i]
print(np.allclose(A_sym @ v, lambda_i * v)) # True
# 非对称矩阵用 eig
eigenvalues, eigenvectors = np.linalg.eig(A)
# 注意:特征值可能是复数
# LLM 应用:协方差矩阵分析(PCA)
data = np.random.randn(1000, 50) # 1000 个样本,50 维
cov_matrix = np.cov(data.T) # (50, 50) 协方差矩阵
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
# 按特征值降序排列
idx = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:, idx]
# 投影到前 k 个主成分
k = 10
data_pca = data @ eigenvectors[:, :k] # (1000, 10)
3.5.3 解线性方程组
# 求解 Ax = b
A = np.array([[3, 1], [1, 2]], dtype=np.float64)
b = np.array([9, 8], dtype=np.float64)
x = np.linalg.solve(A, b) # 解向量
print(np.allclose(A @ x, b)) # True
# 最小二乘解(超定方程组)
A = np.random.randn(10, 3) # 10个方程,3个未知数
b = np.random.randn(10)
x, residuals, rank, sv = np.linalg.lstsq(A, b, rcond=None)
# x: 最小二乘解
# residuals: 残差平方和
# rank: 矩阵的秩
# sv: 奇异值
3.5.4 范数
A = np.random.randn(3, 4)
# 向量范数
v = np.array([3.0, 4.0])
np.linalg.norm(v, ord=1) # L1 范数: 7.0
np.linalg.norm(v, ord=2) # L2 范数: 5.0
np.linalg.norm(v, ord=np.inf) # 无穷范数: 4.0
# 矩阵范数
np.linalg.norm(A, ord='fro') # Frobenius 范数:所有元素平方和的平方根
np.linalg.norm(A, ord=2) # 谱范数(最大奇异值)
np.linalg.norm(A, ord='nuc') # 核范数(奇异值之和)
# LLM 应用:梯度裁剪中计算梯度范数
gradients = [np.random.randn(100, 200), np.random.randn(200, 50)]
total_norm = np.sqrt(sum(np.linalg.norm(g) ** 2 for g in gradients))
max_norm = 1.0
if total_norm > max_norm:
scale = max_norm / total_norm
gradients = [g * scale for g in gradients]
3.6 FFT:快速傅里叶变换
# 一维 FFT
signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 128)) # 5Hz 正弦波
fft_result = np.fft.fft(signal) # 复数频谱
frequencies = np.fft.fftfreq(128, d=1/128) # 频率轴
magnitudes = np.abs(fft_result) # 幅度谱
phases = np.angle(fft_result) # 相位谱
# 逆 FFT
reconstructed = np.fft.ifft(fft_result)
print(np.allclose(signal, reconstructed.real)) # True
# 二维 FFT(可用于图像处理、注意力模式分析)
image = np.random.randn(64, 64)
fft2d = np.fft.fft2(image)
fft2d_shifted = np.fft.fftshift(fft2d) # 将零频移到中心
# LLM 应用:用 FFT 加速循环矩阵乘法(某些高效注意力机制)
# 循环矩阵乘法可通过 FFT 在 O(n log n) 实现,而非 O(n²)
def circulant_matmul(c, v):
"""利用 FFT 计算循环矩阵与向量的乘法"""
n = len(v)
fft_c = np.fft.fft(c, n)
fft_v = np.fft.fft(v, n)
return np.fft.ifft(fft_c * fft_v).real
# 验证
c = np.array([1, 2, 3, 4])
v = np.array([5, 6, 7, 8])
from scipy.linalg import circulant
C = circulant(c)
print(np.allclose(circulant_matmul(c, v), C @ v)) # True
4. 数学原理
4.1 SVD 分解:A = UΣV^T
奇异值分解(Singular Value Decomposition)是线性代数中最重要的矩阵分解之一,任意实矩阵 A(m×n)都可以分解为:
\[A = U \Sigma V^T\]其中:
- U:m×m 正交矩阵,称为左奇异向量矩阵,U^T U = I
- Σ:m×n 对角矩阵,对角线上的元素 σ₁ ≥ σ₂ ≥ … ≥ σᵣ > 0 称为奇异值
- V^T:n×n 正交矩阵,称为右奇异向量矩阵,V^T V = I
- r = rank(A),矩阵的秩
关键性质:
低秩近似:保留前 k 个最大奇异值,得到 A 的最优 rank-k 近似(Eckart-Young 定理): \(A_k = U_k \Sigma_k V_k^T = \sum_{i=1}^{k} \sigma_i u_i v_i^T\) 这是在 Frobenius 范数和谱范数下的最优低秩近似。
与特征值的关系:A^T A 的特征值 = σᵢ²,即奇异值的平方。
条件数:cond(A) = σ_max / σ_min,条件数越大矩阵越病态。
LLM 中的应用:
- LoRA 的低秩分解本质上是 SVD 思想的应用
- 嵌入矩阵压缩
- 模型权重分析(有效秩衡量模型冗余度)
4.2 矩阵乘法结合律
矩阵乘法满足结合律:(AB)C = A(BC),这在 LLM 计算中有重要优化意义:
计算量分析:
对于 A: (m, p), B: (p, q), C: (q, n):
- (AB)C 的乘法次数:mpq + mqn
- A(BC) 的乘法次数:pqn + mpn
选择不同的结合顺序可以显著减少计算量:
# 示例:注意力计算中的优化
# Q: (batch, heads, seq_len, head_dim)
# K^T: (batch, heads, head_dim, seq_len)
# V: (batch, heads, seq_len, head_dim)
# 标准顺序:先计算 QK^T 再乘 V
# Q @ K^T: (batch, heads, seq_len, seq_len) — O(seq_len² × head_dim)
# (QK^T) @ V: (batch, heads, seq_len, head_dim) — O(seq_len² × head_dim)
# 总计:O(seq_len² × head_dim)
# Flash Attention 利用结合律优化:
# 通过分块计算,避免显式存储 (seq_len, seq_len) 的注意力矩阵
# 从而将显存从 O(seq_len²) 降为 O(seq_len)
4.3 傅里叶变换公式
离散傅里叶变换(DFT)将时域信号转换为频域表示:
\[X[k] = \sum_{n=0}^{N-1} x[n] \cdot e^{-i 2\pi kn / N}, \quad k = 0, 1, ..., N-1\]逆变换(IDFT):
\[x[n] = \frac{1}{N} \sum_{k=0}^{N-1} X[k] \cdot e^{i 2\pi kn / N}, \quad n = 0, 1, ..., N-1\]FFT(快速傅里叶变换) 是 DFT 的高效算法,利用分治策略将 O(N²) 的计算量降为 O(N log N):
- 将 N 点 DFT 分解为两个 N/2 点 DFT(奇偶分组)
- 递归分解直到基情形
- 利用旋转因子 e^{-i2πk/N} 的对称性减少重复计算
LLM 中的应用:
- 循环/卷积注意力(如 Linear Attention)可用 FFT 加速
- 位置编码的频域分析
- 信号处理式的特征提取
5. 在 LLM 开发中的典型使用场景
5.1 文本数据预处理
import numpy as np
# Token ID 序列的 Padding 和 Truncation
def pad_sequences(sequences, max_len=None, pad_value=0):
"""将变长序列填充到相同长度"""
if max_len is None:
max_len = max(len(seq) for seq in sequences)
batch_size = len(sequences)
padded = np.full((batch_size, max_len), pad_value, dtype=np.int64)
attention_mask = np.zeros((batch_size, max_len), dtype=np.bool_)
for i, seq in enumerate(sequences):
length = min(len(seq), max_len)
padded[i, :length] = seq[:length]
attention_mask[i, :length] = True
return padded, attention_mask
# 使用示例
sequences = [
[101, 2023, 2003, 1037, 3231, 102],
[101, 2023, 2003, 102],
[101, 1045, 2066, 18435, 102],
]
padded, mask = pad_sequences(sequences, max_len=8)
print("Token IDs:\n", padded)
print("Attention Mask:\n", mask.astype(int))
# Attention Mask:
# [[1 1 1 1 1 1 0 0]
# [1 1 1 1 0 0 0 0]
# [1 1 1 1 1 0 0 0]]
5.2 注意力机制的 NumPy 实现
def scaled_dot_product_attention(Q, K, V, mask=None):
"""
缩放点积注意力的 NumPy 实现
参数:
Q: (batch, heads, seq_len, head_dim) — 查询矩阵
K: (batch, heads, seq_len, head_dim) — 键矩阵
V: (batch, heads, seq_len, head_dim) — 值矩阵
mask: (batch, heads, seq_len, seq_len) 或 (1, 1, seq_len, seq_len) — 注意力掩码
返回:
output: (batch, heads, seq_len, head_dim) — 注意力输出
attn_weights: (batch, heads, seq_len, seq_len) — 注意力权重
"""
head_dim = Q.shape[-1]
scale = np.sqrt(head_dim)
# QK^T / sqrt(d_k)
scores = Q @ np.transpose(K, (0, 1, 3, 2)) / scale # (batch, heads, seq_len, seq_len)
# 应用掩码(如因果掩码)
if mask is not None:
scores = np.where(mask == 0, -1e9, scores)
# Softmax
scores_max = np.max(scores, axis=-1, keepdims=True)
exp_scores = np.exp(scores - scores_max)
attn_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
# 加权求和
output = attn_weights @ V # (batch, heads, seq_len, head_dim)
return output, attn_weights
# 测试
batch, heads, seq_len, head_dim = 2, 4, 8, 64
Q = np.random.randn(batch, heads, seq_len, head_dim)
K = np.random.randn(batch, heads, seq_len, head_dim)
V = np.random.randn(batch, heads, seq_len, head_dim)
# 因果掩码
causal_mask = np.tril(np.ones((seq_len, seq_len)))[np.newaxis, np.newaxis, :, :]
output, weights = scaled_dot_product_attention(Q, K, V, mask=causal_mask)
print(f"Output shape: {output.shape}") # (2, 4, 8, 64)
print(f"Weights shape: {weights.shape}") # (2, 4, 8, 8)
5.3 嵌入矩阵操作
# 词嵌入查找与操作
vocab_size = 50000
hidden_dim = 768
embedding_matrix = np.random.randn(vocab_size, hidden_dim) * 0.02 # 小随机初始化
# 查找 Token 嵌入
token_ids = np.array([101, 2023, 2003, 102]) # "It is a [SEP]"
embeddings = embedding_matrix[token_ids] # (4, 768) — 高级索引
# 余弦相似度计算
def cosine_similarity(a, b):
"""计算两个向量的余弦相似度"""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# 找到与目标词最相似的词
target_embedding = embedding_matrix[2023] # "is" 的嵌入
similarities = embedding_matrix @ target_embedding / (
np.linalg.norm(embedding_matrix, axis=1) * np.linalg.norm(target_embedding)
)
top_k = 5
top_indices = np.argsort(similarities)[-top_k:][::-1]
print(f"与 'is' 最相似的 {top_k} 个词的索引: {top_indices}")
5.4 模型权重分析与压缩
# 分析模型权重的奇异值分布
weight_matrix = np.random.randn(768, 3072) # FFN 层权重
U, S, Vt = np.linalg.svd(weight_matrix, full_matrices=False)
# 计算信息保留率
total_energy = np.sum(S ** 2)
cumulative_ratio = np.cumsum(S ** 2) / total_energy
# 95% 能量保留需要多少奇异值
k_95 = np.searchsorted(cumulative_ratio, 0.95) + 1
print(f"保留 95% 能量需要 {k_95}/{len(S)} 个奇异值")
print(f"压缩比: {768 * 3072 / (768 * k_95 + k_95 + 3072 * k_95):.2f}x")
# 基于核范数的正则化(模型压缩)
def nuclear_norm_reg(weights, lambda_=0.01):
"""核范数正则化:鼓励低秩结构"""
_, S, _ = np.linalg.svd(weights, full_matrices=False)
return lambda_ * np.sum(S)
# 计算模型的有效秩
def effective_rank(weights, threshold=0.99):
"""计算有效秩:保留 threshold 比例能量所需的奇异值数"""
_, S, _ = np.linalg.svd(weights, full_matrices=False)
total = np.sum(S ** 2)
cumulative = np.cumsum(S ** 2) / total
return np.searchsorted(cumulative, threshold) + 1
print(f"FFN 权重有效秩: {effective_rank(weight_matrix)}")
6. 代码原理 / 架构原理
6.1 ndarray 内存布局
NumPy 数组在内存中是连续存储的,通过 strides(步长)机制实现多维索引:
一个 shape=(3, 4) 的 float64 数组:
内存: [a₀₀, a₀₁, a₀₂, a₀₃, a₁₀, a₁₁, a₁₂, a₁₃, a₂₀, a₂₁, a₂₂, a₂₃]
↑ ↑
offset=0 offset=88
strides = (32, 8) 表示:
- 沿 axis=0 移动一步 = 跳过 32 字节(4 个 float64)
- 沿 axis=1 移动一步 = 跳过 8 字节(1 个 float64)
a[i, j] 的内存地址 = base + i * strides[0] + j * strides[1]
视图(View)与拷贝(Copy):
- 切片操作返回视图(共享内存):
a[1:3]、a.T、a.reshape() - 高级索引返回拷贝:
a[[0, 2]]、a[a > 0] np.shares_memory(a, b)可检测两个数组是否共享内存
6.2 广播的实现原理
广播不实际复制数据,而是通过修改 strides 实现逻辑上的扩展:
a: shape=(3, 1), strides=(24, 0) ← 注意 strides[1]=0,表示沿该维度"不移动"
b: shape=(1, 4), strides=(0, 8) ← strides[0]=0
a + b 的结果:
- shape=(3, 4)
- a 的元素沿 axis=1 逻辑复制(stride=0)
- b 的元素沿 axis=0 逻辑复制(stride=0)
- 实际不复制任何数据,只在计算时"假装"数据被复制
6.3 NumPy 与 PyTorch 的互操作
NumPy ndarray ←→ PyTorch Tensor
↓ ↑
.numpy() torch.from_numpy()
↓ ↑
共享内存条件:CPU 上的 Tensor + 不需要梯度
import torch
# NumPy → PyTorch(共享内存)
arr = np.array([1, 2, 3])
tensor = torch.from_numpy(arr) # 共享内存
arr[0] = 999
print(tensor[0]) # tensor(999) — 修改同步
# PyTorch → NumPy(共享内存)
tensor = torch.tensor([4, 5, 6])
arr = tensor.numpy() # 共享内存
tensor[0] = 999
print(arr[0]) # 999 — 修改同步
# GPU 张量需要先转到 CPU
gpu_tensor = torch.randn(2, 3).cuda()
arr = gpu_tensor.cpu().numpy() # 必须先 .cpu(),此步产生拷贝
# 带梯度的张量需要先 detach
x = torch.tensor([1.0, 2.0], requires_grad=True)
arr = x.detach().numpy() # 必须 .detach()
7. 常见注意事项和最佳实践
7.1 性能优化
# 1. 避免循环,使用向量化操作
# 差:Python 循环
result = np.empty(1000000)
for i in range(1000000):
result[i] = np.sin(i) * np.cos(i)
# 好:向量化
x = np.arange(1000000)
result = np.sin(x) * np.cos(x) # 快 100 倍以上
# 2. 预分配数组
# 差:逐步扩展
result = np.array([])
for i in range(1000):
result = np.append(result, i) # 每次都重新分配内存
# 好:预分配
result = np.empty(1000)
for i in range(1000):
result[i] = i
# 更好:直接构造
result = np.arange(1000)
# 3. 使用原地操作减少内存分配
a = np.random.randn(1000, 1000)
np.add(a, 1, out=a) # 原地加法,不创建新数组
np.multiply(a, 2, out=a) # 原地乘法
# 4. 选择合适的数据类型
# float64 → float32 可节省一半内存
data = np.random.randn(10000, 10000).astype(np.float32) # 400MB
# vs float64: 800MB
7.2 数值稳定性
# 1. 数值稳定的 Softmax
def stable_softmax(x, axis=-1):
x_max = np.max(x, axis=axis, keepdims=True)
exp_x = np.exp(x - x_max)
return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
# 2. 数值稳定的 LogSumExp
def logsumexp(x, axis=-1):
x_max = np.max(x, axis=axis, keepdims=True)
return x_max.squeeze(axis) + np.log(np.sum(np.exp(x - x_max), axis=axis))
# 3. 避免大数相减(精度损失)
# 差
a, b = 1e10, 1e10 + 1
diff = a - b # 可能丢失精度
# 好:使用 np.subtract 或重排计算
# 4. 使用 np.finfo 检查精度
print(np.finfo(np.float32).eps) # 1.19e-07 — float32 最小精度
print(np.finfo(np.float16).eps) # 9.77e-04 — float16 最小精度
print(np.finfo(np.float16).max) # 65500.0 — float16 最大值
7.3 常见陷阱
# 1. 视图 vs 拷贝混淆
a = np.array([1, 2, 3, 4, 5])
b = a[1:3] # 视图(共享内存)
b[0] = 999
print(a) # [1, 999, 3, 4, 5] — a 也被修改了!
c = a[1:3].copy() # 显式拷贝,不影响原数组
# 2. 整数除法与浮点除法
# Python 3 中 / 是浮点除法,NumPy 中也如此
# 但 dtype=int 的数组运算仍为整数
a = np.array([1, 2, 3])
print(a / 2) # [0.5, 1. , 1.5] — 浮点
print(a // 2) # [0, 1, 1] — 整数除法
# 3. 布尔索引返回拷贝
a = np.array([1, 2, 3, 4, 5])
a[a > 2] = 0 # 这可以工作
# 但:
b = a[a > 0]
b[0] = 999 # 不影响 a,因为布尔索引返回拷贝
# 4. 广播导致的意外结果
a = np.array([[1, 2, 3]]) # (1, 3)
b = np.array([[1], [2]]) # (2, 1)
c = a + b # (2, 3) — 可能不是期望的结果
# 理解广播规则很重要
# 5. NaN 传播
a = np.array([1, 2, np.nan, 4])
print(np.mean(a)) # nan — NaN 会传播
print(np.nanmean(a)) # 2.333... — 忽略 NaN 的均值
print(np.nansum(a)) # 7.0 — 忽略 NaN 的求和
7.4 与 PyTorch 协同工作的最佳实践
# 1. 数据预处理用 NumPy,训练用 PyTorch
# NumPy 适合:文本处理、特征工程、统计分析
# PyTorch 适合:模型训练、GPU 计算、自动求导
# 2. 注意内存共享
arr = np.random.randn(100, 768)
tensor = torch.from_numpy(arr)
# arr 和 tensor 共享内存,修改一个会影响另一个
# 如果需要独立副本:
tensor = torch.from_numpy(arr.copy())
# 或
tensor = torch.tensor(arr) # 总是创建拷贝
# 3. 批量转换
# 多个 NumPy 数组 → PyTorch DataLoader
from torch.utils.data import TensorDataset, DataLoader
features = np.random.randn(1000, 768).astype(np.float32)
labels = np.random.randint(0, 10, size=1000)
dataset = TensorDataset(
torch.from_numpy(features),
torch.from_numpy(labels)
)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
