add server-queue
This commit is contained in:
parent
e1a756e934
commit
3b7946034c
|
|
@ -17,6 +17,8 @@ set(TARGET_SRCS
|
|||
server-http.h
|
||||
server-task.cpp
|
||||
server-task.h
|
||||
server-queue.cpp
|
||||
server-queue.h
|
||||
server-common.cpp
|
||||
server-common.h
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,261 @@
|
|||
#include "server-common.h"
|
||||
#include "server-task.h"
|
||||
#include "server-queue.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <unordered_set>
|
||||
|
||||
//
|
||||
// server_queue
|
||||
//
|
||||
|
||||
int server_queue::post(server_task && task, bool front) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
GGML_ASSERT(task.id != -1);
|
||||
// if this is cancel task make sure to clean up pending tasks
|
||||
if (task.type == SERVER_TASK_TYPE_CANCEL) {
|
||||
cleanup_pending_task(task.id_target);
|
||||
}
|
||||
const int task_id = task.id;
|
||||
QUE_DBG("new task, id = %d, front = %d\n", task_id, front);
|
||||
if (front) {
|
||||
queue_tasks.push_front(std::move(task));
|
||||
} else {
|
||||
queue_tasks.push_back(std::move(task));
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
return task_id;
|
||||
}
|
||||
|
||||
int server_queue::post(std::vector<server_task> && tasks, bool front) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
for (auto & task : tasks) {
|
||||
if (task.id == -1) {
|
||||
task.id = id++;
|
||||
}
|
||||
// if this is cancel task make sure to clean up pending tasks
|
||||
if (task.type == SERVER_TASK_TYPE_CANCEL) {
|
||||
cleanup_pending_task(task.id_target);
|
||||
}
|
||||
QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front);
|
||||
if (front) {
|
||||
queue_tasks.push_front(std::move(task));
|
||||
} else {
|
||||
queue_tasks.push_back(std::move(task));
|
||||
}
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void server_queue::defer(server_task && task) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
QUE_DBG("defer task, id = %d\n", task.id);
|
||||
queue_tasks_deferred.push_back(std::move(task));
|
||||
condition_tasks.notify_one();
|
||||
}
|
||||
|
||||
int server_queue::get_new_id() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
int new_id = id++;
|
||||
return new_id;
|
||||
}
|
||||
|
||||
void server_queue::on_new_task(std::function<void(server_task &&)> callback) {
|
||||
callback_new_task = std::move(callback);
|
||||
}
|
||||
|
||||
void server_queue::on_update_slots(std::function<void(void)> callback) {
|
||||
callback_update_slots = std::move(callback);
|
||||
}
|
||||
|
||||
void server_queue::pop_deferred_task() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!queue_tasks_deferred.empty()) {
|
||||
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
|
||||
queue_tasks_deferred.pop_front();
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
}
|
||||
|
||||
void server_queue::terminate() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
running = false;
|
||||
condition_tasks.notify_all();
|
||||
}
|
||||
|
||||
void server_queue::start_loop() {
|
||||
running = true;
|
||||
|
||||
while (true) {
|
||||
QUE_DBG("%s", "processing new tasks\n");
|
||||
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!running) {
|
||||
QUE_DBG("%s", "terminate\n");
|
||||
return;
|
||||
}
|
||||
if (queue_tasks.empty()) {
|
||||
lock.unlock();
|
||||
break;
|
||||
}
|
||||
server_task task = std::move(queue_tasks.front());
|
||||
queue_tasks.pop_front();
|
||||
lock.unlock();
|
||||
|
||||
QUE_DBG("processing task, id = %d\n", task.id);
|
||||
callback_new_task(std::move(task));
|
||||
}
|
||||
|
||||
// all tasks in the current loop is processed, slots data is now ready
|
||||
QUE_DBG("%s", "update slots\n");
|
||||
|
||||
callback_update_slots();
|
||||
|
||||
QUE_DBG("%s", "waiting for new tasks\n");
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!running) {
|
||||
QUE_DBG("%s", "terminate\n");
|
||||
return;
|
||||
}
|
||||
if (queue_tasks.empty()) {
|
||||
condition_tasks.wait(lock, [&]{
|
||||
return (!queue_tasks.empty() || !running);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void server_queue::cleanup_pending_task(int id_target) {
|
||||
// no need lock because this is called exclusively by post()
|
||||
auto rm_func = [id_target](const server_task & task) {
|
||||
return task.id == id_target;
|
||||
};
|
||||
queue_tasks.erase(
|
||||
std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func),
|
||||
queue_tasks.end());
|
||||
queue_tasks_deferred.erase(
|
||||
std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func),
|
||||
queue_tasks_deferred.end());
|
||||
}
|
||||
|
||||
//
|
||||
// server_response
|
||||
//
|
||||
|
||||
void server_response::add_waiting_task_id(int id_task) {
|
||||
SRV_DBG("add task %d to waiting list. current waiting = %d (before add)\n", id_task, (int) waiting_task_ids.size());
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
waiting_task_ids.insert(id_task);
|
||||
}
|
||||
|
||||
void server_response::add_waiting_tasks(const std::vector<server_task> & tasks) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (const auto & task : tasks) {
|
||||
SRV_DBG("add task %d to waiting list. current waiting = %d (before add)\n", task.id, (int) waiting_task_ids.size());
|
||||
waiting_task_ids.insert(task.id);
|
||||
}
|
||||
}
|
||||
|
||||
void server_response::remove_waiting_task_id(int id_task) {
|
||||
SRV_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
waiting_task_ids.erase(id_task);
|
||||
// make sure to clean up all pending results
|
||||
queue_results.erase(
|
||||
std::remove_if(queue_results.begin(), queue_results.end(), [id_task](const server_task_result_ptr & res) {
|
||||
return res->id == id_task;
|
||||
}),
|
||||
queue_results.end());
|
||||
}
|
||||
|
||||
void server_response::remove_waiting_task_ids(const std::unordered_set<int> & id_tasks) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (const auto & id_task : id_tasks) {
|
||||
SRV_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
|
||||
waiting_task_ids.erase(id_task);
|
||||
}
|
||||
}
|
||||
|
||||
server_task_result_ptr server_response::recv(const std::unordered_set<int> & id_tasks) {
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
condition_results.wait(lock, [&]{
|
||||
if (!running) {
|
||||
SRV_DBG("%s : queue result stop\n", __func__);
|
||||
std::terminate(); // we cannot return here since the caller is HTTP code
|
||||
}
|
||||
return !queue_results.empty();
|
||||
});
|
||||
|
||||
for (size_t i = 0; i < queue_results.size(); i++) {
|
||||
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
|
||||
server_task_result_ptr res = std::move(queue_results[i]);
|
||||
queue_results.erase(queue_results.begin() + i);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// should never reach here
|
||||
}
|
||||
|
||||
server_task_result_ptr server_response::recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout) {
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (int i = 0; i < (int) queue_results.size(); i++) {
|
||||
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
|
||||
server_task_result_ptr res = std::move(queue_results[i]);
|
||||
queue_results.erase(queue_results.begin() + i);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout));
|
||||
if (!running) {
|
||||
SRV_DBG("%s : queue result stop\n", __func__);
|
||||
std::terminate(); // we cannot return here since the caller is HTTP code
|
||||
}
|
||||
if (cr_res == std::cv_status::timeout) {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// should never reach here
|
||||
}
|
||||
|
||||
server_task_result_ptr server_response::recv(int id_task) {
|
||||
std::unordered_set<int> id_tasks = {id_task};
|
||||
return recv(id_tasks);
|
||||
}
|
||||
|
||||
void server_response::send(server_task_result_ptr && result) {
|
||||
SRV_DBG("sending result for task id = %d\n", result->id);
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
for (const auto & id_task : waiting_task_ids) {
|
||||
if (result->id == id_task) {
|
||||
SRV_DBG("task id = %d pushed to result queue\n", result->id);
|
||||
|
||||
queue_results.emplace_back(std::move(result));
|
||||
condition_results.notify_all();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void server_response::terminate() {
|
||||
running = false;
|
||||
condition_results.notify_all();
|
||||
}
|
||||
|
|
@ -0,0 +1,112 @@
|
|||
#pragma once
|
||||
|
||||
#include "server-common.h"
|
||||
#include "server-task.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <unordered_set>
|
||||
|
||||
struct server_queue {
|
||||
private:
|
||||
int id = 0;
|
||||
bool running;
|
||||
|
||||
// queues
|
||||
std::deque<server_task> queue_tasks;
|
||||
std::deque<server_task> queue_tasks_deferred;
|
||||
|
||||
std::mutex mutex_tasks;
|
||||
std::condition_variable condition_tasks;
|
||||
|
||||
// callback functions
|
||||
std::function<void(server_task &&)> callback_new_task;
|
||||
std::function<void(void)> callback_update_slots;
|
||||
|
||||
public:
|
||||
// Add a new task to the end of the queue
|
||||
int post(server_task && task, bool front = false);
|
||||
|
||||
// multi-task version of post()
|
||||
int post(std::vector<server_task> && tasks, bool front = false);
|
||||
|
||||
// Add a new task, but defer until one slot is available
|
||||
void defer(server_task && task);
|
||||
|
||||
// Get the next id for creating a new task
|
||||
int get_new_id();
|
||||
|
||||
// Register function to process a new task
|
||||
void on_new_task(std::function<void(server_task &&)> callback);
|
||||
|
||||
// Register the function to be called when all slots data is ready to be processed
|
||||
void on_update_slots(std::function<void(void)> callback);
|
||||
|
||||
// Call when the state of one slot is changed, it will move one task from deferred to main queue
|
||||
void pop_deferred_task();
|
||||
|
||||
// end the start_loop routine
|
||||
void terminate();
|
||||
|
||||
/**
|
||||
* Main loop consists of these steps:
|
||||
* - Wait until a new task arrives
|
||||
* - Process the task (i.e. maybe copy data into slot)
|
||||
* - Check if multitask is finished
|
||||
* - Update all slots
|
||||
*/
|
||||
void start_loop();
|
||||
|
||||
// for metrics
|
||||
size_t queue_tasks_deferred_size() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
return queue_tasks_deferred.size();
|
||||
}
|
||||
|
||||
private:
|
||||
void cleanup_pending_task(int id_target);
|
||||
};
|
||||
|
||||
struct server_response {
|
||||
private:
|
||||
bool running = true;
|
||||
|
||||
// for keeping track of all tasks waiting for the result
|
||||
std::unordered_set<int> waiting_task_ids;
|
||||
|
||||
// the main result queue (using ptr for polymorphism)
|
||||
std::vector<server_task_result_ptr> queue_results;
|
||||
|
||||
std::mutex mutex_results;
|
||||
std::condition_variable condition_results;
|
||||
|
||||
public:
|
||||
// add the id_task to the list of tasks waiting for response
|
||||
void add_waiting_task_id(int id_task);
|
||||
|
||||
void add_waiting_tasks(const std::vector<server_task> & tasks);
|
||||
|
||||
// when the request is finished, we can remove task associated with it
|
||||
void remove_waiting_task_id(int id_task);
|
||||
|
||||
// remove multiple tasks from waiting list
|
||||
void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks);
|
||||
|
||||
// This function blocks the thread until there is a response for one of the id_tasks
|
||||
server_task_result_ptr recv(const std::unordered_set<int> & id_tasks);
|
||||
|
||||
// same as recv(), but have timeout in seconds
|
||||
// if timeout is reached, nullptr is returned
|
||||
server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout);
|
||||
|
||||
// single-task version of recv()
|
||||
server_task_result_ptr recv(int id_task);
|
||||
|
||||
// Send a new result to a waiting id_task
|
||||
void send(server_task_result_ptr && result);
|
||||
|
||||
// terminate the waiting loop
|
||||
void terminate();
|
||||
};
|
||||
|
|
@ -1,3 +1,5 @@
|
|||
#pragma once
|
||||
|
||||
#include "common.h"
|
||||
#include "llama.h"
|
||||
#include "chat.h"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
#include "server-common.h"
|
||||
#include "server-http.h"
|
||||
#include "server-task.h"
|
||||
#include "server-queue.h"
|
||||
|
||||
#include "arg.h"
|
||||
#include "common.h"
|
||||
|
|
@ -11,13 +12,9 @@
|
|||
#include "mtmd.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <cstddef>
|
||||
#include <cinttypes>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <signal.h>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
|
@ -447,303 +444,6 @@ struct server_metrics {
|
|||
}
|
||||
};
|
||||
|
||||
struct server_queue {
|
||||
int id = 0;
|
||||
bool running;
|
||||
|
||||
// queues
|
||||
std::deque<server_task> queue_tasks;
|
||||
std::deque<server_task> queue_tasks_deferred;
|
||||
|
||||
std::mutex mutex_tasks;
|
||||
std::condition_variable condition_tasks;
|
||||
|
||||
// callback functions
|
||||
std::function<void(server_task &&)> callback_new_task;
|
||||
std::function<void(void)> callback_update_slots;
|
||||
|
||||
// Add a new task to the end of the queue
|
||||
int post(server_task && task, bool front = false) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
GGML_ASSERT(task.id != -1);
|
||||
// if this is cancel task make sure to clean up pending tasks
|
||||
if (task.type == SERVER_TASK_TYPE_CANCEL) {
|
||||
cleanup_pending_task(task.id_target);
|
||||
}
|
||||
const int task_id = task.id;
|
||||
QUE_DBG("new task, id = %d, front = %d\n", task_id, front);
|
||||
if (front) {
|
||||
queue_tasks.push_front(std::move(task));
|
||||
} else {
|
||||
queue_tasks.push_back(std::move(task));
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
return task_id;
|
||||
}
|
||||
|
||||
// multi-task version of post()
|
||||
int post(std::vector<server_task> && tasks, bool front = false) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
for (auto & task : tasks) {
|
||||
if (task.id == -1) {
|
||||
task.id = id++;
|
||||
}
|
||||
// if this is cancel task make sure to clean up pending tasks
|
||||
if (task.type == SERVER_TASK_TYPE_CANCEL) {
|
||||
cleanup_pending_task(task.id_target);
|
||||
}
|
||||
QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front);
|
||||
if (front) {
|
||||
queue_tasks.push_front(std::move(task));
|
||||
} else {
|
||||
queue_tasks.push_back(std::move(task));
|
||||
}
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Add a new task, but defer until one slot is available
|
||||
void defer(server_task && task) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
QUE_DBG("defer task, id = %d\n", task.id);
|
||||
queue_tasks_deferred.push_back(std::move(task));
|
||||
condition_tasks.notify_one();
|
||||
}
|
||||
|
||||
// Get the next id for creating a new task
|
||||
int get_new_id() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
int new_id = id++;
|
||||
return new_id;
|
||||
}
|
||||
|
||||
// Register function to process a new task
|
||||
void on_new_task(std::function<void(server_task &&)> callback) {
|
||||
callback_new_task = std::move(callback);
|
||||
}
|
||||
|
||||
// Register the function to be called when all slots data is ready to be processed
|
||||
void on_update_slots(std::function<void(void)> callback) {
|
||||
callback_update_slots = std::move(callback);
|
||||
}
|
||||
|
||||
// Call when the state of one slot is changed, it will move one task from deferred to main queue
|
||||
void pop_deferred_task() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!queue_tasks_deferred.empty()) {
|
||||
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
|
||||
queue_tasks_deferred.pop_front();
|
||||
}
|
||||
condition_tasks.notify_one();
|
||||
}
|
||||
|
||||
// end the start_loop routine
|
||||
void terminate() {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
running = false;
|
||||
condition_tasks.notify_all();
|
||||
}
|
||||
|
||||
/**
|
||||
* Main loop consists of these steps:
|
||||
* - Wait until a new task arrives
|
||||
* - Process the task (i.e. maybe copy data into slot)
|
||||
* - Check if multitask is finished
|
||||
* - Update all slots
|
||||
*/
|
||||
void start_loop() {
|
||||
running = true;
|
||||
|
||||
while (true) {
|
||||
QUE_DBG("%s", "processing new tasks\n");
|
||||
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!running) {
|
||||
QUE_DBG("%s", "terminate\n");
|
||||
return;
|
||||
}
|
||||
if (queue_tasks.empty()) {
|
||||
lock.unlock();
|
||||
break;
|
||||
}
|
||||
server_task task = std::move(queue_tasks.front());
|
||||
queue_tasks.pop_front();
|
||||
lock.unlock();
|
||||
|
||||
QUE_DBG("processing task, id = %d\n", task.id);
|
||||
callback_new_task(std::move(task));
|
||||
}
|
||||
|
||||
// all tasks in the current loop is processed, slots data is now ready
|
||||
QUE_DBG("%s", "update slots\n");
|
||||
|
||||
callback_update_slots();
|
||||
|
||||
QUE_DBG("%s", "waiting for new tasks\n");
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_tasks);
|
||||
if (!running) {
|
||||
QUE_DBG("%s", "terminate\n");
|
||||
return;
|
||||
}
|
||||
if (queue_tasks.empty()) {
|
||||
condition_tasks.wait(lock, [&]{
|
||||
return (!queue_tasks.empty() || !running);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
void cleanup_pending_task(int id_target) {
|
||||
// no need lock because this is called exclusively by post()
|
||||
auto rm_func = [id_target](const server_task & task) {
|
||||
return task.id == id_target;
|
||||
};
|
||||
queue_tasks.erase(
|
||||
std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func),
|
||||
queue_tasks.end());
|
||||
queue_tasks_deferred.erase(
|
||||
std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func),
|
||||
queue_tasks_deferred.end());
|
||||
}
|
||||
};
|
||||
|
||||
struct server_response {
|
||||
bool running = true;
|
||||
|
||||
// for keeping track of all tasks waiting for the result
|
||||
std::unordered_set<int> waiting_task_ids;
|
||||
|
||||
// the main result queue (using ptr for polymorphism)
|
||||
std::vector<server_task_result_ptr> queue_results;
|
||||
|
||||
std::mutex mutex_results;
|
||||
std::condition_variable condition_results;
|
||||
|
||||
// add the id_task to the list of tasks waiting for response
|
||||
void add_waiting_task_id(int id_task) {
|
||||
SRV_DBG("add task %d to waiting list. current waiting = %d (before add)\n", id_task, (int) waiting_task_ids.size());
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
waiting_task_ids.insert(id_task);
|
||||
}
|
||||
|
||||
void add_waiting_tasks(const std::vector<server_task> & tasks) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (const auto & task : tasks) {
|
||||
SRV_DBG("add task %d to waiting list. current waiting = %d (before add)\n", task.id, (int) waiting_task_ids.size());
|
||||
waiting_task_ids.insert(task.id);
|
||||
}
|
||||
}
|
||||
|
||||
// when the request is finished, we can remove task associated with it
|
||||
void remove_waiting_task_id(int id_task) {
|
||||
SRV_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
waiting_task_ids.erase(id_task);
|
||||
// make sure to clean up all pending results
|
||||
queue_results.erase(
|
||||
std::remove_if(queue_results.begin(), queue_results.end(), [id_task](const server_task_result_ptr & res) {
|
||||
return res->id == id_task;
|
||||
}),
|
||||
queue_results.end());
|
||||
}
|
||||
|
||||
void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (const auto & id_task : id_tasks) {
|
||||
SRV_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
|
||||
waiting_task_ids.erase(id_task);
|
||||
}
|
||||
}
|
||||
|
||||
// This function blocks the thread until there is a response for one of the id_tasks
|
||||
server_task_result_ptr recv(const std::unordered_set<int> & id_tasks) {
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
condition_results.wait(lock, [&]{
|
||||
if (!running) {
|
||||
SRV_DBG("%s : queue result stop\n", __func__);
|
||||
std::terminate(); // we cannot return here since the caller is HTTP code
|
||||
}
|
||||
return !queue_results.empty();
|
||||
});
|
||||
|
||||
for (size_t i = 0; i < queue_results.size(); i++) {
|
||||
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
|
||||
server_task_result_ptr res = std::move(queue_results[i]);
|
||||
queue_results.erase(queue_results.begin() + i);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// should never reach here
|
||||
}
|
||||
|
||||
// same as recv(), but have timeout in seconds
|
||||
// if timeout is reached, nullptr is returned
|
||||
server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout) {
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
|
||||
for (int i = 0; i < (int) queue_results.size(); i++) {
|
||||
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
|
||||
server_task_result_ptr res = std::move(queue_results[i]);
|
||||
queue_results.erase(queue_results.begin() + i);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout));
|
||||
if (!running) {
|
||||
SRV_DBG("%s : queue result stop\n", __func__);
|
||||
std::terminate(); // we cannot return here since the caller is HTTP code
|
||||
}
|
||||
if (cr_res == std::cv_status::timeout) {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// should never reach here
|
||||
}
|
||||
|
||||
// single-task version of recv()
|
||||
server_task_result_ptr recv(int id_task) {
|
||||
std::unordered_set<int> id_tasks = {id_task};
|
||||
return recv(id_tasks);
|
||||
}
|
||||
|
||||
// Send a new result to a waiting id_task
|
||||
void send(server_task_result_ptr && result) {
|
||||
SRV_DBG("sending result for task id = %d\n", result->id);
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_results);
|
||||
for (const auto & id_task : waiting_task_ids) {
|
||||
if (result->id == id_task) {
|
||||
SRV_DBG("task id = %d pushed to result queue\n", result->id);
|
||||
|
||||
queue_results.emplace_back(std::move(result));
|
||||
condition_results.notify_all();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// terminate the waiting loop
|
||||
void terminate() {
|
||||
running = false;
|
||||
condition_results.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
struct server_context {
|
||||
common_params params_base;
|
||||
|
||||
|
|
@ -1748,7 +1448,7 @@ struct server_context {
|
|||
res->slots_data = std::move(slots_data);
|
||||
res->n_idle_slots = n_idle_slots;
|
||||
res->n_processing_slots = n_processing_slots;
|
||||
res->n_tasks_deferred = queue_tasks.queue_tasks_deferred.size();
|
||||
res->n_tasks_deferred = queue_tasks.queue_tasks_deferred_size();
|
||||
res->t_start = metrics.t_start;
|
||||
|
||||
res->n_prompt_tokens_processed_total = metrics.n_prompt_tokens_processed_total;
|
||||
|
|
|
|||
Loading…
Reference in New Issue