# ============================================================================ # Google Colab Script: Universal GGUF Hot Patching at Tensor Level # Downloads via Hugging Face hub + xet (hf_xet) # Compatible with ALL quantization types and mix strategies # Uses the official gguf library from ggml-org/llama.cpp # ============================================================================ # --- Install dependencies --- !pip install gguf numpy tqdm huggingface_hub hf_xet import os import sys import json import hashlib import struct import time import math import numpy as np from tqdm import tqdm from typing import Dict, List, Tuple, Optional, Any, Set from collections import Counter from dataclasses import dataclass, field # Hugging Face Hub + Xet for downloads from huggingface_hub import hf_hub_download import hf_xet # enables xet:// protocol for fast downloads # Official gguf library from gguf import GGUFReader as OfficialGGUFReader from gguf import GGUFValueType, GGMLQuantizationType # ============================================================================ # 1. 
# ============================================================================
# 1. CONFIGURATION
# ============================================================================

WORK_DIR = "/content/gguf_hotpatch"
os.makedirs(WORK_DIR, exist_ok=True)

# Patch definition: the delta (target - source) is computed, then applied to
# "apply_to".  Each entry maps a role to its Hugging Face repo and filenames.
FILES = {
    "source": {
        "label": "lmstudio (source/original)",
        "repo_id": "lmstudio-community/Qwen3.5-9B-GGUF",
        "filename": "Qwen3.5-9B-Q8_0.gguf",
        "local_name": "lmstudio_Qwen3.5-9B-Q8_0.gguf",
    },
    "target": {
        "label": "HauhauCS (target/patched)",
        "repo_id": "HauhauCS/Qwen3.5-9B-Uncensored-HauhauCS-Aggressive",
        "filename": "Qwen3.5-9B-Uncensored-HauhauCS-Aggressive-Q8_0.gguf",
        "local_name": "HauhauCS_Qwen3.5-9B-Q8_0.gguf",
    },
    "apply_to": {
        "label": "Jackrong (apply patch to)",
        "repo_id": "Jackrong/Qwen3.5-9B-Claude-4.6-Opus-Reasoning-Distilled-GGUF",
        "filename": "Qwen3.5-9B.Q8_0.gguf",
        "local_name": "Jackrong_Qwen3.5-9B.Q8_0.gguf",
    },
}

OUTPUT_PATH = os.path.join(WORK_DIR, "Jackrong_Qwen3.5-9B.Q8_0_patched.gguf")
PATCH_REPORT_PATH = os.path.join(WORK_DIR, "patch_report.json")


# ============================================================================
# 2. DOWNLOAD VIA HUGGING FACE HUB + XET
# ============================================================================

def download_hf_file(repo_id: str, filename: str, local_name: str) -> str:
    """
    Download a file from the Hugging Face Hub using hf_xet for acceleration.

    hf_xet automatically activates when the repo uses Xet storage, providing
    chunk-level deduplication and parallel downloads.  For repos without Xet,
    this falls back to the standard HF download.

    The file is first fetched into the HF cache and linked into WORK_DIR under
    *local_name*; if the cache download fails, a direct download into WORK_DIR
    is attempted.  An existing non-empty local copy is reused as-is.

    Returns the local file path; re-raises the underlying exception if both
    download attempts fail.
    """
    local_path = os.path.join(WORK_DIR, local_name)

    # Reuse a previous download if it is present and non-empty.
    if os.path.exists(local_path):
        size = os.path.getsize(local_path)
        if size > 0:
            print(f" ✅ Already exists: {local_name} ({size / 1e9:.2f} GB)")
            return local_path

    # BUG FIX: this message previously printed a literal "(unknown)" instead
    # of the remote filename being fetched.
    print(f" 📥 Downloading from {repo_id}/{filename}")
    print(f" via Hugging Face Hub + xet acceleration...")
    try:
        # Download into the HF cache, then link into the work dir so the
        # multi-GB file is not duplicated on disk.
        cached_path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="model",
            local_dir=None,  # use HF cache
            resume_download=True,
        )
        if os.path.exists(local_path):
            os.remove(local_path)
        try:
            # Symlink to avoid doubling disk usage.
            os.symlink(cached_path, local_path)
        except OSError:
            # Robustness: some filesystems refuse symlinks — copy instead.
            import shutil
            shutil.copy(cached_path, local_path)
        size = os.path.getsize(local_path)
        print(f" ✅ Downloaded: {local_name} ({size / 1e9:.2f} GB)")
        print(f" Cache: {cached_path}")
        return local_path
    except Exception as e:
        print(f" ❌ hf_hub_download failed: {e}")
        print(f" 🔄 Trying direct download with local_dir...")
        # Fallback: download directly into the work dir.
        try:
            cached_path = hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                repo_type="model",
                local_dir=WORK_DIR,
                local_dir_use_symlinks=False,
                resume_download=True,
            )
            # With local_dir set, the file lands at WORK_DIR/filename; rename
            # it to the desired local_name.
            downloaded_path = os.path.join(WORK_DIR, filename)
            if downloaded_path != local_path and os.path.exists(downloaded_path):
                os.rename(downloaded_path, local_path)
            size = os.path.getsize(local_path)
            print(f" ✅ Downloaded: {local_name} ({size / 1e9:.2f} GB)")
            return local_path
        except Exception as e2:
            print(f" ❌ Direct download also failed: {e2}")
            raise
