This commit is contained in:
Ettore Di Giacinto 2025-05-15 23:17:00 +02:00
parent 1dc76be5f8
commit b087a44fa0

View file

@ -3568,6 +3568,10 @@ inline void signal_handler(int signal) {
bool loaded_model; // TODO: add a mutex for this, but happens only once loading the model bool loaded_model; // TODO: add a mutex for this, but happens only once loading the model
static void start_llama_server(server_context& ctx_server) { static void start_llama_server(server_context& ctx_server) {
LOG_INF("%s: starting llama server\n", __func__);
LOG_INF("%s: waiting for model to be loaded\n", __func__);
// Wait for model to be loaded first // Wait for model to be loaded first
while (!loaded_model) { while (!loaded_model) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -3873,6 +3877,11 @@ public:
llama_backend_init(); llama_backend_init();
llama_numa_init(params.numa); llama_numa_init(params.numa);
LOG_INF("system info: n_threads = %d, n_threads_batch = %d, total_threads = %d\n", params.cpuparams.n_threads, params.cpuparams_batch.n_threads, std::thread::hardware_concurrency());
LOG_INF("\n");
LOG_INF("%s\n", common_params_get_system_info(params).c_str());
LOG_INF("\n");
// load the model // load the model
if (!ctx_server.load_model(params)) { if (!ctx_server.load_model(params)) {
result->set_message("Failed loading model"); result->set_message("Failed loading model");
@ -4071,27 +4080,31 @@ public:
} }
grpc::Status Predict(ServerContext* context, const backend::PredictOptions* request, backend::Reply* reply) { grpc::Status Predict(ServerContext* context, const backend::PredictOptions* request, backend::Reply* reply) {
std::cout << "[DEBUG] Starting Predict request processing" << std::endl;
json data = parse_options(true, request); json data = parse_options(true, request);
std::cout << "[DEBUG] Parsed request options" << std::endl;
//Raise error if embeddings is set to true //Raise error if embeddings is set to true
if (ctx_server.params_base.embedding) { if (ctx_server.params_base.embedding) {
std::cout << "[DEBUG] Error: Embedding mode not supported in streaming" << std::endl;
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in streaming mode"); return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Embedding is not supported in streaming mode");
} }
auto completion_id = gen_chatcmplid(); auto completion_id = gen_chatcmplid();
std::cout << "[DEBUG] Generated completion ID: " << completion_id << std::endl;
std::unordered_set<int> task_ids; std::unordered_set<int> task_ids;
try { try {
std::vector<server_task> tasks; std::vector<server_task> tasks;
const auto & prompt = data.at("prompt"); const auto & prompt = data.at("prompt");
const auto type = SERVER_TASK_TYPE_COMPLETION; const auto type = SERVER_TASK_TYPE_COMPLETION;
// TODO: this log can become very long, put it behind a flag or think about a more compact format std::cout << "[DEBUG] Processing prompt of type: " << type << std::endl;
//SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get<std::string>().c_str() : prompt.dump(2).c_str());
std::vector<raw_buffer> files; std::vector<raw_buffer> files;
const auto &images_data = data.find("image_data"); const auto &images_data = data.find("image_data");
if (images_data != data.end() && images_data->is_array()) if (images_data != data.end() && images_data->is_array())
{ {
std::cout << "[DEBUG] Found " << images_data->size() << " images to process" << std::endl;
for (const auto &img : *images_data) for (const auto &img : *images_data)
{ {
const std::vector<uint8_t> image_buffer = base64_decode(img["data"].get<std::string>()); const std::vector<uint8_t> image_buffer = base64_decode(img["data"].get<std::string>());
@ -4105,11 +4118,14 @@ public:
const bool has_mtmd = ctx_server.mctx != nullptr; const bool has_mtmd = ctx_server.mctx != nullptr;
{ {
if (!has_mtmd && !files.empty()) { if (!has_mtmd && !files.empty()) {
std::cout << "[DEBUG] Error: Server does not support multimodal processing" << std::endl;
throw std::runtime_error("This server does not support multimodal"); throw std::runtime_error("This server does not support multimodal");
} }
for (auto & file : files) { for (auto & file : files) {
std::cout << "[DEBUG] Processing image file of size: " << file.size() << " bytes" << std::endl;
mtmd::bitmap bmp(mtmd_helper_bitmap_init_from_buf(file.data(), file.size())); mtmd::bitmap bmp(mtmd_helper_bitmap_init_from_buf(file.data(), file.size()));
if (!bmp.ptr) { if (!bmp.ptr) {
std::cout << "[DEBUG] Error: Failed to load image" << std::endl;
throw std::runtime_error("Failed to load image"); throw std::runtime_error("Failed to load image");
} }
// calculate bitmap hash (for KV caching) // calculate bitmap hash (for KV caching)
@ -4122,10 +4138,12 @@ public:
// process prompt // process prompt
std::vector<server_tokens> inputs; std::vector<server_tokens> inputs;
if (!prompt.is_string()) { if (!prompt.is_string()) {
std::cout << "[DEBUG] Error: Prompt must be a string" << std::endl;
throw std::runtime_error("prompt must be a string"); throw std::runtime_error("prompt must be a string");
} }
if (has_mtmd) { if (has_mtmd) {
std::cout << "[DEBUG] Processing multimodal input" << std::endl;
// multimodal // multimodal
std::string prompt_str = prompt.get<std::string>(); std::string prompt_str = prompt.get<std::string>();
mtmd_input_text inp_txt = { mtmd_input_text inp_txt = {
@ -4141,12 +4159,14 @@ public:
bitmaps_c_ptr.data(), bitmaps_c_ptr.data(),
bitmaps_c_ptr.size()); bitmaps_c_ptr.size());
if (tokenized != 0) { if (tokenized != 0) {
std::cout << "[DEBUG] Error: Failed to tokenize multimodal prompt" << std::endl;
throw std::runtime_error("Failed to tokenize prompt"); throw std::runtime_error("Failed to tokenize prompt");
} }
server_tokens tmp(chunks, true); server_tokens tmp(chunks, true);
inputs.push_back(std::move(tmp)); inputs.push_back(std::move(tmp));
} else { } else {
std::cout << "[DEBUG] Processing standard text input" << std::endl;
// non-multimodal version // non-multimodal version
auto tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true); auto tokenized_prompts = tokenize_input_prompts(ctx_server.vocab, prompt, true, true);
for (auto & p : tokenized_prompts) { for (auto & p : tokenized_prompts) {
@ -4155,6 +4175,7 @@ public:
} }
} }
std::cout << "[DEBUG] Created " << inputs.size() << " input tasks" << std::endl;
tasks.reserve(inputs.size()); tasks.reserve(inputs.size());
for (size_t i = 0; i < inputs.size(); i++) { for (size_t i = 0; i < inputs.size(); i++) {
server_task task = server_task(type); server_task task = server_task(type);
@ -4178,31 +4199,37 @@ public:
} }
task_ids = server_task::get_list_id(tasks); task_ids = server_task::get_list_id(tasks);
std::cout << "[DEBUG] Created " << tasks.size() << " tasks with IDs" << std::endl;
ctx_server.queue_results.add_waiting_tasks(tasks); ctx_server.queue_results.add_waiting_tasks(tasks);
ctx_server.queue_tasks.post(std::move(tasks)); ctx_server.queue_tasks.post(std::move(tasks));
} catch (const std::exception & e) { } catch (const std::exception & e) {
std::cout << "[DEBUG] Error occurred: " << e.what() << std::endl;
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what()); return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, e.what());
} }
std::cout << "[DEBUG] Waiting for results..." << std::endl;
ctx_server.receive_multi_results(task_ids, [&](std::vector<server_task_result_ptr> & results) { ctx_server.receive_multi_results(task_ids, [&](std::vector<server_task_result_ptr> & results) {
if (results.size() == 1) { std::cout << "[DEBUG] Received " << results.size() << " results" << std::endl;
// single result if (results.size() == 1) {
reply->set_message(results[0]->to_json()); // single result
} else { reply->set_message(results[0]->to_json());
// multiple results (multitask) } else {
json arr = json::array(); // multiple results (multitask)
for (auto & res : results) { json arr = json::array();
arr.push_back(res->to_json()); for (auto & res : results) {
arr.push_back(res->to_json());
}
reply->set_message(arr);
} }
reply->set_message(arr);
}
}, [&](const json & error_data) { }, [&](const json & error_data) {
std::cout << "[DEBUG] Error in results: " << error_data.value("content", "") << std::endl;
reply->set_message(error_data.value("content", "")); reply->set_message(error_data.value("content", ""));
}, [&]() { }, [&]() {
return false; return false;
}); });
ctx_server.queue_results.remove_waiting_task_ids(task_ids); ctx_server.queue_results.remove_waiting_task_ids(task_ids);
std::cout << "[DEBUG] Predict request completed successfully" << std::endl;
return grpc::Status::OK; return grpc::Status::OK;
} }