use DeepseekV2 tokenizer

Yee Man Chan 2025-12-14 17:43:30 +08:00
parent a0269af292
commit ef5bc30544
1 changed file with 46 additions and 58 deletions


@@ -5107,71 +5107,59 @@ class KimiLinearModel(TextModel):
         self.gguf_writer.add_expert_weights_scale(routed_scaling_factor)
 
     def set_vocab(self):
-        # Kimi uses TikToken tokenizer - load via transformers
+        try:
+            self._set_vocab_gpt2()
+            return
+        except Exception:
+            pass
+
         from transformers import AutoTokenizer
-
-        dir_model = self.dir_model
-        vocab_size = self.hparams["vocab_size"]
-
-        logger.info(f"Loading TikToken tokenizer from {dir_model}")
-        tokenizer = AutoTokenizer.from_pretrained(dir_model, trust_remote_code=True)
-
-        tokens: list[str] = []
-        toktypes: list[int] = []
-
-        # Get tokenizer pre string
+        tokenizer = AutoTokenizer.from_pretrained(self.dir_model, trust_remote_code=True)
         tokpre = self.get_vocab_base_pre(tokenizer)
 
-        # Build vocab from tokenizer
-        merges = []
-        vocab = {}
-
-        # TikToken stores vocab in mergeable_ranks
-        if hasattr(tokenizer, 'mergeable_ranks'):
-            mergeable_ranks = tokenizer.mergeable_ranks
+        if tokpre == "kimi-k2":
+            # Build merges list using the approach similar to HunYuanMoE
+            merges = []
+            vocab = {}
+            mergeable_ranks = tokenizer.model._mergeable_ranks
             for token, rank in mergeable_ranks.items():
-                vocab[self._token_bytes_to_string(token)] = rank
+                vocab[QwenModel.token_bytes_to_string(token)] = rank
                 if len(token) == 1:
                     continue
-                # Build merges
-                merged = self._bpe(mergeable_ranks, token, max_rank=rank)
+                merged = QwenModel.bpe(mergeable_ranks, token, max_rank=rank)
                 if len(merged) == 2:
-                    merges.append(' '.join(map(self._token_bytes_to_string, merged)))
+                    merges.append(' '.join(map(QwenModel.token_bytes_to_string, merged)))
+
+            # Build token list
+            vocab_size = self.hparams["vocab_size"]
+            special_tokens = tokenizer.special_tokens
+            reverse_vocab = {id_ : encoded_tok for encoded_tok, id_ in {**vocab, **special_tokens}.items()}
+            tokens: list[str] = []
+            toktypes: list[int] = []
+
+            for i in range(vocab_size):
+                if i not in reverse_vocab:
+                    tokens.append(f"[PAD{i}]")
+                    toktypes.append(gguf.TokenType.UNUSED)
+                else:
+                    token = reverse_vocab[i]
+                    tokens.append(token)
+                    if i in special_tokens.values():
+                        toktypes.append(gguf.TokenType.CONTROL)
+                    else:
+                        toktypes.append(gguf.TokenType.NORMAL)
+
+            self.gguf_writer.add_tokenizer_model("gpt2")
+            self.gguf_writer.add_tokenizer_pre(tokpre)
+            self.gguf_writer.add_token_list(tokens)
+            self.gguf_writer.add_token_types(toktypes)
+            self.gguf_writer.add_token_merges(merges)
+
+            special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=False)
+            special_vocab.add_to_gguf(self.gguf_writer)
         else:
-            # Fallback: get vocab directly
-            vocab = {tok: idx for tok, idx in tokenizer.get_vocab().items()}
-
-        # Get special tokens
-        added_vocab = {}
-        if hasattr(tokenizer, 'special_tokens'):
-            added_vocab = tokenizer.special_tokens
-        elif hasattr(tokenizer, 'added_tokens_encoder'):
-            added_vocab = tokenizer.added_tokens_encoder
-
-        # Combine vocab
-        reverse_vocab = {id_: encoded_tok for encoded_tok, id_ in {**vocab, **added_vocab}.items()}
-        for i in range(vocab_size):
-            if i not in reverse_vocab:
-                tokens.append(f"[PAD{i}]")
-                toktypes.append(gguf.TokenType.UNUSED)
-            elif i in added_vocab.values() if added_vocab else False:
-                tokens.append(reverse_vocab[i])
-                toktypes.append(gguf.TokenType.CONTROL)
-            else:
-                tokens.append(reverse_vocab[i])
-                toktypes.append(gguf.TokenType.NORMAL)
-
-        self.gguf_writer.add_tokenizer_model("gpt2")
-        self.gguf_writer.add_tokenizer_pre(tokpre)
-        self.gguf_writer.add_token_list(tokens)
-        self.gguf_writer.add_token_types(toktypes)
-
-        special_vocab = gguf.SpecialVocab(dir_model, load_merges=False)
-        special_vocab.merges = merges
-        special_vocab.add_to_gguf(self.gguf_writer)
-
-        logger.info(f"Loaded {len(tokens)} tokens, {len(merges)} merges")
+            raise NotImplementedError(f"Deepseek pre-tokenizer {tokpre!r} is not supported yet!")
 
     @staticmethod
     def _token_bytes_to_string(b: bytes) -> str:
         """Convert bytes to string representation for tokenizer"""