# ============================================================================
# 3. QUANTIZATION TYPE REGISTRY
# ============================================================================

@dataclass
class QuantTypeInfo:
    """Static description of one GGML quantization type and its block layout."""
    type_id: int          # GGML type enum value
    name: str             # GGML type name, e.g. "Q8_0"
    block_size: int       # elements per quant block
    type_size: int        # bytes per quant block
    is_quantized: bool    # False for plain scalar types (F32, I8, ...)
    # Ordered (field_name, field_dtype, element_count) triples describing the
    # byte layout of one block; consumed by BlockDeltaEngine._block_aware.
    block_fields: List[Tuple[str, str, int]] = field(default_factory=list)

    @property
    def bytes_per_element(self) -> float:
        """Average storage cost per weight (0 when block_size is 0)."""
        return self.type_size / self.block_size if self.block_size else 0


QUANT_REGISTRY: Dict[int, QuantTypeInfo] = {}


def _reg(tid, name, bs, ts, iq=True, bf=None):
    """Register one quant type: id, name, block size, type size, fields."""
    QUANT_REGISTRY[tid] = QuantTypeInfo(tid, name, bs, ts, iq, bf or [])


# Unquantized scalar types
_reg(0, "F32", 1, 4, False, [("value","f32",1)])
_reg(1, "F16", 1, 2, False, [("value","f16",1)])
_reg(30, "BF16", 1, 2, False, [("value","bf16",1)])
_reg(28, "F64", 1, 8, False, [("value","f64",1)])
_reg(24, "I8", 1, 1, False, [("value","i8",1)])
_reg(25, "I16", 1, 2, False, [("value","i16",1)])
_reg(26, "I32", 1, 4, False, [("value","i32",1)])
_reg(27, "I64", 1, 8, False, [("value","i64",1)])
# Standard quants
_reg(2, "Q4_0", 32, 18, True, [("scale","f16",1),("quants","u8",16)])
_reg(3, "Q4_1", 32, 20, True, [("scale","f16",1),("min","f16",1),("quants","u8",16)])
_reg(6, "Q5_0", 32, 22, True, [("scale","f16",1),("qh","u8",4),("quants","u8",16)])
_reg(7, "Q5_1", 32, 24, True, [("scale","f16",1),("min","f16",1),("qh","u8",4),("quants","u8",16)])
_reg(8, "Q8_0", 32, 34, True, [("scale","f16",1),("quants","i8",32)])
_reg(9, "Q8_1", 32, 36, True, [("scale","f16",1),("delta","f16",1),("quants","i8",32)])
# K-quants
_reg(10, "Q2_K", 256, 84, True, [("scales","u8",16),("qs","u8",64),("dmin","f16",1),("d","f16",1)])
_reg(11, "Q3_K", 256, 110, True, [("hmask","u8",32),("qs","u8",64),("scales","u8",12),("d","f16",1)])
_reg(12, "Q4_K", 256, 144, True, [("dmin","f16",1),("d","f16",1),("scales","u8",12),("qs","u8",128)])
_reg(13, "Q5_K", 256, 176, True, [("d","f16",1),("dmin","f16",1),("scales","u8",12),("qh","u8",32),("qs","u8",128)])
_reg(14, "Q6_K", 256, 210, True, [("ql","u8",128),("qh","u8",64),("scales","i8",16),("d","f16",1)])
_reg(15, "Q8_K", 256, 292, True, [("d","f32",1),("qs","i8",256),("bsums","i16",16)])
# I-quants
_reg(16, "IQ2_XXS", 256, 66, True, [("d","f16",1),("qs","u16",32)])
# BUG FIX: scales is QK_K/32 = 8 bytes (llama.cpp block_iq2_xs); the old
# count of 6 made the layout sum to 72 != 74 and forced byte-level fallback.
_reg(17, "IQ2_XS", 256, 74, True, [("d","f16",1),("qs","u16",32),("scales","u8",8)])
_reg(22, "IQ2_S", 256, 82, True, [("d","f16",1),("qs","u8",64),("qh","u8",16)])
_reg(18, "IQ3_XXS", 256, 98, True, [("d","f16",1),("qs","u8",96)])
_reg(21, "IQ3_S", 256, 110, True, [("d","f16",1),("qs","u8",96),("qh","u8",12)])
_reg(20, "IQ4_NL", 32, 18, True, [("d","f16",1),("qs","u8",16)])
# BUG FIX: scales_l is QK_K/64 = 4 bytes (llama.cpp block_iq4_xs); the old
# count of 8 made the layout sum to 140 != 136 and forced byte-level fallback.
_reg(23, "IQ4_XS", 256, 136, True, [("d","f16",1),("scales_h","u16",1),("scales_l","u8",4),("qs","u8",128)])
_reg(19, "IQ1_S", 256, 50, True, [("d","f16",1),("qs","u8",32),("qh","u16",8)])
# BUG FIX: scales is QK_K/32 = 8 bytes (llama.cpp block_iq1_m); the old
# count of 6 made the layout sum to 54 != 56 and forced byte-level fallback.
_reg(29, "IQ1_M", 256, 56, True, [("qs","u8",32),("qh","u8",16),("scales","u8",8)])
# TQ (ternary) types
_reg(34, "TQ1_0", 256, 54, True, [("qs","u8",48),("qh","u8",4),("d","f16",1)])
_reg(35, "TQ2_0", 256, 66, True, [("qs","u8",64),("d","f16",1)])


