diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index b79a5270b5..ca72e6cb9f 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -455,6 +455,127 @@ struct server_slot { } }; +// +// checkpoint persistence helpers for hybrid/recurrent models +// +// Hybrid models (e.g. Qwen3.5, Jamba, Falcon-H1) use recurrent layers whose +// state cannot be partially restored from the KV cache alone. The server +// creates "context checkpoints" during prompt processing that snapshot the +// full recurrent state at regular intervals. These checkpoints live in +// server_prompt::checkpoints and are essential to avoid a full prompt +// re-processing when the slot is reused. +// +// The built-in /slots save/restore API persists the raw KV+recurrent memory +// via llama_state_seq_{save,load}_file, but does NOT persist the checkpoint +// metadata. The two helpers below fill that gap: they write/read a small +// companion file (.checkpoints) next to the main slot save file. +// +// File format (binary, little-endian): +// uint32 magic = 0x4C4C4350 ("LLCP") +// uint32 version = 1 +// uint32 n_checkpoints +// For each checkpoint: +// int32 pos_min +// int32 pos_max +// int64 n_tokens +// uint64 data_size +// uint8 data[data_size] +// + +static bool slot_checkpoints_save(const std::string & filepath, + const std::list & checkpoints) { + if (checkpoints.empty()) { + return true; + } + + const std::string cp_path = filepath + ".checkpoints"; + FILE * fp = fopen(cp_path.c_str(), "wb"); + if (!fp) { + SRV_WRN("failed to open checkpoint file for writing: %s\n", cp_path.c_str()); + return false; + } + + const uint32_t magic = 0x4C4C4350; + const uint32_t version = 1; + const uint32_t n_cp = (uint32_t) checkpoints.size(); + + bool ok = true; + ok = ok && fwrite(&magic, sizeof(magic), 1, fp) == 1; + ok = ok && fwrite(&version, sizeof(version), 1, fp) == 1; + ok = ok && fwrite(&n_cp, sizeof(n_cp), 1, fp) == 1; + + for (const auto & cp : checkpoints) { + const uint64_t data_size = cp.data.size(); + ok = ok && fwrite(&cp.pos_min, sizeof(cp.pos_min), 1, fp) == 1; + ok = ok && fwrite(&cp.pos_max, sizeof(cp.pos_max), 1, fp) == 1; + ok = ok && fwrite(&cp.n_tokens, sizeof(cp.n_tokens), 1, fp) == 1; + ok = ok && fwrite(&data_size, sizeof(data_size), 1, fp) == 1; + if (data_size > 0) { + ok = ok && fwrite(cp.data.data(), 1, data_size, fp) == data_size; + } + } + + fclose(fp); + + if (!ok) { + SRV_WRN("failed to write checkpoint data to %s\n", cp_path.c_str()); + std::remove(cp_path.c_str()); + return false; + } + + SRV_INF("saved %u context checkpoints to %s\n", n_cp, cp_path.c_str()); + return true; +} + +static bool slot_checkpoints_load(const std::string & filepath, + std::list & checkpoints) { + const std::string cp_path = filepath + ".checkpoints"; + FILE * fp = fopen(cp_path.c_str(), "rb"); + if (!fp) { + return true; // no checkpoint file is not an error + } + + uint32_t magic = 0, version = 0, n_cp = 0; + bool ok = true; + ok = ok && fread(&magic, sizeof(magic), 1, fp) == 1; + ok = ok && fread(&version, sizeof(version), 1, fp) == 1; + ok = ok && fread(&n_cp, sizeof(n_cp), 1, fp) == 1; + + if (!ok || magic != 0x4C4C4350 || version != 1) { + SRV_WRN("invalid checkpoint file header: %s\n", cp_path.c_str()); + fclose(fp); + return false; + } + + checkpoints.clear(); + + for (uint32_t i = 0; i < n_cp && ok; i++) { + server_prompt_checkpoint cp; + uint64_t data_size = 0; + ok = ok && fread(&cp.pos_min, sizeof(cp.pos_min), 1, fp) == 1; + ok = ok && fread(&cp.pos_max, sizeof(cp.pos_max), 1, fp) == 1; + ok = ok && fread(&cp.n_tokens, sizeof(cp.n_tokens), 1, fp) == 1; + ok = ok && fread(&data_size, sizeof(data_size), 1, fp) == 1; + if (ok && data_size > 0) { + cp.data.resize(data_size); + ok = ok && fread(cp.data.data(), 1, data_size, fp) == data_size; + } + if (ok) { + checkpoints.push_back(std::move(cp)); + } + } + + fclose(fp); + + if (!ok) { + SRV_WRN("failed to read checkpoint data from %s\n", cp_path.c_str()); + checkpoints.clear(); + return false; + } + + SRV_INF("restored %u context checkpoints from %s\n", n_cp, cp_path.c_str()); + return true; +} // @@ -1822,6 +1943,9 @@ private: const llama_tokens & tokens = slot->prompt.tokens.get_text_tokens(); const size_t nwrite = llama_state_seq_save_file(ctx, filepath.c_str(), slot->id, tokens.data(), token_count); + // persist context checkpoints alongside the slot state + slot_checkpoints_save(filepath, slot->prompt.checkpoints); + const int64_t t_end = ggml_time_us(); const double t_save_ms = (t_end - t_start) / 1000.0; @@ -1869,6 +1993,9 @@ private: slot->prompt.tokens.clear(); slot->prompt.tokens.insert(tokens); + // restore context checkpoints if a companion file exists + slot_checkpoints_load(filepath, slot->prompt.checkpoints); + const int64_t t_end = ggml_time_us(); const double t_restore_ms = (t_end - t_start) / 1000.0; @@ -2965,6 +3092,66 @@ llama_context * server_context::get_llama_context() const { return impl->ctx; } +void server_context::auto_save_slots() { + const auto & params = impl->params_base; + if (params.slot_save_path.empty()) { + return; + } + + for (auto & slot : impl->slots) { + if (slot.prompt.tokens.size() == 0) { + continue; + } + + const std::string model_stem = std::filesystem::path(params.model.path).stem().string(); + const std::string filepath = params.slot_save_path + "/" + model_stem; + + const llama_tokens & tokens = slot.prompt.tokens.get_text_tokens(); + const size_t token_count = slot.prompt.tokens.size(); + const size_t nwrite = llama_state_seq_save_file(impl->ctx, filepath.c_str(), slot.id, tokens.data(), token_count); + + slot_checkpoints_save(filepath, slot.prompt.checkpoints); + + SRV_INF("auto-saved slot %d (%zu tokens, %.1f MiB) to %s\n", + slot.id, token_count, (float) nwrite / (1024.0f * 1024.0f), filepath.c_str()); + } +} + +void server_context::auto_restore_slots() { + const auto & params = impl->params_base; + if (params.slot_save_path.empty()) { + return; + } + + const std::string model_stem = std::filesystem::path(params.model.path).stem().string(); + const std::string filepath = params.slot_save_path + "/" + model_stem; + + if (!std::filesystem::exists(filepath)) { + return; + } + + for (auto & slot : impl->slots) { + llama_tokens tokens; + tokens.resize(slot.n_ctx); + size_t token_count = 0; + const size_t nread = llama_state_seq_load_file(impl->ctx, filepath.c_str(), slot.id, tokens.data(), tokens.size(), &token_count); + + if (nread == 0) { + SRV_WRN("auto-restore failed for slot %d from %s\n", slot.id, filepath.c_str()); + continue; + } + + tokens.resize(token_count); + slot.prompt.tokens.clear(); + slot.prompt.tokens.insert(tokens); + + slot_checkpoints_load(filepath, slot.prompt.checkpoints); + + SRV_INF("auto-restored slot %d (%zu tokens, %.1f MiB) from %s\n", + slot.id, token_count, (float) nread / (1024.0f * 1024.0f), filepath.c_str()); + } +} + server_response_reader server_context::get_response_reader() { return impl->get_response_reader(); } diff --git a/tools/server/server-context.h b/tools/server/server-context.h index a4d2201cbe..a7bf65c4c9 100644 --- a/tools/server/server-context.h +++ b/tools/server/server-context.h @@ -64,6 +64,11 @@ struct server_context { // terminate main loop (will unblock start_loop) void terminate(); + // auto-save/restore slot state for seamless model hot-swapping in router mode + // requires --slot-save-path to be set + void auto_save_slots(); + void auto_restore_slots(); + // get the underlaying llama_context, can return nullptr if sleeping // not thread-safe, should only be used from the main thread llama_context * get_llama_context() const; diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 73e3aa44e0..721ae3a1d3 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -279,6 +279,9 @@ int main(int argc, char ** argv) { LOG_INF("%s: model loaded\n", __func__); + // in router mode, restore previously saved slot state for this model + ctx_server.auto_restore_slots(); + shutdown_handler = [&](int) { // this will unblock start_loop() ctx_server.terminate(); @@ -323,6 +326,9 @@ int main(int argc, char ** argv) { // this call blocks the main thread until queue_tasks.terminate() is called ctx_server.start_loop(); + // in router mode, save slot state before exit so it can be restored on reload + ctx_server.auto_save_slots(); + clean_up(); if (ctx_http.thread.joinable()) { ctx_http.thread.join();