| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- """
- Qwen 模型配置和加载
- """
- from dataclasses import dataclass
- from typing import List
- import os
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import logging
- from transformers import AutoModelForCausalLM, AutoTokenizer
- from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
- from finetunex.models.base import BaseModelConfig
- logger = logging.getLogger(__name__)
@dataclass
class QwenConfig(BaseModelConfig):
    """Configuration specific to Qwen models.

    Attributes:
        model_name: Model identifier or path; also inspected by the
            ``is_qwen3*`` helpers to guess the model generation.
        target_modules: Module names passed to LoRA. When left as ``None``
            it is replaced in ``__post_init__`` by the q/k/v/o and
            gate/up/down projection names.
    """

    model_name: str = "Qwen/Qwen3.5-0.5B"
    target_modules: List[str] = None

    def __post_init__(self):
        """Fill in the default ``target_modules`` when none were given."""
        if self.target_modules is not None:
            return
        # A fresh list is built per instance, so the default is never shared.
        qkvo_names = ["q_proj", "k_proj", "v_proj", "o_proj"]
        gate_up_down_names = ["gate_proj", "up_proj", "down_proj"]
        self.target_modules = qkvo_names + gate_up_down_names

    def is_qwen3_5(self) -> bool:
        """Return True when ``model_name`` contains "qwen3.5" or "qwen3_5"."""
        lowered = self.model_name.lower()
        return any(marker in lowered for marker in ("qwen3.5", "qwen3_5"))

    def is_qwen3(self) -> bool:
        """Return True when ``model_name`` names Qwen3 but not Qwen3.5."""
        lowered = self.model_name.lower()
        if "qwen3" not in lowered:
            return False
        return not ("qwen3.5" in lowered or "qwen3_5" in lowered)