def get_quant_info(type_id: int) -> QuantTypeInfo:
    """Look up a quant type; unknown ids get a 1-byte raw placeholder."""
    if type_id in QUANT_REGISTRY:
        return QUANT_REGISTRY[type_id]
    return QuantTypeInfo(type_id, f"UNKNOWN_{type_id}", 1, 1, False, [("raw","u8",1)])
# ============================================================================
# 4. MIX STRATEGY DETECTION
# ============================================================================

# Fingerprints of the quantization "mix" recipes llama.cpp produces: each
# strategy names the primary quant type(s) plus the higher-precision types
# used for sensitive tensors.
KNOWN_MIX_STRATEGIES = {
    "Q2_K": {"primary": ["Q2_K"], "sensitive": ["Q3_K","Q4_K","Q6_K"], "desc": "2-bit with Q3_K/Q4_K/Q6_K for sensitive layers"},
    "Q3_K_S": {"primary": ["Q3_K"], "sensitive": [], "desc": "3-bit uniform"},
    "Q3_K_M": {"primary": ["Q3_K"], "sensitive": ["Q4_K","Q6_K"], "desc": "3-bit with Q4_K/Q6_K for sensitive layers"},
    "Q3_K_L": {"primary": ["Q3_K"], "sensitive": ["Q4_K","Q5_K","Q6_K"], "desc": "3-bit with Q4_K/Q5_K/Q6_K for sensitive layers"},
    "Q4_K_S": {"primary": ["Q4_K"], "sensitive": ["Q6_K"], "desc": "4-bit with Q6_K for sensitive layers"},
    "Q4_K_M": {"primary": ["Q4_K"], "sensitive": ["Q6_K"], "desc": "4-bit with Q6_K for attention and output layers"},
    "Q5_K_S": {"primary": ["Q5_K"], "sensitive": ["Q6_K"], "desc": "5-bit with Q6_K for sensitive layers"},
    "Q5_K_M": {"primary": ["Q4_K","Q5_K"], "sensitive": ["Q6_K"], "desc": "5-bit with Q4_K/Q6_K mix"},
    "Q6_K": {"primary": ["Q6_K"], "sensitive": ["Q8_K"], "desc": "6-bit with Q8_K for output"},
    "Q8_0": {"primary": ["Q8_0"], "sensitive": [], "desc": "8-bit uniform"},
    "IQ2_M": {"primary": ["IQ2_S","IQ2_XS"], "sensitive": ["Q4_K","Q6_K"], "desc": "IQ 2-bit mix with Q4_K/Q6_K for sensitive"},
    "IQ3_M": {"primary": ["IQ3_S"], "sensitive": ["Q4_K","Q6_K"], "desc": "IQ 3-bit with Q4_K/Q6_K for sensitive"},
    "IQ4_XS": {"primary": ["IQ4_XS"], "sensitive": ["Q6_K"], "desc": "IQ 4-bit with Q6_K for sensitive"},
}

