{% quot 本文部分内容整理自网络和 ChatGPT,侵删/仅供参考。 %}
Shuffle 函数
import random
def shuffle(nums):
for i in range(len(nums) - 1, 0, -1):
j = random.randint(0, i)
nums[i], nums[j] = nums[j], nums[i]
return nums

Softmax 函数
处理数值稳定性。
import numpy as np
def softmax(logits: np.ndarray) -> np.ndarray:
# 减去最大值以提高数值稳定性,对结果相当于分子分母同时约简了exp(max)
max_logits = np.max(logits)
exp_logits = np.exp(logits - max_logits)
return exp_logits / np.sum(exp_logits)
# 示例用法
logits = np.array([2.0, 1.0, 0.1])
softmax_values = softmax(logits)
print(softmax_values) # 输出 softmax 概率
import torch

def softmax(input_tensor, dim):
# 数值稳定性处理:减去最大值防止溢出
max_vals = torch.max(input_tensor, dim=dim, keepdim=True).values
exp_x = torch.exp(input_tensor - max_vals) # 减去最大值后做指数运算
sum_exp = exp_x.sum(dim=dim, keepdim=True) # 沿指定维度求和,保持维度
return exp_x / sum_exp  # 广播机制自动对齐维度

log_softmax 则在对数空间计算,数值稳定性的处理更为简洁:
import numpy as np
def log_softmax(scores: list) -> np.ndarray:
scores = np.asarray(scores, dtype=float)  # 先转成 NumPy 数组,兼容 list 输入
# Subtract the maximum value for numerical stability
scores = scores - np.max(scores)
return scores - np.log(np.sum(np.exp(scores)))

MSE 损失
import numpy as np
def mean_squared_error(y_true: np.ndarray, y_pred: np.ndarray) -> float:
# 确保输入为 NumPy 数组
y_true = np.asarray(y_true)
y_pred = np.asarray(y_pred)
# 计算 MSE
mse = np.mean((y_true - y_pred) ** 2)
return mse
# 示例用法
y_true = [3, -0.5, 2, 7]
y_pred = [2.5, 0.0, 2, 8]
mse_value = mean_squared_error(y_true, y_pred)
print(f"均方误差: {mse_value}")

另外,更完整的场景参见 具有反向传播的单神经元。
交叉熵损失
# powered by ChatGPT4o
import numpy as np
def binary_cross_entropy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
# 确保预测值在有效范围内,避免log(0)
y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)
# 计算交叉熵损失
loss = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
return loss
# 示例用法
y_true = np.array([1, 0, 1])
y_pred = np.array([0.9, 0.1, 0.8])
loss_value = binary_cross_entropy(y_true, y_pred)
print(f"二分类交叉熵损失: {loss_value}")
# 二分类为什么用交叉熵而不用 MSE:交叉熵直接衡量两个概率分布的差异(与熵的概念一致);sigmoid/softmax 输出的是类别概率,用交叉熵计算损失天然对应"概率意义上的接近",而不是欧氏距离上的接近。此外,MSE 无差别地关注所有类别上预测概率与真实概率的差,相当于引入了"各类别损失贡献等价"的先验;交叉熵则只关注正确类别的预测概率。
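# 补充一个小的数值演示(新增示意,非原文内容):取 MSE 为 0.5*(p-y)^2,
# 其对 logit z 的梯度为 (p-y)*p*(1-p);而 BCE 对 z 的梯度恰为 p-y。
# 当模型"非常自信地预测错误"时,MSE 的梯度会因 p*(1-p) 趋近 0 而几乎消失,BCE 则不会。
import numpy as np

z, y = -6.0, 1.0              # 正样本,但模型给出了很小的 logit
p = 1 / (1 + np.exp(-z))      # sigmoid 输出 ≈ 0.0025
print((p - y) * p * (1 - p))  # MSE 梯度 ≈ -0.0025,几乎没有学习信号
print(p - y)                  # BCE 梯度 ≈ -0.9975,学习信号充足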
def categorical_cross_entropy(y_true, y_pred):
"""
计算多分类交叉熵
:param y_true: 真实标签,独热编码形式的 numpy 数组,形状为 (样本数, 类别数)
:param y_pred: 模型预测的概率分布,形状为 (样本数, 类别数)
:return: 每个样本的交叉熵损失值
"""
# 避免对数计算中的数值不稳定,加上一个小的 epsilon 值
epsilon = 1e-12
y_pred = np.clip(y_pred, epsilon, 1. - epsilon)
# 按公式计算交叉熵
cross_entropy = -np.sum(y_true * np.log(y_pred), axis=1)
return cross_entropy
# 示例数据
y_true = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]) # 独热编码标签
y_pred = np.array([[0.7, 0.2, 0.1], [0.1, 0.8, 0.1], [0.1, 0.3, 0.6]]) # 模型预测的概率
# 计算交叉熵损失
loss = categorical_cross_entropy(y_true, y_pred)
print("平均交叉熵损失:", np.mean(loss))

numpy 手撕二分类交叉熵全流程实现。
import numpy as np
def sigmoid(z):
return 1 / (1 + np.exp(-z))
def sigmoid_derivative(z):
return sigmoid(z) * (1 - sigmoid(z))
class SimpleNN:
def __init__(self, input_size, hidden_size, output_size):
self.weights1 = np.random.randn(input_size, hidden_size)
self.bias1 = np.zeros((1, hidden_size))
self.weights2 = np.random.randn(hidden_size, output_size)
self.bias2 = np.zeros((1, output_size))  # 注意:输出层偏置的维度是 output_size
def forward(self, X):
self.Z1 = np.dot(X, self.weights1) + self.bias1
self.A1 = sigmoid(self.Z1)
self.Z2 = np.dot(self.A1, self.weights2) + self.bias2
self.A2 = sigmoid(self.Z2)
return self.A2
def backward(self, X, y, lr):
m = X.shape[0]
# 输出层
dZ2 = self.A2 - y
dW2 = np.dot(self.A1.T, dZ2) / m
db2 = np.sum(dZ2, axis=0, keepdims=True) / m
# 隐藏层
dA1 = np.dot(dZ2, self.weights2.T)
dZ1 = dA1 * sigmoid_derivative(self.Z1)
dW1 = np.dot(X.T, dZ1) / m
db1 = np.sum(dZ1, axis=0, keepdims=True) / m
# 更新权重
self.weights1 -= lr * dW1
self.bias1 -= lr * db1
self.weights2 -= lr * dW2
self.bias2 -= lr * db2
def compute_loss(self, X, y):
# 计算交叉熵损失
A2 = self.forward(X)
m = X.shape[0]
A2 = np.clip(A2, 1e-15, 1 - 1e-15)  # 防止 log(0)
loss = -np.mean(y * np.log(A2) + (1 - y) * np.log(1 - A2))
return loss
def train(self, X_train, y_train, epochs, lr):
for epoch in range(epochs):
self.forward(X_train)
self.backward(X_train, y_train, lr)
if epoch % 100 == 0:
loss = self.compute_loss(X_train, y_train)
print(f"Epoch:{epoch}/{epochs}, Loss: {loss:.4f}")
def predict(self, X):
A2 = self.forward(X)
return (A2 > 0.5).astype(int)

交叉熵和 softmax 的组合求导非常优雅:对 logits 的梯度恰好是预测概率减去真实标签(one-hot),优化方向符合直觉,计算也极其简洁,下面给出一个简单的数值验证。
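用 PyTorch 的自动求导做一个小的数值验证(补充示意,假设环境中可用 torch):交叉熵对 logits 的梯度等于 softmax(logits) 减去 one-hot 标签。

import torch
import torch.nn.functional as F

logits = torch.tensor([2.0, 1.0, 0.1], requires_grad=True)
target = torch.tensor([0])                    # 真实类别为 0
loss = F.cross_entropy(logits.unsqueeze(0), target)
loss.backward()

one_hot = F.one_hot(target, num_classes=3).float().squeeze(0)
print(logits.grad)                                        # 自动求导得到的梯度
print(torch.softmax(logits.detach(), dim=-1) - one_hot)   # 与上面的梯度完全一致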
另外,困惑度(PPL)的计算也和交叉熵直接相关:PPL 等于对每个 token 的负对数似然取平均后再做指数。在语言模型中,为保持一致通常以自然对数 e 为底(困惑度的原始定义以 2 为底),换底不影响趋势分析。困惑度为 1 表示模型完全确定且预测正确,是最优状态;最差状态是无穷大;当预测为均匀分布时,困惑度等于词表大小。总的来说,困惑度更适合在给定数据集上评价语言模型的好坏,尤其适合风格迁移类任务,例如让模型尽可能输出符合百科风格的用语;而用困惑度比较不同文本的质量存在争议,因为常见但无聊的句子往往能得到更低的困惑度,所以只有在没有参考文本时才酌情选用。
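下面用一小段 NumPy 代码直观验证上述性质(补充示意,词表大小 V 为假设值):均匀分布时 PPL 恰好等于词表大小,预测越准 PPL 越接近 1。

import numpy as np

V = 50000                                         # 假设的词表大小
uniform_probs = np.full(10, 1.0 / V)              # 每个目标 token 都按均匀分布预测
print(np.exp(-np.mean(np.log(uniform_probs))))    # ≈ 50000,等于词表大小

good_probs = np.array([0.9, 0.8, 0.95, 0.7])      # 预测较准时
print(np.exp(-np.mean(np.log(good_probs))))       # ≈ 1.2,接近 1

下面的滑动窗口写法是在真实语言模型上评估长文本 PPL 的常见做法: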
max_length = model.config.n_positions
stride = 512
seq_len = encodings.input_ids.size(1)
nlls = []
prev_end_loc = 0
for begin_loc in tqdm(range(0, seq_len, stride)):
end_loc = min(begin_loc + max_length, seq_len)
trg_len = end_loc - prev_end_loc # may be different from stride on last loop
input_ids = encodings.input_ids[:, begin_loc:end_loc].to(device)
target_ids = input_ids.clone()
target_ids[:, :-trg_len] = -100
with torch.no_grad():
outputs = model(input_ids, labels=target_ids)
# loss is calculated using CrossEntropyLoss which averages over valid labels
# N.B. the model only calculates loss over trg_len - 1 labels, because it internally shifts the labels
# to the left by 1.
neg_log_likelihood = outputs.loss
nlls.append(neg_log_likelihood)
prev_end_loc = end_loc
if end_loc == seq_len:
break
ppl = torch.exp(torch.stack(nlls).mean())

多头注意力机制 (MHA)
import torch
import torch.nn as nn
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, d_model):  # batch_size、seq_len 与参数无关,无需在初始化时传入
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.o_proj = nn.Linear(d_model, d_model)
def split(self, x, batch_size):
# 维度切分,(batch_size, seq_len, d_model) -> (batch_size, seq_len, num_heads, head_dim)
x = x.view(batch_size, -1, self.num_heads, self.head_dim)
# 重排维度,便于后续计算方便和并行多头
return x.permute(0, 2, 1, 3)
def forward(self, hidden_state, attention_mask=None):
batch_size = hidden_state.size()[0]
query = self.q_proj(hidden_state)
key = self.k_proj(hidden_state)
value = self.v_proj(hidden_state)
query, key, value = self.split(query, batch_size), self.split(key, batch_size), self.split(value, batch_size)
attention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
if attention_mask is not None:
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf'))
# attention_scores、attention_probs 均为 (batch_size, num_heads, seq_len, seq_len)
attention_probs = torch.softmax(attention_scores, dim=-1)
output = torch.matmul(attention_probs, value)
# output: (batch_size, seq_len, d_model)
# contiguous: 确保张量内存中连续,确保view操作正常进行,一般均连续,permute和transpose操作可能改变
output = output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.head_dim * self.num_heads)
output = self.o_proj(output)
return output
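# 简单的形状自检(补充示意,非原文内容)
batch_size, seq_len, d_model, num_heads = 2, 10, 64, 8
mha = MultiHeadAttention(num_heads, d_model)
x = torch.randn(batch_size, seq_len, d_model)
print(mha(x).shape)  # torch.Size([2, 10, 64])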
多查询注意力机制 (MQA)

# powered by ChatGPT4o
import torch
import torch.nn as nn
import math
class MultiQueryAttention(nn.Module):
def __init__(self, num_heads, d_model):
super(MultiQueryAttention, self).__init__()
self.num_heads = num_heads
self.head_dim = d_model // num_heads
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
# Linear layers for Query, Key, Value
self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, self.head_dim) # Shared Key for all heads
self.v_proj = nn.Linear(d_model, self.head_dim) # Shared Value for all heads
# Output linear layer
self.o_proj = nn.Linear(d_model, d_model)
def forward(self, hidden_state, attention_mask=None):
batch_size, seq_len, d_model = hidden_state.size()
# Linear projections to create Query, Key, Value
query = self.q_proj(hidden_state) # Shape: (batch_size, seq_len, d_model)
key = self.k_proj(hidden_state) # Shape: (batch_size, seq_len, head_dim)
value = self.v_proj(hidden_state) # Shape: (batch_size, seq_len, head_dim)
# Reshape Query for multi-head attention
query = query.view(batch_size, seq_len, self.num_heads, self.head_dim).permute(0, 2, 1, 3) # (batch_size, num_heads, seq_len, head_dim)
# Reshape Key and Value for broadcasting
key = key.unsqueeze(1).expand(-1, self.num_heads, -1, -1) # (batch_size, num_heads, seq_len, head_dim)
value = value.unsqueeze(1).expand(-1, self.num_heads, -1, -1) # (batch_size, num_heads, seq_len, head_dim)
# Scaled dot-product attention
scale = 1 / math.sqrt(self.head_dim)
attention_scores = torch.matmul(query, key.transpose(-1, -2)) * scale # (batch_size, num_heads, seq_len, seq_len)
if attention_mask is not None:
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf'))
attention_probs = torch.softmax(attention_scores, dim=-1) # (batch_size, num_heads, seq_len, seq_len)
# Weighted sum of Value
context = torch.matmul(attention_probs, value) # (batch_size, num_heads, seq_len, head_dim)
# Concatenate heads and project output
context = context.permute(0, 2, 1, 3).contiguous().view(batch_size, seq_len, -1) # (batch_size, seq_len, d_model)
output = self.o_proj(context) # (batch_size, seq_len, d_model)
return output
分组查询注意力机制 (GQA)
import torch
import torch.nn as nn
import math
class GroupedQueryAttention(nn.Module):
def __init__(self, num_heads, d_model, num_groups):
super(GroupedQueryAttention, self).__init__()
self.num_heads = num_heads
self.head_dim = d_model // num_heads
self.num_groups = num_groups
self.each_group_count = num_heads // num_groups
self.group_dim = self.head_dim * self.num_groups
self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, self.group_dim)
self.v_proj = nn.Linear(d_model, self.group_dim)
self.o_proj = nn.Linear(d_model, d_model)
self.scaler = 1 / math.sqrt(self.head_dim)
def kv_expand(self, x, batch_size, seq_len):
x = x.view(batch_size, seq_len, self.num_groups, self.head_dim)
x = x.unsqueeze(3).expand(-1, -1, -1, self.each_group_count, -1)
x = x.reshape(batch_size, seq_len, self.num_heads, self.head_dim)
x = x.permute(0, 2, 1, 3)
return x
def forward(self, hidden_state, attention_mask=None):
batch_size, seq_len = hidden_state.size()[0], hidden_state.size()[1]
query = self.q_proj(hidden_state)
key = self.k_proj(hidden_state)
value = self.v_proj(hidden_state)
# 主要差异在下面两行
query = query.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
key, value = self.kv_expand(key, batch_size, seq_len), self.kv_expand(value, batch_size, seq_len)
attention_scores = torch.matmul(query, key.transpose(-1, -2)) * self.scaler  # scaler 已是 1/sqrt(head_dim),应乘而不是除
if attention_mask is not None:
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf'))
attention_probs = torch.softmax(attention_scores, dim=-1)
output = torch.matmul(attention_probs, value)
output = output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.head_dim * self.num_heads)
output = self.o_proj(output)
return output

交叉注意力机制
该函数接受编码器的输出和查询 (query)作为输入,并返回注意力加权的编码器输出。
# powered by ChatGPT4o
import torch
import torch.nn as nn
class CrossAttention(nn.Module):
def __init__(self, num_heads, d_model):
super(CrossAttention, self).__init__()
self.num_heads = num_heads
self.head_dim = d_model // num_heads
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
self.q_proj = nn.Linear(d_model, d_model) # 用于查询
self.k_proj = nn.Linear(d_model, d_model) # 用于键
self.v_proj = nn.Linear(d_model, d_model) # 用于值
self.o_proj = nn.Linear(d_model, d_model) # 最终输出
def split(self, x, batch_size):
# 维度切分,(batch_size, seq_len, d_model) -> (batch_size, seq_len, num_heads, head_dim)
x = x.view(batch_size, -1, self.num_heads, self.head_dim)
# 重排维度,便于多头并行计算
return x.permute(0, 2, 1, 3)
def forward(self, query_input, key_value_input, attention_mask=None):
"""
query_input: 解码器的隐藏状态 (batch_size, query_len, d_model)
key_value_input: 编码器的输出 (batch_size, key_value_len, d_model)
attention_mask: 可选的注意力掩码 (batch_size, 1, 1, key_value_len)
"""
batch_size = query_input.size(0)
# 生成 Query, Key, Value
query = self.q_proj(query_input) # (batch_size, query_len, d_model)
key = self.k_proj(key_value_input) # (batch_size, key_value_len, d_model)
value = self.v_proj(key_value_input) # (batch_size, key_value_len, d_model)
# 拆分多头
query = self.split(query, batch_size) # (batch_size, num_heads, query_len, head_dim)
key = self.split(key, batch_size) # (batch_size, num_heads, key_value_len, head_dim)
value = self.split(value, batch_size) # (batch_size, num_heads, key_value_len, head_dim)
# 计算注意力得分 (batch_size, num_heads, query_len, key_value_len)
attention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
# 如果有注意力掩码,应用掩码
if attention_mask is not None:
attention_scores = attention_scores.masked_fill(attention_mask == 0, float('-inf'))
# 归一化注意力权重
attention_probs = torch.softmax(attention_scores, dim=-1) # (batch_size, num_heads, query_len, key_value_len)
# 计算注意力输出 (batch_size, num_heads, query_len, head_dim)
output = torch.matmul(attention_probs, value)
# 合并多头 (batch_size, query_len, d_model)
output = output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.head_dim * self.num_heads)
# 通过线性层投影回原始维度
output = self.o_proj(output)
return output
Layer Normalization
import torch
import torch.nn as nn
class LayerNorm(nn.Module):
def __init__(self, hidden_dim, eps=1e-6):
super(LayerNorm, self).__init__()
self.hidden_dim = hidden_dim
self.eps = eps
self.gamma = nn.Parameter(torch.ones(hidden_dim))
self.beta = nn.Parameter(torch.zeros(hidden_dim))  # 偏移参数应初始化为 0
def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)  # 有偏方差,与 nn.LayerNorm 保持一致
x_norm = (x - mean) / torch.sqrt(var + self.eps)
out = self.gamma * x_norm + self.beta
return out
bsz, hidden_dim, seq_len = 8, 128, 20
x = torch.randn(bsz, seq_len, hidden_dim)
layer_norm = LayerNorm(hidden_dim)
out = layer_norm(x)

RMS Normalization
简单来说,就是不需要计算均值,直接用均方根(RMS)来标准化,并且只保留缩放参数 $\gamma$,没有偏移参数 $\beta$。
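补充其公式(记隐藏维度为 $d$,$\gamma$ 为可学习的缩放参数):

$$
\mathrm{RMSNorm}(x) = \gamma \odot \frac{x}{\sqrt{\tfrac{1}{d}\sum_{i=1}^{d} x_i^2 + \epsilon}}
$$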
import torch
import torch.nn as nn
class RMSNorm(nn.Module):
def __init__(self, hidden_dim, eps=1e-9):
super().__init__()
self.hidden_dim = hidden_dim
self.eps = eps
self.gamma = nn.Parameter(torch.ones(hidden_dim))
def forward(self, x):
root_mean_sqrt = torch.sqrt(x.pow(2).mean(dim=-1, keepdim=True) + self.eps)
return self.gamma * x / root_mean_sqrt
bsz, hidden_dim, seq_len = 8, 128, 20
x = torch.randn(bsz, seq_len, hidden_dim)
rms_norm = RMSNorm(hidden_dim)
out = rms_norm(x)

class RMSNorm(nn.Module):  # 论文版本
def __init__(self, d, p=-1., eps=1e-8, bias=False):
"""
Root Mean Square Layer Normalization
:param d: model size
:param p: partial RMSNorm, valid value [0, 1], default -1.0 (disabled)
:param eps: epsilon value, default 1e-8
:param bias: whether use bias term for RMSNorm, disabled by
default because RMSNorm doesn't enforce re-centering invariance.
"""
super(RMSNorm, self).__init__()
self.eps = eps
self.d = d
self.p = p  # 对应 partial RMSNorm(pRMSNorm)所使用的比例
self.bias = bias
self.scale = nn.Parameter(torch.ones(d))
self.register_parameter("scale", self.scale)
if self.bias:
self.offset = nn.Parameter(torch.zeros(d))
self.register_parameter("offset", self.offset)
def forward(self, x):
if self.p < 0. or self.p > 1.:
norm_x = x.norm(2, dim=-1, keepdim=True)
d_x = self.d
else:
partial_size = int(self.d * self.p)
partial_x, _ = torch.split(x, [partial_size, self.d - partial_size], dim=-1)
norm_x = partial_x.norm(2, dim=-1, keepdim=True)
d_x = partial_size
rms_x = norm_x * d_x ** (-1. / 2)
x_normed = x / (rms_x + self.eps)
if self.bias:
return self.scale * x_normed + self.offset
return self.scale * x_normed
class RMSNorm(torch.nn.Module): # baichuan版本
def __init__(self, hidden_size, epsilon=1e-6):
super().__init__()
self.weight = torch.nn.Parameter(torch.empty(hidden_size))
self.epsilon = epsilon
def forward(self, hidden_states):
variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True)
hidden_states = hidden_states * torch.rsqrt(variance + self.epsilon)  # torch.rsqrt 即平方根的倒数
# convert into half-precision
if self.weight.dtype in [torch.float16, torch.bfloat16]:
hidden_states = hidden_states.to(self.weight.dtype)
return self.weight * hidden_states

MLP
import torch
import torch.nn as nn
import torch.nn.functional as F
class MLP(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
super().__init__()
self.linear1 = nn.Linear(d_model, d_ff) # 扩展层
self.linear2 = nn.Linear(d_ff, d_model) # 收缩层
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x shape: [batch_size, seq_len, d_model]
x = self.linear1(x)
x = F.relu(x)
x = self.dropout(x)
x = self.linear2(x)
return x

Transformer Encoder
import torch
import torch.nn as nn
class TransformerEncoderLayer(nn.Module):
def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
super(TransformerEncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(num_heads, embed_dim)
self.ln1 = nn.LayerNorm(embed_dim)
self.ln2 = nn.LayerNorm(embed_dim)
self.ffn = nn.Sequential(
nn.Linear(embed_dim, ff_dim),
nn.ReLU(),
nn.Linear(ff_dim, embed_dim),
nn.Dropout(dropout)
)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
attn_output = self.self_attn(x, mask)
# post norm
x = x + self.dropout(attn_output)
x = self.ln1(x)
ffn_output = self.ffn(x)
x = x + self.dropout(ffn_output)
x = self.ln2(x)
return x
class TransformerEncoder(nn.Module):
def __init__(self, num_layers, embed_dim, num_heads, ff_dim, dropout=0.1):
super(TransformerEncoder, self).__init__()
self.layers = nn.ModuleList([
TransformerEncoderLayer(embed_dim, num_heads, ff_dim, dropout)
for _ in range(num_layers)
])
def forward(self, x, mask=None):
for layer in self.layers:
x = layer(x, mask)
return x

Transformer Decoder
# powered by ChatGPT4o
import torch
import torch.nn as nn
class TransformerDecoderLayer(nn.Module):
def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
super(TransformerDecoderLayer, self).__init__()
# 自注意力层
self.self_attn = MultiHeadAttention(num_heads, embed_dim)
# 交叉注意力层(编码器-解码器注意力)
self.cross_attn = CrossAttention(num_heads, embed_dim)
# 层归一化
self.ln1 = nn.LayerNorm(embed_dim)
self.ln2 = nn.LayerNorm(embed_dim)
self.ln3 = nn.LayerNorm(embed_dim)
# 前馈网络
self.ffn = nn.Sequential(
nn.Linear(embed_dim, ff_dim),
nn.ReLU(),
nn.Linear(ff_dim, embed_dim),
nn.Dropout(dropout)
)
# Dropout层
self.dropout = nn.Dropout(dropout)
def forward(self, x, memory, tgt_mask=None, memory_mask=None):
# 自注意力
attn_output = self.self_attn(x, tgt_mask)
x = x + self.dropout(attn_output) # 残差连接
x = self.ln1(x)
# 跨注意力(编码器-解码器注意力)
cross_attn_output = self.cross_attn(x, memory, memory_mask)
x = x + self.dropout(cross_attn_output) # 残差连接
x = self.ln2(x)
# 前馈网络
ffn_output = self.ffn(x)
x = x + self.dropout(ffn_output) # 残差连接
x = self.ln3(x)
return x
class TransformerDecoder(nn.Module):
def __init__(self, num_layers, embed_dim, num_heads, ff_dim, dropout=0.1):
super(TransformerDecoder, self).__init__()
# 多层解码器
self.layers = nn.ModuleList([
TransformerDecoderLayer(embed_dim, num_heads, ff_dim, dropout)
for _ in range(num_layers)
])
def forward(self, x, memory, tgt_mask=None, memory_mask=None):
# 依次经过每一层解码器
for layer in self.layers:
x = layer(x, memory, tgt_mask, memory_mask)
return x

位置编码
# powered by ChatGPT4o
import torch
import torch.nn as nn
import math
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
"""
:param d_model: 模型的维度 (embedding 的大小)
:param max_len: 最大的序列长度
"""
super(PositionalEncoding, self).__init__()
# 初始化位置编码矩阵,形状为 (max_len, d_model)
pe = torch.zeros(max_len, d_model)
# 位置编号 i 从 0 到 max_len-1
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
# 计算每个位置的正弦和余弦编码
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
# 使用正弦和余弦函数填充编码矩阵
pe[:, 0::2] = torch.sin(position * div_term) # 偶数索引使用正弦
pe[:, 1::2] = torch.cos(position * div_term) # 奇数索引使用余弦
# 增加一个额外的维度以与词嵌入维度对齐
pe = pe.unsqueeze(0) # 形状变为 (1, max_len, d_model)
# 将位置编码注册为 buffer,这样它不会作为参数被优化
self.register_buffer('pe', pe)
def forward(self, x):
"""
:param x: 输入张量,形状为 (batch_size, seq_len, d_model)
:return: 加上位置编码后的张量
"""
seq_len = x.size(1)
# 对输入的序列加上位置编码
return x + self.pe[:, :seq_len]
if __name__ == "__main__":
batch_size = 2
seq_len = 10
d_model = 16
positional_encoding = PositionalEncoding(d_model=d_model, max_len=5000)
# 假设输入的 token embedding 张量,形状为 (batch_size, seq_len, d_model)
x = torch.zeros(batch_size, seq_len, d_model)
# 获取带位置编码的输出
output = positional_encoding(x)
print(output.shape)  # 应该输出 (batch_size, seq_len, d_model)

RoPE
import torch
import torch.nn as nn
class RotaryPositionalEmbedding(nn.Module):
def __init__(self, dim, **kwargs):
super().__init__(**kwargs)
self.dim = dim
self.rope_cache = None
def get_cache(self, seq_len, base=10000):
theta = 1.0 / (base ** (torch.arange(start=0, end=self.dim, step=2) / self.dim))
seq_idx = torch.arange(end=seq_len)
idx_theta = torch.outer(input=seq_idx, vec2=theta).float()
cache = torch.stack(tensors=[torch.cos(idx_theta), torch.sin(idx_theta)], dim=-1)
return cache
def forward(self, x):
seq_len = x.shape[1]
if self.rope_cache is None:
self.rope_cache = self.get_cache(seq_len)
rot_dim = self.dim
xshaped = x.reshape(-1, seq_len, rot_dim // 2, 2)
rope_cache = self.rope_cache.unsqueeze(dim=0)
x_out = torch.stack(tensors=[
xshaped[..., 0] * rope_cache[..., 0] - xshaped[..., 1] * rope_cache[..., 1],
xshaped[..., 1] * rope_cache[..., 0] + xshaped[..., 0] * rope_cache[..., 1],
], dim=-1,)
x_out = x_out.flatten(start_dim=2)
return x_out

LoRA
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class LoRALinear(nn.Module):
def __init__(self, in_features, out_features, merge, rank=16, lora_alpha=16, dropout=0.5):
super(LoRALinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.merge = merge
self.rank = rank
self.dropout_rate = dropout
self.lora_alpha = lora_alpha
self.linear = nn.Linear(in_features, out_features)
if rank > 0:
self.lora_b = nn.Parameter(torch.zeros(out_features, rank))
self.lora_a = nn.Parameter(torch.zeros(rank, in_features))
self.scale = self.lora_alpha / self.rank
self.linear.weight.requires_grad = False
nn.init.kaiming_uniform_(self.lora_a, a=math.sqrt(5))
nn.init.zeros_(self.lora_b)
if self.dropout_rate > 0:
self.dropout = nn.Dropout(self.dropout_rate)
else:
self.dropout = nn.Identity()
def forward(self, x):
if self.rank > 0 and self.merge:
# 合并权重:W + (B @ A) * scale,F.linear 只接受 (input, weight, bias) 三个参数
output = F.linear(x, self.linear.weight + self.lora_b @ self.lora_a * self.scale, self.linear.bias)
output = self.dropout(output)
return output
else:
return self.dropout(self.linear(x))

KL 散度
用来衡量两个概率分布的差异。对于离散概率分布 $P$ 和 $Q$,$P$ 相对于 $Q$ 的 KL 散度定义为 $D_{\mathrm{KL}}(P \parallel Q) = \sum_{i} P(i) \log \dfrac{P(i)}{Q(i)}$。KL 散度具有非负性和非对称性,当且仅当 $P$ 和 $Q$ 完全一致时,KL 散度为 0。
# powered by DeepSeek
import numpy as np
def kl_divergence(p, q, epsilon=1e-10):
# 确保 p 和 q 是有效的概率分布
assert np.all(p >= 0) and np.all(q >= 0), "概率值必须非负"
assert np.isclose(np.sum(p), 1) and np.isclose(np.sum(q), 1), "概率分布必须归一化"
# 计算 KL 散度
q = np.clip(q, epsilon, None)
kl_div = np.sum(np.where(p != 0, p * np.log(p / q), 0))
return kl_div
# 示例
p = np.array([0.1, 0.4, 0.5])
q = np.array([0.2, 0.3, 0.5])
print("KL 散度:", kl_divergence(p, q))

AUC 指标
AUC(Area Under the Curve)表示 ROC 曲线(纵轴 TPR,横轴 FPR)下的面积,取值范围为 0 到 1,有效模型通常落在 0.5 到 1 之间。AUC 越接近 1,模型区分正负样本的能力越强;等于 0.5 时与随机猜测无异,模型无法有效区分正负样本;低于 0.5 时还不如随机猜测,但把预测取反即可。
import numpy as np
def calculate_auc(y_true: np.ndarray, y_scores: np.ndarray) -> float:
# 将实际值和分数配对并排序
desc_score_indices = np.argsort(y_scores)[::-1]
y_true = y_true[desc_score_indices]
# 计算TPR(真正率)和FPR(假正率)
tps = np.cumsum(y_true) # True Positives
fps = np.arange(1, len(y_true) + 1) - tps # False Positives
tpr = tps / np.sum(y_true) # True Positive Rate
fpr = fps / (len(y_true) - np.sum(y_true)) # False Positive Rate
# 计算AUC,使用梯形法则
auc = np.trapz(tpr, fpr)
return auc
# 示例用法
y_true = np.array([1, 0, 1, 1, 0, 1]) # 实际标签
y_scores = np.array([0.9, 0.2, 0.8, 0.7, 0.3, 0.6]) # 预测分数
auc_value = calculate_auc(y_true, y_scores)
print(f"AUC值: {auc_value}")
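# 可以与 scikit-learn 的实现对拍验证(补充示意,假设已安装 scikit-learn)
from sklearn.metrics import roc_auc_score
print(roc_auc_score(y_true, y_scores))  # 应与上面手写实现的结果一致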
梯度下降/牛顿法求平方根

梯度下降:把求 $\sqrt{a}$ 转化为最小化 $f(x) = (x^2 - a)^2$,对 $x$ 求导得 $f'(x) = 4x(x^2 - a)$,再按 $x \leftarrow x - \eta\, f'(x)$ 迭代。
# powered by DeepSeek
def sqrt_gd(a, lr=0.001, eps=1e-6, max_iter=10000):
x = 1.0
for _ in range(max_iter):
grad = 4 * x * (x ** 2 - a)
x -= lr * grad
if abs(x ** 2 - a) < eps:
break
return x

牛顿迭代:求 $g(x) = x^2 - a = 0$ 的根,迭代公式为 $x \leftarrow x - \dfrac{g(x)}{g'(x)} = x - \dfrac{x^2 - a}{2x}$。
def newton_sqrt(a, eps=1e-6, max_iter=10000):
x = a / 2
for _ in range(max_iter):
error = x ** 2 - a
if abs(error) < eps:
break
grad = 2 * x
x = x - (error / grad)
return x

GRPO Loss
def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
prompt_ids, prompt_mask = inputs["prompt_ids"], inputs["prompt_mask"]
completion_ids, completion_mask = inputs["completion_ids"], inputs["completion_mask"]
input_ids = torch.cat([prompt_ids, completion_ids], dim=1)
attention_mask = torch.cat([prompt_mask, completion_mask], dim=1)
logits_to_keep = completion_ids.size(1)
per_token_logps = self._get_per_token_logps(model, input_ids, attention_mask, logits_to_keep)
ref_per_token_logps = inputs["ref_per_token_logps"]
# KL 散度的逐 token 近似估计(k3 估计,两分布接近时约等于其二阶泰勒展开)
per_token_kl = torch.exp(ref_per_token_logps - per_token_logps) - (ref_per_token_logps - per_token_logps) - 1
advantages = inputs["advantages"]
old_per_token_logps = inputs["old_per_token_logps"] if self.num_iterations > 1 else per_token_logps.detach()
coef_1 = torch.exp(per_token_logps - old_per_token_logps)
coef_2 = torch.clamp(coef_1, 1 - self.epsilon, 1 + self.epsilon)
per_token_loss1 = coef_1 * advantages.unsqueeze(1)
per_token_loss2 = coef_2 * advantages.unsqueeze(1)
per_token_loss = -torch.min(per_token_loss1, per_token_loss2)
# 注:此处为 trl 实现的核心片段,完整实现中随后还会加上 beta * per_token_kl,并按 completion_mask 做掩码平均得到最终 loss

InfoNCE
对比学习的经典损失函数,它把问题视为一个(负样本个数 + 1)类的分类问题,而不像 NCE 那样只能一对一对地处理样本;毕竟把所有负样本都视作同一类显然不够合理。
# powered by ChatGPT4o
import torch
import torch.nn.functional as F
# 设置随机种子,便于复现
torch.manual_seed(0)
# 假设我们有一个批次的查询(query)、正样本(positive)和多个负样本(negatives)
batch_size = 4 # 每个批次的样本数
embedding_dim = 128 # 嵌入维度
num_negatives = 10 # 每个查询的负样本数量
# 随机生成查询、正样本和负样本的嵌入
query = torch.randn(batch_size, embedding_dim)
positive = torch.randn(batch_size, embedding_dim)
negatives = torch.randn(batch_size, num_negatives, embedding_dim)
# 计算查询与正样本、负样本的相似度
# 使用cosine相似度
query = F.normalize(query, dim=1)
positive = F.normalize(positive, dim=1)
negatives = F.normalize(negatives, dim=2)
# 正样本的相似度 (query 与 positive 的点积)
positive_sim = torch.sum(query * positive, dim=1, keepdim=True) # shape: (batch_size, 1)
# 负样本的相似度 (query 与 negatives 的点积)
negative_sim = torch.bmm(negatives, query.unsqueeze(2)).squeeze(2) # shape: (batch_size, num_negatives)
# 将正样本和负样本相似度拼接在一起
logits = torch.cat([positive_sim, negative_sim], dim=1) # shape: (batch_size, 1 + num_negatives)
# 创建标签,正样本的标签为0(这个例子中正样本索引为0)
labels = torch.zeros(batch_size, dtype=torch.long)
# 使用交叉熵损失计算 InfoNCE 损失
# InfoNCE的目标是让正样本的相似度最大,负样本最小
temperature = 0.1 # 温度系数,越小越激进(更容易关注到难负样例),越大则相对温和
logits /= temperature
loss = F.cross_entropy(logits, labels)
print("InfoNCE Loss:", loss.item())

Focal Loss
Focal Loss 是对交叉熵的修改:$\mathrm{FL}(p_t) = -\alpha_t (1 - p_t)^{\gamma} \log(p_t)$。通过平衡因子 $\alpha$ 控制类别不平衡(由于负样本数量通常较多,$\alpha < 0.5$ 时更关注负类,$\alpha > 0.5$ 时则更关注正类),通过调节因子 $\gamma$ 聚焦于困难样本($\gamma$ 一般取 2,对于轻易预测准确的样本,损失会被明显压低)。当 $\alpha = 1$ 且 $\gamma = 0$ 时,Focal Loss 退化为标准的交叉熵损失。
# powered by ChatGPT4o
import torch
import torch.nn as nn
import torch.nn.functional as F
class FocalLoss(nn.Module):
def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
super(FocalLoss, self).__init__()
# 处理 alpha,确保它是 Tensor,并且有与类别数匹配的维度
if alpha is None:
self.alpha = torch.ones(1) # 如果没有给定 alpha,使用 [1.0]
else:
self.alpha = torch.tensor(alpha, dtype=torch.float32)
self.gamma = gamma # 聚焦因子
self.reduction = reduction # 损失计算方式
def forward(self, inputs, targets):
# 使用 softmax 计算每个类别的概率
inputs = F.softmax(inputs, dim=-1)
# 选择正确类别的概率
p_t = inputs.gather(dim=-1, index=targets.unsqueeze(-1)).squeeze(-1)  # 取正确类别的概率,形状 (batch,),避免后续广播出错
# 确保 alpha 与 targets 对应的索引
alpha_t = self.alpha if self.alpha.numel() == 1 else self.alpha.gather(0, targets.view(-1))
# 防止计算 log 时出现零值,增加一个小常数
epsilon = 1e-7
p_t = p_t.clamp(min=epsilon) # 防止 p_t 为零
# 计算 Focal Loss
loss = -alpha_t * (1 - p_t) ** self.gamma * torch.log(p_t)
# 进行 reduction(默认是均值)
if self.reduction == 'mean':
return loss.mean()
elif self.reduction == 'sum':
return loss.sum()
else:
return loss

蒙特卡洛估算圆周率
import random
# 设置随机点数量(数量越大结果越精确,但计算时间越长)
total_points = 1000000
points_inside = 0
for _ in range(total_points):
# 在边长为2的正方形区域内生成随机点(坐标范围:-1到1)
x = random.uniform(-1, 1)
y = random.uniform(-1, 1)
# 判断点是否在单位圆内
if x**2 + y**2 <= 1:
points_inside += 1
# 计算圆周率近似值(圆面积公式 πr² 与正方形面积公式边长的平方之比)
pi_estimate = 4 * points_inside / total_points
print(f"模拟点数: {total_points}")
print(f"估算的π值: {pi_estimate}")
print(f"实际π值: 3.141592653589793")
print(f"绝对误差: {abs(pi_estimate - 3.141592653589793)}")

蓄水池采样
与大模型关系不大,但是也记录一下。

import random
def reservoir_sampling(n, k):
nums = [i for i in range(1, n + 1)]
res = []
for i in range(k): # 直接填充前k个数字
res.append(nums[i])
for i in range(k, len(nums)):  # 从第 k+1 个元素(下标 k)开始处理
# 此后下标为 i 的新元素有 k/(i+1) 的概率替换进蓄水池
replace_idx = random.randint(0, i)
if replace_idx < k:
res[replace_idx] = nums[i]
return res