class _NPUConv1d(nn.Module):
    """Pure-PyTorch replacement for ``nn.Conv1d`` on Huawei Ascend NPU.

    The convolution is computed with ``Tensor.unfold`` + ``torch.einsum``
    instead of ``F.conv1d``; per the original author this avoids the Conv2D
    operator compilation failure seen on NPU.

    The result is intended to match the wrapped ``nn.Conv1d``: stride,
    dilation, groups, two-sided padding (including the ``'same'`` and
    ``'valid'`` string forms) and non-zero padding modes are all honoured.
    The wrapper shares the original layer's ``weight`` and ``bias``
    parameters rather than copying them.

    Args:
        original_conv1d: The ``nn.Conv1d`` whose hyper-parameters and
            parameters are reused.
    """

    def __init__(self, original_conv1d: nn.Conv1d):
        super().__init__()
        self.stride = original_conv1d.stride[0]
        self.dilation = original_conv1d.dilation[0]
        self.groups = original_conv1d.groups
        self.kernel_size = original_conv1d.kernel_size[0]
        self.in_channels = original_conv1d.in_channels
        self.out_channels = original_conv1d.out_channels
        self.padding_mode = getattr(original_conv1d, "padding_mode", "zeros")

        padding = original_conv1d.padding
        if isinstance(padding, str):
            if padding == "same":
                # Same split PyTorch uses: any odd remainder goes on the right.
                total = self.dilation * (self.kernel_size - 1)
                pad_left = total // 2
                pad_right = total - pad_left
            else:  # "valid"
                pad_left = pad_right = 0
        else:
            pad_left = pad_right = padding[0]
        self.pad_left = pad_left
        self.pad_right = pad_right
        # Kept for backward compatibility with code reading ``.padding``.
        self.padding = pad_left

        # Shared (not copied) so training updates stay visible to both.
        self.weight = original_conv1d.weight
        self.bias = original_conv1d.bias

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Apply the 1-D convolution.

        Args:
            input: Tensor of shape ``(batch, in_channels, length)`` or,
                unbatched, ``(in_channels, length)``.

        Returns:
            Tensor of shape ``(batch, out_channels, out_length)`` (without
            the batch dimension for unbatched input).
        """
        unbatched = input.dim() == 2
        if unbatched:
            input = input.unsqueeze(0)

        if self.pad_left > 0 or self.pad_right > 0:
            mode = "constant" if self.padding_mode == "zeros" else self.padding_mode
            input = F.pad(input, (self.pad_left, self.pad_right), mode=mode)

        if self.kernel_size == 1 and self.stride == 1 and self.groups == 1:
            # A pointwise convolution is a linear layer over the channel axis.
            output = F.linear(
                input.transpose(1, 2),
                self.weight.squeeze(-1),
                self.bias,
            ).transpose(1, 2)
            return output.squeeze(0) if unbatched else output

        batch = input.shape[0]
        groups = self.groups
        in_per_group = self.in_channels // groups
        out_per_group = self.out_channels // groups

        # Each window spans the dilated receptive field; taking every
        # ``dilation``-th element leaves exactly ``kernel_size`` taps.
        span = self.dilation * (self.kernel_size - 1) + 1
        windows = input.unfold(2, span, self.stride)[..., ::self.dilation]
        out_length = windows.shape[2]

        # windows: (batch, groups, in/groups, out_length, kernel)
        windows = windows.reshape(
            batch, groups, in_per_group, out_length, self.kernel_size
        )
        # weight: (groups, out/groups, in/groups, kernel)
        weight = self.weight.reshape(
            groups, out_per_group, in_per_group, self.kernel_size
        )

        output = torch.einsum("bgclk,gock->bgol", windows, weight)
        output = output.reshape(batch, self.out_channels, out_length)

        if self.bias is not None:
            output = output + self.bias.view(1, -1, 1)
        return output.squeeze(0) if unbatched else output
def _patch_conv1d_for_npu():
    """Monkey-patch ``nn.Conv1d.forward`` with a pure-PyTorch fallback.

    The patched forward first tries the native implementation. If that
    raises a ``RuntimeError`` whose message contains ``"Conv2D"`` or
    ``"500001"``, the call is served by ``_NPUConv1d`` instead and the module
    is flagged so later calls skip the failing native attempt. Any other
    ``RuntimeError`` is re-raised unchanged.

    The patch is applied at most once per process; ``nn.Conv1d._npu_patched``
    marks that it is already installed.
    """
    if hasattr(nn.Conv1d, '_npu_patched'):
        return

    original_forward = nn.Conv1d.forward

    def npu_conv1d_forward(self, input):
        # Once a layer has failed natively, go straight to the fallback so
        # the failing call and its exception are not repeated every step.
        if getattr(self, '_npu_use_fallback', False):
            return _NPUConv1d(self)(input)
        try:
            return original_forward(self, input)
        except RuntimeError as e:
            message = str(e)
            if "Conv2D" not in message and "500001" not in message:
                raise
            logger.info("Conv1d 在 NPU 上失败,回退到纯 PyTorch 实现: %s", e)
            # A plain bool is stored as an ordinary attribute, so it adds
            # nothing to the module's parameters, buffers or state_dict.
            self._npu_use_fallback = True
            # The wrapper is rebuilt per call so it always sees the layer's
            # current weight and bias objects.
            return _NPUConv1d(self)(input)

    nn.Conv1d.forward = npu_conv1d_forward
    nn.Conv1d._npu_patched = True
    logger.info("已应用 Conv1d NPU 兼容补丁")
def _patch_qwen3_5_for_npu(model_path: str):
    """Rewrite a local Qwen3.5 ``config.json`` to avoid linear attention.

    Only acts when ``<model_path>/config.json`` exists and its
    ``model_type`` is ``"qwen3_5"``. In that case it:

    * removes a top-level ``linear_attn`` entry,
    * changes ``_attn_implementation`` from ``"linear"`` to ``"eager"``,
    * replaces every ``"linear_attn"`` value in ``attention_layers`` (a dict
      or a list) with ``"eager"``.

    When anything changed, the original file is copied once to
    ``config.json.bak`` and the new config is written through a temporary
    file that is swapped in with ``os.replace``, so an interrupted write
    cannot leave a truncated ``config.json``.

    Args:
        model_path: Directory expected to contain ``config.json``. If the
            file is missing, a warning is logged and nothing else happens.
    """
    import json
    import shutil

    config_path = os.path.join(model_path, "config.json")

    if not os.path.exists(config_path):
        logger.warning(f"未找到模型配置文件:{config_path}")
        return

    with open(config_path, "r", encoding="utf-8") as f:
        model_config = json.load(f)

    if model_config.get("model_type") != "qwen3_5":
        return

    changed = False

    if "linear_attn" in model_config:
        logger.info("检测到 linear_attn 配置,NPU 不支持,将移除该配置")
        del model_config["linear_attn"]
        changed = True

    if model_config.get("_attn_implementation", "") == "linear":
        logger.info("检测到 _attn_implementation=linear,将替换为 eager")
        model_config["_attn_implementation"] = "eager"
        changed = True

    attn_layers = model_config.get("attention_layers", None)
    # The per-layer setting may be a mapping or a plain list; handle both
    # instead of assuming ``.values()`` exists.
    if isinstance(attn_layers, dict) and "linear_attn" in attn_layers.values():
        logger.info("检测到 attention_layers 中包含 linear_attn,将替换为 eager")
        model_config["attention_layers"] = {
            k: "eager" if v == "linear_attn" else v
            for k, v in attn_layers.items()
        }
        changed = True
    elif isinstance(attn_layers, list) and "linear_attn" in attn_layers:
        logger.info("检测到 attention_layers 中包含 linear_attn,将替换为 eager")
        model_config["attention_layers"] = [
            "eager" if v == "linear_attn" else v for v in attn_layers
        ]
        changed = True

    if not changed:
        return

    backup_path = config_path + ".bak"
    # Never overwrite an existing backup: it holds the pristine config.
    if not os.path.exists(backup_path):
        shutil.copy2(config_path, backup_path)
        logger.info(f"原始配置已备份到:{backup_path}")

    tmp_path = config_path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(model_config, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, config_path)
    logger.info("模型配置已修改,linear attention 已替换为 eager attention")
def load_qwen_model(config: QwenConfig):
    """Load a Qwen causal LM with its tokenizer and wrap it with LoRA.

    Steps, in order:

    1. If an NPU is available (``torch.npu.is_available()``), apply the
       Conv1d and config patches for Qwen3.5 models and force
       ``attn_implementation="eager"``.
    2. Build a ``BitsAndBytesConfig`` when ``config.use_4bit`` is set; if
       that fails, fall back to loading in ``config.get_compute_dtype()``.
    3. Load the tokenizer (right padding) and the model
       (``device_map="auto"``).
    4. Call ``prepare_model_for_kbit_training`` when quantized, then apply
       LoRA built from ``config.lora_*`` and ``config.target_modules``.

    Args:
        config: Model, quantization and LoRA settings.

    Returns:
        A ``(model, tokenizer, peft_config)`` tuple, where ``model`` is the
        value returned by ``get_peft_model``.
    """
    print(f"正在加载模型:{config.model_name}")

    is_npu = hasattr(torch, 'npu') and torch.npu.is_available()

    if is_npu:
        logger.info("检测到华为升腾 NPU,应用兼容性补丁...")
        if config.is_qwen3_5():
            _patch_conv1d_for_npu()
            _patch_qwen3_5_for_npu(config.model_name)
        elif config.is_qwen3():
            logger.info("Qwen3 模型使用标准 attention,NPU 兼容性良好,无需额外补丁")

    compute_dtype = config.get_compute_dtype()

    use_quantization = config.use_4bit
    bnb_config = None
    if use_quantization:
        try:
            from transformers import BitsAndBytesConfig
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=config.use_4bit,
                bnb_4bit_quant_type=config.bnb_4bit_quant_type,
                bnb_4bit_compute_dtype=compute_dtype,
                bnb_4bit_use_double_quant=config.use_nested_quant,
            )
            print("使用 4bit 量化加载模型(需要 NVIDIA GPU)")
        except Exception as e:
            # Deliberate best-effort: any failure to set up quantization
            # (including a missing dependency) degrades to full precision.
            logger.warning(f"无法使用 4bit 量化: {e}")
            logger.warning("将使用 bf16/fp16 加载模型")
            use_quantization = False
            bnb_config = None
    else:
        print(f"使用 {compute_dtype} 精度加载模型")

    tokenizer = AutoTokenizer.from_pretrained(
        config.model_name,
        trust_remote_code=config.trust_remote_code,
        padding_side="right",
    )
    # Only fall back to EOS when the tokenizer has no pad token of its own.
    # Overwriting a distinct pad token would make pad == EOS, and collators
    # that mask padding in the labels would then mask EOS as well.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model_kwargs = {
        "quantization_config": bnb_config if use_quantization else None,
        "device_map": "auto",
        "trust_remote_code": config.trust_remote_code,
        "torch_dtype": compute_dtype,
    }

    if is_npu:
        model_kwargs["attn_implementation"] = "eager"

    model = AutoModelForCausalLM.from_pretrained(
        config.model_name,
        **model_kwargs,
    )

    if use_quantization:
        model = prepare_model_for_kbit_training(model)

    peft_config = LoraConfig(
        lora_alpha=config.lora_alpha,
        lora_dropout=config.lora_dropout,
        r=config.lora_r,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=config.target_modules,
    )

    model = get_peft_model(model, peft_config)

    # print_trainable_parameters() prints its own summary and returns None,
    # so it must not be interpolated into the message.
    print("模型加载完成!可训练参数:")
    model.print_trainable_parameters()

    return model, tokenizer, peft_config
|