# Substrings marking tensors that quantizers typically keep at higher precision.
SENSITIVE_PATTERNS = [
    "token_embd", "output_norm", "output.weight",
    "attn_v.weight", "attn_k.weight", "attn_q.weight", "attn_output.weight",
]


def detect_mix_strategy(tensors: Dict[str, 'TensorMeta']) -> dict:
    """
    Guess which known mix strategy a tensor set was quantized with.

    Counts quant types overall and per role (sensitive / regular / other),
    then scores each known strategy by type overlap minus a penalty for
    unexpected types.  Returns a dict with the best match, a confidence
    label, and the raw distributions.
    """
    counts = Counter()
    role_counts = {"sensitive": Counter(), "regular": Counter(), "other": Counter()}
    for tensor_name, meta in tensors.items():
        qtype = meta.dtype_name
        counts[qtype] += 1
        if any(pat in tensor_name for pat in SENSITIVE_PATTERNS):
            role = "sensitive"
        elif "blk." in tensor_name or "layers." in tensor_name:
            role = "regular"
        else:
            role = "other"
        role_counts[role][qtype] += 1

    present = set(counts)
    best_name, best_score = None, 0
    for strat_name, info in KNOWN_MIX_STRATEGIES.items():
        expected = set(info["primary"]) | set(info["sensitive"])
        # Unquantized helper types (norms etc.) are never penalised.
        unexpected = present - expected - {"F32", "F16", "BF16"}
        score = len(present & expected) - len(unexpected) * 0.5
        if score > best_score:
            best_name, best_score = strat_name, score

    result = {
        "detected_strategy": best_name,
        "confidence": "high" if best_score >= 2 else "medium" if best_score >= 1 else "low",
        "quant_type_distribution": dict(counts.most_common()),
        "by_role": {role: dict(c.most_common()) for role, c in role_counts.items()},
        "unique_types": sorted(present),
        "total_tensors": len(tensors),
    }
    if best_name:
        result["strategy_description"] = KNOWN_MIX_STRATEGIES[best_name]["desc"]
    return result


