diff --git a/convert_hf_to_gguf.py b/convert_hf_to_gguf.py
index 45538fcabb..ac353c7dda 100755
--- a/convert_hf_to_gguf.py
+++ b/convert_hf_to_gguf.py
@@ -5107,71 +5107,59 @@ class KimiLinearModel(TextModel):
         self.gguf_writer.add_expert_weights_scale(routed_scaling_factor)
 
     def set_vocab(self):
-        # Kimi uses TikToken tokenizer - load via transformers
+        try:
+            self._set_vocab_gpt2()
+            return
+        except Exception:
+            pass
+
         from transformers import AutoTokenizer
-
-        dir_model = self.dir_model
-        vocab_size = self.hparams["vocab_size"]
-
-        logger.info(f"Loading TikToken tokenizer from {dir_model}")
-        tokenizer = AutoTokenizer.from_pretrained(dir_model, trust_remote_code=True)
-
-        tokens: list[str] = []
-        toktypes: list[int] = []
-
-        # Get tokenizer pre string
+        tokenizer = AutoTokenizer.from_pretrained(self.dir_model, trust_remote_code=True)
         tokpre = self.get_vocab_base_pre(tokenizer)
-
-        # Build vocab from tokenizer
-        merges = []
-        vocab = {}
-
-        # TikToken stores vocab in mergeable_ranks
-        if hasattr(tokenizer, 'mergeable_ranks'):
-            mergeable_ranks = tokenizer.mergeable_ranks
+
+        if tokpre == "kimi-k2":
+            # Build merges list using the approach similar to HunYuanMoE
+            merges = []
+            vocab = {}
+            mergeable_ranks = tokenizer.model._mergeable_ranks
             for token, rank in mergeable_ranks.items():
-                vocab[self._token_bytes_to_string(token)] = rank
+                vocab[QwenModel.token_bytes_to_string(token)] = rank
                 if len(token) == 1:
                     continue
-                # Build merges
-                merged = self._bpe(mergeable_ranks, token, max_rank=rank)
+                merged = QwenModel.bpe(mergeable_ranks, token, max_rank=rank)
                 if len(merged) == 2:
-                    merges.append(' '.join(map(self._token_bytes_to_string, merged)))
+                    merges.append(' '.join(map(QwenModel.token_bytes_to_string, merged)))
+
+            # Build token list
+            vocab_size = self.hparams["vocab_size"]
+            special_tokens = tokenizer.special_tokens
+            reverse_vocab = {id_ : encoded_tok for encoded_tok, id_ in {**vocab, **special_tokens}.items()}
+            tokens: list[str] = []
+            toktypes: list[int] = []
+
+            for i in range(vocab_size):
+                if i not in reverse_vocab:
+                    tokens.append(f"[PAD{i}]")
+                    toktypes.append(gguf.TokenType.UNUSED)
+                else:
+                    token = reverse_vocab[i]
+                    tokens.append(token)
+                    if i in special_tokens.values():
+                        toktypes.append(gguf.TokenType.CONTROL)
+                    else:
+                        toktypes.append(gguf.TokenType.NORMAL)
+
+            self.gguf_writer.add_tokenizer_model("gpt2")
+            self.gguf_writer.add_tokenizer_pre(tokpre)
+            self.gguf_writer.add_token_list(tokens)
+            self.gguf_writer.add_token_types(toktypes)
+            self.gguf_writer.add_token_merges(merges)
+
+            special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=False)
+            special_vocab.add_to_gguf(self.gguf_writer)
         else:
-            # Fallback: get vocab directly
-            vocab = {tok: idx for tok, idx in tokenizer.get_vocab().items()}
-
-        # Get special tokens
-        added_vocab = {}
-        if hasattr(tokenizer, 'special_tokens'):
-            added_vocab = tokenizer.special_tokens
-        elif hasattr(tokenizer, 'added_tokens_encoder'):
-            added_vocab = tokenizer.added_tokens_encoder
-
-        # Combine vocab
-        reverse_vocab = {id_: encoded_tok for encoded_tok, id_ in {**vocab, **added_vocab}.items()}
-
-        for i in range(vocab_size):
-            if i not in reverse_vocab:
-                tokens.append(f"[PAD{i}]")
-                toktypes.append(gguf.TokenType.UNUSED)
-            elif i in added_vocab.values() if added_vocab else False:
-                tokens.append(reverse_vocab[i])
-                toktypes.append(gguf.TokenType.CONTROL)
-            else:
-                tokens.append(reverse_vocab[i])
-                toktypes.append(gguf.TokenType.NORMAL)
-
-        self.gguf_writer.add_tokenizer_model("gpt2")
-        self.gguf_writer.add_tokenizer_pre(tokpre)
-        self.gguf_writer.add_token_list(tokens)
-        self.gguf_writer.add_token_types(toktypes)
-
-        special_vocab = gguf.SpecialVocab(dir_model, load_merges=False)
-        special_vocab.merges = merges
-        special_vocab.add_to_gguf(self.gguf_writer)
-        logger.info(f"Loaded {len(tokens)} tokens, {len(merges)} merges")
-
+            raise NotImplementedError(f"Deepseek pre-tokenizer {tokpre!r} is not supported yet!")
+
     @staticmethod
     def _token_bytes_to_string(b: bytes) -> str:
         """Convert bytes to string representation for tokenizer"""
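
For context on the merge reconstruction in the `kimi-k2` branch above: `QwenModel.bpe(mergeable_ranks, token, max_rank=rank)` re-runs byte-pair merging over the token's raw bytes and stops just before a merge of rank `>= max_rank`, so for a token of rank `r` the surviving parts are the pair that would be merged to form it. Below is a minimal, self-contained sketch of that idea, assuming a tiktoken-style `mergeable_ranks: dict[bytes, int]`; `bpe_parts` is a hypothetical name for illustration, not the actual helper in convert_hf_to_gguf.py:

```python
from __future__ import annotations

# Illustrative sketch only - not the exact QwenModel.bpe implementation.
# Given a tiktoken-style rank table (bytes -> merge rank), replay BPE over a
# token's bytes, but refuse any merge whose rank is >= max_rank.
def bpe_parts(mergeable_ranks: dict[bytes, int], token: bytes,
              max_rank: int | None = None) -> list[bytes]:
    parts = [bytes([b]) for b in token]        # start from single bytes
    while len(parts) > 1:
        # find the adjacent pair with the lowest merge rank
        best_idx, best_rank = None, None
        for i in range(len(parts) - 1):
            rank = mergeable_ranks.get(parts[i] + parts[i + 1])
            if rank is not None and (best_rank is None or rank < best_rank):
                best_idx, best_rank = i, rank
        # stop when nothing can merge, or the next merge is not below max_rank
        if best_rank is None or (max_rank is not None and best_rank >= max_rank):
            break
        parts = parts[:best_idx] + [parts[best_idx] + parts[best_idx + 1]] + parts[best_idx + 2:]
    return parts

# e.g. with ranks {b"a": 0, b"b": 1, b"ab": 2, b"c": 3, b"abc": 4},
# bpe_parts(ranks, b"abc", max_rank=4) -> [b"ab", b"c"], i.e. the merge "ab c".
```

The conversion loop only records a merge when exactly two parts remain (`len(merged) == 2`), which skips single-byte tokens and any entry that is not produced by a single final merge.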