def check_patch_compatibility(s_strat, t_strat, a_strat) -> dict:
    """
    Compare the detected strategies of source, target, and apply_to.

    Never blocks patching ("compatible" is always True); it only collects
    human-readable warnings about quant-type and strategy mismatches.
    """
    warnings = []
    src_types = set(s_strat["unique_types"])
    tgt_types = set(t_strat["unique_types"])
    app_types = set(a_strat["unique_types"])

    if src_types != tgt_types:
        added, removed = tgt_types - src_types, src_types - tgt_types
        if added:
            warnings.append(f"Target has new quant types: {added}")
        if removed:
            warnings.append(f"Target removed quant types: {removed}")

    # Types present in the diff but absent from apply_to can only be patched
    # byte-by-byte.
    missing = (src_types & tgt_types) - app_types
    if missing:
        warnings.append(f"apply_to missing types {missing} → byte-level fallback")

    ss = s_strat.get("detected_strategy")
    ts = t_strat.get("detected_strategy")
    ats = a_strat.get("detected_strategy")
    if ss != ts:
        warnings.append(f"Different strategies: source={ss}, target={ts}")
    if ats != ss:
        warnings.append(f"apply_to strategy ({ats}) differs from source ({ss})")

    return {"compatible": True, "warnings": warnings,
            "source": ss, "target": ts, "apply_to": ats}
BLOCK-AWARE DELTA ENGINE # ============================================================================ class BlockDeltaEngine: @staticmethod def apply_delta_to_chunk(sb, tb, ab, qi): if not qi.is_quantized: return BlockDeltaEngine._unquantized(sb, tb, ab, qi) if qi.block_fields: return BlockDeltaEngine._block_aware(sb, tb, ab, qi) return BlockDeltaEngine._byte_delta(sb, tb, ab) @staticmethod def _unquantized(sb, tb, ab, qi): n = qi.name if n == "F32": s,t,a = (np.frombuffer(x, np.float32).copy() for x in (sb,tb,ab)) return (a+(t-s)).tobytes() if n == "F16": s,t,a = (np.frombuffer(x, np.float16).astype(np.float32) for x in (sb,tb,ab)) return (a+(t-s)).astype(np.float16).tobytes() if n == "BF16": s,t,a = (_bf16_to_f32(np.frombuffer(x, np.uint16).copy()) for x in (sb,tb,ab)) return _f32_to_bf16(a+(t-s)).tobytes() if n == "F64": s,t,a = (np.frombuffer(x, np.float64).copy() for x in (sb,tb,ab)) return (a+(t-s)).tobytes() if n == "I8": s,t,a = (np.frombuffer(x, np.int8).astype(np.int16) for x in (sb,tb,ab)) return np.clip(a+(t-s),-128,127).astype(np.int8).tobytes() if n == "I16": s,t,a = (np.frombuffer(x, np.int16).astype(np.int32) for x in (sb,tb,ab)) return np.clip(a+(t-s),-32768,32767).astype(np.int16).tobytes() if n == "I32": s,t,a = (np.frombuffer(x, np.int32).astype(np.int64) for x in (sb,tb,ab)) return np.clip(a+(t-s),-(2**31),2**31-1).astype(np.int32).tobytes() if n == "I64": s,t,a = (np.frombuffer(x, np.int64).copy() for x in (sb,tb,ab)) return (a+(t-s)).tobytes() return BlockDeltaEngine._byte_delta(sb, tb, ab) @staticmethod def _block_aware(sb, tb, ab, qi): bsz = qi.type_size nb = len(sb) // bsz if nb == 0: return ab specs = [] off = 0 for fn, fd, fc in qi.block_fields: es = _dtype_size(fd) fs = es * fc specs.append((off, fs, fd, fn)) off += fs if off != bsz: return BlockDeltaEngine._byte_delta(sb, tb, ab) sa = np.frombuffer(sb, np.uint8).copy() ta = np.frombuffer(tb, np.uint8).copy() aa = np.frombuffer(ab, np.uint8).copy() result = aa.copy() for fo, fs, fd, 
fn in specs: idx = (np.arange(nb)[:,None] * bsz + fo + np.arange(fs)).ravel() sf, tf, af = sa[idx], ta[idx], aa[idx] if np.array_equal(sf, tf): continue if fd == "f16": sv = np.frombuffer(sf.tobytes(), np.float16).astype(np.float32) tv = np.frombuffer(tf.tobytes(), np.float16).astype(np.float32) av = np.frombuffer(af.tobytes(), np.float16).astype(np.float32) result[idx] = np.frombuffer((av+(tv-sv)).astype(np.float16).tobytes(), np.uint8) elif fd == "f32": sv = np.frombuffer(sf.tobytes(), np.float32).copy() tv = np.frombuffer(tf.tobytes(), np.float32).copy() av = np.frombuffer(af.tobytes(), np.float32).copy() result[idx] = np.frombuffer((av+(tv-sv)).tobytes(), np.uint8) elif fd == "bf16": sv = _bf16_to_f32(np.frombuffer(sf.tobytes(), np.uint16).copy()) tv = _bf16_to_f32(np.frombuffer(tf.tobytes(), np.uint16).copy()) av = _bf16_to_f32(np.frombuffer(af.tobytes(), np.uint16).copy()) result[idx] = np.frombuffer(_f32_to_bf16(av+(tv-sv)).tobytes(), np.uint8) elif fd == "i8": sv = np.frombuffer(sf.tobytes(), np.int8).astype(np.int16) tv = np.frombuffer(tf.tobytes(), np.int8).astype(np.int16) av = np.frombuffer(af.tobytes(), np.int8).astype(np.int16) result[idx] = np.frombuffer(np.clip(av+(tv-sv),-128,127).astype(np.int8).tobytes(), np.uint8) elif fd == "u8": result[idx] = np.clip(af.astype(np.int16)+(tf.astype(np.int16)-sf.astype(np.int16)),0,255).astype(np.uint8) elif fd == "u16": sv = np.frombuffer(sf.tobytes(), np.uint16).astype(np.int32) tv = np.frombuffer(tf.tobytes(), np.uint16).astype(np.int32) av = np.frombuffer(af.tobytes(), np.uint16).astype(np.int32) result[idx] = np.frombuffer(np.clip(av+(tv-sv),0,65535).astype(np.uint16).tobytes(), np.uint8) elif fd == "i16": sv = np.frombuffer(sf.tobytes(), np.int16).astype(np.int32) tv = np.frombuffer(tf.tobytes(), np.int16).astype(np.int32) av = np.frombuffer(af.tobytes(), np.int16).astype(np.int32) result[idx] = np.frombuffer(np.clip(av+(tv-sv),-32768,32767).astype(np.int16).tobytes(), np.uint8) elif fd == "f64": sv = 
np.frombuffer(sf.tobytes(), np.float64).copy() tv = np.frombuffer(tf.tobytes(), np.float64).copy() av = np.frombuffer(af.tobytes(), np.float64).copy() result[idx] = np.frombuffer((av+(tv-sv)).tobytes(), np.uint8) else: result[idx] = np.clip(af.astype(np.int16)+(tf.astype(np.int16)-sf.astype(np.int16)),0,255).astype(np.uint8) return result.tobytes() @staticmethod def _byte_delta(sb, tb, ab): s = np.frombuffer(sb, np.uint8).astype(np.int16) t = np.frombuffer(tb, np.uint8).astype(np.int16) a = np.frombuffer(ab, np.uint8).astype(np.int16) return np.clip(a+(t-s),0,255).astype(np.uint8).tobytes() def _dtype_size(d): return {"u8":1,"i8":1,"u16":2,"i16":2,"f16":2,"bf16":2, "u32":4,"i32":4,"f32":4,"u64":8,"i64":8,"f64":8}.get(d,1) def _bf16_to_f32(u16): return np.frombuffer((u16.astype(np.uint32)<<16).tobytes(), np.float32).copy() def _f32_to_bf16(f32): u32 = np.frombuffer(f32.astype(np.float32).tobytes(), np.uint32).copy() u32 += 0x7FFF + ((u32>>16)&1) return (u32>>16).astype(np.uint16) # ============================================================================ # 6. 
GGUF FILE WRAPPER # ============================================================================ @dataclass class TensorMeta: name: str; shape: list; dtype_id: int; dtype_enum: Any; dtype_name: str n_elements: int; data_offset: int; data_size: int; quant_info: QuantTypeInfo @dataclass class MetadataEntry: key: str; value_type: Any; value: Any class GGUFFile: def __init__(self, filepath): self.filepath = filepath self.filesize = os.path.getsize(filepath) self.reader = OfficialGGUFReader(filepath) self.metadata = {} for fo in self.reader.fields.values(): k = fo.name try: v = self._extract(fo) if len(fo.parts)>0 else None self.metadata[k] = MetadataEntry(k, fo.types[0] if fo.types else None, v) except Exception as e: self.metadata[k] = MetadataEntry(k, None, f"") self.tensors = {} for t in self.reader.tensors: de = t.tensor_type di = de.value if hasattr(de,'value') else int(de) dn = de.name if hasattr(de,'name') else str(de) self.tensors[t.name] = TensorMeta( t.name, list(t.shape), di, de, dn, int(t.n_elements), int(t.data_offset), int(t.n_bytes), get_quant_info(di)) self._f = open(filepath, "rb") def _extract(self, fo): types, parts, data = fo.types, fo.parts, fo.data if not types: return None ft = types[0] if ft == GGUFValueType.ARRAY: return [self._scalar(bytes(parts[i]), types[1]) for i in data] if len(types)>=2 else [] if ft == GGUFValueType.STRING: return bytes(parts[data[0]]).decode("utf-8", errors="replace") if data else "" return self._scalar(bytes(parts[data[0]]), ft) if data else None def _scalar(self, raw, vt): m = {GGUFValueType.UINT8:("0: c = min(cs,rem); ch = gf.read_chunk(name,off,c); h.update(ch); off+=len(ch); rem-=len(ch) return h.hexdigest() def _veq(a,b): if type(a)!=type(b): return False if isinstance(a,list): return len(a)==len(b) and all(_veq(x,y) for x,y in zip(a,b)) if isinstance(a,float): if a!=a and b!=b: return True return abs(a-b)<1e-10 return a==b def analyze_differences(source, target, cs=4*1024*1024): patch = GGUFPatch(source.filepath, 
target.filepath) print("\n 📋 Analyzing metadata...") for k in sorted(set(source.metadata)|set(target.metadata)): ins, int_ = k in source.metadata, k in target.metadata if ins and int_: if _veq(source.metadata[k].value, target.metadata[k].value): patch.metadata_identical.append(k) else: patch.metadata_patches.append(MetadataPatch(k,"modify",source.metadata[k].value,target.metadata[k].value)) elif ins: patch.metadata_patches.append(MetadataPatch(k,"remove",source.metadata[k].value)) else: patch.metadata_patches.append(MetadataPatch(k,"add",target_value=target.metadata[k].value)) print(" 🔬 Analyzing tensors...") sn, tn = set(source.tensors), set(target.tensors) for n in sorted(sn-tn): patch.tensor_patches.append(TensorPatch(n,"remove",source_shape=source.tensors[n].shape,source_dtype=source.tensors[n].dtype_name)) for n in sorted(tn-sn): patch.tensor_patches.append(TensorPatch(n,"add",target_shape=target.tensors[n].shape,target_dtype=target.tensors[n].dtype_name,data_size=target.tensors[n].data_size)) common = sorted(sn & tn) print(f" 🔍 Comparing {len(common)} common tensors...") for name in tqdm(common, desc=" Comparing", ncols=100): sm, tm = source.tensors[name], target.tensors[name] if sm.shape!=tm.shape or sm.dtype_id!=tm.dtype_id or sm.data_size!=tm.data_size: patch.tensor_patches.append(TensorPatch(name,"data_replace", source_shape=sm.shape,target_shape=tm.shape,source_dtype=sm.dtype_name, target_dtype=tm.dtype_name,source_dtype_id=sm.dtype_id,target_dtype_id=tm.dtype_id,data_size=tm.data_size)) continue ds=sm.data_size; off=0; db=0; ident=True while off0 and block_aware else chunk_size off=0; dd=0 while off12,}/{s:>12,} bytes ({d/s*100:.4f}%)" if s else f" {qt}: {len(ps)} tensors") print(f"\n 🔸 Top modified:") for p in sorted(dp, key=lambda x:x.diff_bytes, reverse=True)[:15]: print(f" {p.tensor_name} [{p.source_dtype}] {p.diff_bytes:,}/{p.data_size:,} ({p.diff_percentage:.4f}%)") # ============================================================================ # 
# ============================================================================
# 8. MAIN
# ============================================================================

def main():
    """End-to-end driver: download, parse, diff, patch, verify, report."""
    print("=" * 80)
    print("🚀 UNIVERSAL GGUF HOT PATCHING")
    print(" Downloads via Hugging Face Hub + xet acceleration")
    print(" All quant types & mix strategies supported")
    print("=" * 80)

    # Advertise what this build can handle.
    print(f"\n 📦 Supported quant types ({len(QUANT_REGISTRY)}):")
    all_quants = list(QUANT_REGISTRY.values())
    for start in range(0, len(all_quants), 8):
        print(" " + " ".join(f"{q.name:8s}" for q in all_quants[start:start + 8]))
    print(f"\n 📦 Known mix strategies ({len(KNOWN_MIX_STRATEGIES)}):")
    for strat_name, strat_info in KNOWN_MIX_STRATEGIES.items():
        print(f" {strat_name:12s}: {strat_info['desc']}")

    # --- Step 1: download all three GGUF files -----------------------------
    print(f"\n{'='*80}")
    print(f"📥 STEP 1: DOWNLOADING VIA HUGGING FACE HUB + XET")
    print(f"{'='*80}")
    paths = {}
    for role, cfg in FILES.items():
        print(f"\n --- {cfg['label']} ---")
        paths[role] = download_hf_file(cfg["repo_id"], cfg["filename"], cfg["local_name"])
        print(f" Size: {os.path.getsize(paths[role]):,} bytes")

    # --- Step 2: parse -----------------------------------------------------
    print(f"\n{'='*80}")
    print(f"📖 STEP 2: PARSING GGUF FILES")
    print(f"{'='*80}")
    source = GGUFFile(paths["source"])
    target = GGUFFile(paths["target"])
    apply_to = GGUFFile(paths["apply_to"])
    for label, gguf_file in [("Source", source), ("Target", target), ("Apply_to", apply_to)]:
        dist = Counter(t.dtype_name for t in gguf_file.tensors.values())
        print(f" {label:10s}: {len(gguf_file.metadata)} meta, {len(gguf_file.tensors)} tensors [{dict(dist)}]")

    # --- Step 3: diff source vs target -------------------------------------
    print(f"\n{'='*80}")
    print(f"🔍 STEP 3: ANALYZING DIFFERENCES")
    print(f"{'='*80}")
    patch = analyze_differences(source, target)
    print_report(patch, source, target, apply_to)

    # --- Step 4: apply the delta onto apply_to -----------------------------
    print(f"\n{'='*80}")
    print(f"🔧 STEP 4: APPLYING PATCH")
    print(f"{'='*80}")
    stats, entries = apply_tensor_patch(source, target, apply_to, patch, OUTPUT_PATH)

    # --- Step 5: verify the written file -----------------------------------
    success = verify_result(source, target, OUTPUT_PATH, patch)

    # --- Step 6: persist a machine-readable report -------------------------
    report = {
        "files": {k: paths[k] for k in FILES},
        "output": OUTPUT_PATH,
        "download_method": "huggingface_hub + hf_xet",
        "strategies": {
            "source": detect_mix_strategy(source.tensors),
            "target": detect_mix_strategy(target.tensors),
            "apply_to": detect_mix_strategy(apply_to.tensors),
        },
        "stats": stats,
        "details": entries,
        "verified": success,
    }
    # default=str keeps non-JSON-native values (enums, sets, ...) serialisable.
    with open(PATCH_REPORT_PATH, "w") as f:
        json.dump(report, f, indent=2, default=str)

    print(f"\n{'='*80}")
    print(f"🏁 SUMMARY")
    print(f"{'='*80}")
    for role, cfg in FILES.items():
        print(f" 📄 {cfg['label']:40s} {os.path.getsize(paths[role]):>15,} bytes")
    print(f" 📄 {'PATCHED OUTPUT':40s} {os.path.getsize(OUTPUT_PATH):>15,} bytes")
    print(f"\n {'✅ Success!' if success else '⚠️ Completed with mismatches'}")
    print(f" 📁 {OUTPUT_PATH}")

    source.close()
    target.close()
    apply_to.close()


if __name__ == "__main__":
    main()