diff --git a/qa/L0_vertex_ai/vertex_ai_test.py b/qa/L0_vertex_ai/vertex_ai_test.py
index b6f9fc42b4..df913d0016 100755
--- a/qa/L0_vertex_ai/vertex_ai_test.py
+++ b/qa/L0_vertex_ai/vertex_ai_test.py
@@ -324,5 +324,115 @@ def test_malformed_binary_header_large_number(self):
         )
 
+    # =====================================================================
+    # KServe V1 Protocol Tests
+    # =====================================================================
+
+    def test_v1_predict(self):
+        """V1 format: {"instances": [...]} -> {"predictions": [...]}"""
+        import json
+
+        request_body = {
+            "instances": [
+                {
+                    "INPUT0": self.input_data_,
+                    "INPUT1": self.input_data_,
+                }
+            ]
+        }
+
+        headers = {"Content-Type": "application/json"}
+        r = requests.post(self.url_, data=json.dumps(request_body), headers=headers)
+        r.raise_for_status()
+
+        result = r.json()
+        self.assertIn("predictions", result)
+        self.assertEqual(len(result["predictions"]), 1)
+
+        prediction = result["predictions"][0]
+        self.assertIn("OUTPUT0", prediction)
+        self.assertIn("OUTPUT1", prediction)
+
+        for i in range(16):
+            self.assertEqual(prediction["OUTPUT0"][i], self.expected_output0_data_[i])
+            self.assertEqual(prediction["OUTPUT1"][i], self.expected_output1_data_[i])
+
+    def test_v1_predict_with_parameters(self):
+        """V1 format with top-level parameters passthrough"""
+        import json
+
+        request_body = {
+            "instances": [
+                {
+                    "INPUT0": self.input_data_,
+                    "INPUT1": self.input_data_,
+                }
+            ],
+            "parameters": {},
+        }
+
+        headers = {"Content-Type": "application/json"}
+        r = requests.post(self.url_, data=json.dumps(request_body), headers=headers)
+        r.raise_for_status()
+
+        result = r.json()
+        self.assertIn("predictions", result)
+        prediction = result["predictions"][0]
+        for i in range(16):
+            self.assertEqual(prediction["OUTPUT0"][i], self.expected_output0_data_[i])
+            self.assertEqual(prediction["OUTPUT1"][i], self.expected_output1_data_[i])
+
+    def test_v2_predict_backward_compat(self):
+        """V2 format still works (backward-compatibility regression test)"""
+        inputs = []
+        outputs = []
+        inputs.append(httpclient.InferInput("INPUT0", [1, 16], "INT32"))
+        inputs.append(httpclient.InferInput("INPUT1", [1, 16], "INT32"))
+
+        input_data = np.array(self.input_data_, dtype=np.int32)
+        input_data = np.expand_dims(input_data, axis=0)
+        inputs[0].set_data_from_numpy(input_data, binary_data=False)
+        inputs[1].set_data_from_numpy(input_data, binary_data=False)
+
+        outputs.append(httpclient.InferRequestedOutput("OUTPUT0", binary_data=False))
+        outputs.append(httpclient.InferRequestedOutput("OUTPUT1", binary_data=False))
+        request_body, _ = httpclient.InferenceServerClient.generate_request_body(
+            inputs, outputs=outputs
+        )
+
+        headers = {"Content-Type": "application/json"}
+        r = requests.post(self.url_, data=request_body, headers=headers)
+        r.raise_for_status()
+
+        result = httpclient.InferenceServerClient.parse_response_body(r._content)
+        output0_data = result.as_numpy("OUTPUT0")
+        output1_data = result.as_numpy("OUTPUT1")
+        for i in range(16):
+            self.assertEqual(output0_data[0][i], self.expected_output0_data_[i])
+            self.assertEqual(output1_data[0][i], self.expected_output1_data_[i])
+
+    def test_v1_predict_empty_instances(self):
+        """V1 format with empty instances should return an error"""
+        import json
+
+        request_body = {"instances": []}
+
+        headers = {"Content-Type": "application/json"}
+        r = requests.post(self.url_, data=json.dumps(request_body), headers=headers)
+        self.assertEqual(
+            400,
+            r.status_code,
+            "Expected error code 400 for empty instances; got: {}".format(
+                r.status_code
+            ),
+        )
+
 
 if __name__ == "__main__":
     unittest.main()
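The tests above exercise only INT32 tensors. For the `{"b64": ...}` BYTES handling this PR adds to `src/http_server.cc` (below), a client request would look roughly like the following sketch. The endpoint, port, model name, and tensor names are illustrative, not part of this change:

```python
# Hypothetical sketch of the new {"b64": ...} BYTES path. Assumes a
# string-capable model named "string_identity" with a BYTES input "INPUT0"
# and a BYTES output "OUTPUT0" served at the default HTTP port.
import base64
import json

import requests

payload = {
    "instances": [
        # Binary values are wrapped as {"b64": "..."} per the TF Serving
        # V1 convention.
        {"INPUT0": {"b64": base64.b64encode(b"hello").decode("ascii")}}
    ]
}
r = requests.post(
    "http://localhost:8000/v1/models/string_identity:predict",
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
)
r.raise_for_status()

# BYTES outputs come back in the same wrapped form.
prediction = r.json()["predictions"][0]
print(base64.b64decode(prediction["OUTPUT0"]["b64"]))  # b'hello'
```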
diff --git a/src/common.cc b/src/common.cc
index a7591b8324..703354330d 100644
--- a/src/common.cc
+++ b/src/common.cc
@@ -35,6 +35,7 @@
 extern "C" {
 #include <b64/cdecode.h>
+#include <b64/cencode.h>
 }
 
 namespace triton { namespace server {
@@ -163,6 +164,40 @@ DecodeBase64(
   return nullptr;
 }
 
+TRITONSERVER_Error*
+EncodeBase64(
+    const char* input, size_t input_len, std::string& encoded_data)
+{
+  if (input_len > static_cast<size_t>(INT_MAX)) {
+    return TRITONSERVER_ErrorNew(
+        TRITONSERVER_ERROR_INVALID_ARG,
+        "input data exceeds the maximum allowed data size limit INT_MAX");
+  }
+
+  // Base64 output is ceil(input_len/3)*4 chars. libb64 inserts a newline
+  // every 72 OUTPUT chars, so the newline count is based on the output size.
+  size_t base64_chars = ((input_len + 2) / 3) * 4;
+  size_t max_encoded_size = base64_chars + (base64_chars / 72) + 4;
+  encoded_data.resize(max_encoded_size);
+
+  base64_encodestate state;
+  base64_init_encodestate(&state);
+
+  size_t encoded_len =
+      base64_encode_block(input, input_len, &encoded_data[0], &state);
+  encoded_len += base64_encode_blockend(&encoded_data[0] + encoded_len, &state);
+
+  // Remove any trailing newlines added by libb64
+  while (encoded_len > 0 && (encoded_data[encoded_len - 1] == '\n' ||
+                             encoded_data[encoded_len - 1] == '\r')) {
+    encoded_len--;
+  }
+  encoded_data.resize(encoded_len);
+
+  return nullptr;
+}
+
 TRITONSERVER_Error*
 ValidateSharedMemoryKey(const std::string& name, const std::string& shm_key)
 {
diff --git a/src/common.h b/src/common.h
index c49f79c12b..5415e950af 100644
--- a/src/common.h
+++ b/src/common.h
@@ -211,6 +211,15 @@ TRITONSERVER_Error* DecodeBase64(
     const char* input, size_t input_len, std::vector<char>& decoded_data,
     size_t& decoded_size, const std::string& name);
 
+/// Encodes binary data to a Base64 encoded string.
+///
+/// \param input The raw binary data to encode.
+/// \param input_len The length of the input data.
+/// \param encoded_data A string to store the Base64 encoded result.
+/// \return The error status.
+TRITONSERVER_Error* EncodeBase64(
+    const char* input, size_t input_len, std::string& encoded_data);
+
 /// Validate shared memory key
 ///
diff --git a/src/http_server.cc b/src/http_server.cc
index d351956449..d43f0dbe45 100644
--- a/src/http_server.cc
+++ b/src/http_server.cc
@@ -502,13 +502,36 @@ JsonBytesArrayByteSize(
       *byte_size += byte_size_;
     }
   } else {
-    // Serialized data size is the length of the string itself plus
-    // 4 bytes to record the string length.
-    const char* str;
-    size_t len = 0;
-    RETURN_MSG_IF_ERR(
-        tensor_data.AsString(&str, &len), "Unable to parse JSON bytes array");
-    *byte_size += len + sizeof(uint32_t);
+    // Check for {"b64": "..."} binary encoding (TF Serving V1 protocol).
+    // Compute the exact decoded size from the base64 string.
+    triton::common::TritonJson::Value b64_json;
+    if (tensor_data.Find("b64", &b64_json)) {
+      const char* b64_str;
+      size_t b64_len = 0;
+      RETURN_MSG_IF_ERR(
+          tensor_data.MemberAsString("b64", &b64_str, &b64_len),
+          "Unable to parse 'b64' value in JSON bytes array");
+      // Compute the exact base64 decoded size.
+      // Standard base64: every 4 chars encode 3 bytes, with '=' padding.
+      // Also handle unpadded input where the length is not a multiple of 4.
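+      // Worked example (illustrative): "QUJDRA==" has length 8 with two
+      // '=' pads -> effective_len 6 -> (6 / 4) * 3 + 1 = 4 decoded bytes
+      // ("ABCD").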
+      size_t padding = 0;
+      if (b64_len >= 1 && b64_str[b64_len - 1] == '=') padding++;
+      if (b64_len >= 2 && b64_str[b64_len - 2] == '=') padding++;
+      size_t effective_len = b64_len - padding;
+      size_t decoded_len =
+          (effective_len / 4) * 3 +
+          (effective_len % 4 == 3 ? 2 : (effective_len % 4 == 2 ? 1 : 0));
+      *byte_size += decoded_len + sizeof(uint32_t);
+    } else {
+      // Serialized data size is the length of the string itself plus
+      // 4 bytes to record the string length.
+      const char* str;
+      size_t len = 0;
+      RETURN_MSG_IF_ERR(
+          tensor_data.AsString(&str, &len),
+          "Unable to parse JSON bytes array");
+      *byte_size += len + sizeof(uint32_t);
+    }
   }
 
   return nullptr;  // success
@@ -656,7 +679,25 @@ ReadDataFromJsonHelper(
       case TRITONSERVER_TYPE_BYTES: {
         const char* cstr{nullptr};
         size_t len{0};
-        RETURN_IF_ERR(tensor_data.AsString(&cstr, &len));
+        // Storage for decoded base64 data; must outlive the memcpy below.
+        std::vector<char> decoded_data;
+
+        // Check for {"b64": "..."} binary encoding (TF Serving V1 protocol).
+        triton::common::TritonJson::Value b64_json;
+        if (tensor_data.Find("b64", &b64_json)) {
+          const char* b64_str{nullptr};
+          size_t b64_len{0};
+          RETURN_IF_ERR(tensor_data.MemberAsString("b64", &b64_str, &b64_len));
+          size_t decoded_size;
+          RETURN_IF_ERR(
+              DecodeBase64(b64_str, b64_len, decoded_data, decoded_size, "b64"));
+          cstr = decoded_data.data();
+          len = decoded_size;
+        } else {
+          RETURN_IF_ERR(tensor_data.AsString(&cstr, &len));
+        }
+
         if (len > INT64_MAX) {
           return TRITONSERVER_ErrorNew(
               TRITONSERVER_ERROR_INTERNAL,
@@ -1188,7 +1229,10 @@ HTTPAPIServer::HTTPAPIServer(
           R"(/v2/systemsharedmemory(?:/region/([^/]+))?/(status|register|unregister))"),
       cudasharedmemory_regex_(
           R"(/v2/cudasharedmemory(?:/region/([^/]+))?/(status|register|unregister))"),
-      trace_regex_(R"(/v2/trace/setting)"), max_input_size_(max_input_size),
+      trace_regex_(R"(/v2/trace/setting)"),
+      v1_model_regex_(
+          R"(/v1/models/([^/:]+)(?:/versions/([0-9]+))?(?:(?:/(metadata))|(?::([a-z]+)))?)"),
+      max_input_size_(max_input_size),
       restricted_apis_(restricted_apis)
 {
   // FIXME, don't cache server metadata. The http endpoint should
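The new `v1_model_regex_` drives the V1 routing added to `Handle()` further down. A quick sketch of which paths it matches and which capture groups they populate (Python's `re` stands in for RE2 here; the two dialects agree for this pattern):

```python
# Route-matching sketch for v1_model_regex_ (pattern copied from the
# constructor change above).
import re

V1_MODEL = re.compile(
    r"/v1/models/([^/:]+)(?:/versions/([0-9]+))?(?:(?:/(metadata))|(?::([a-z]+)))?"
)

for path in (
    "/v1/models/mymodel",                      # model ready check
    "/v1/models/mymodel/versions/2",           # versioned ready check
    "/v1/models/mymodel:predict",              # inference
    "/v1/models/mymodel/versions/2/metadata",  # metadata
):
    m = V1_MODEL.fullmatch(path)
    print(path, "->", m.groups() if m else None)

# /v1/models/mymodel -> ('mymodel', None, None, None)
# /v1/models/mymodel/versions/2 -> ('mymodel', '2', None, None)
# /v1/models/mymodel:predict -> ('mymodel', None, None, 'predict')
# /v1/models/mymodel/versions/2/metadata -> ('mymodel', '2', 'metadata', None)
```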
@@ -4841,11 +4885,1130 @@ HTTPAPIServer::GenerateRequestClass::ExactMappingOutput(
   return nullptr;  // success
 }
 
+// =====================================================================
+// KServe V1 Protocol Implementation
+// =====================================================================
+
 void
-HTTPAPIServer::Handle(evhtp_request_t* req)
+HTTPAPIServer::V1InferRequestClass::SetResponseHeader(
+    const bool has_binary_data, const size_t header_length)
 {
-  LOG_VERBOSE(1) << "HTTP request: " << req->method << " "
-                 << req->uri->path->full;
+  // V1 responses are always pure JSON
+  AddContentTypeHeader(req_, "application/json");
+}
+
+void
+HTTPAPIServer::V1InferRequestClass::InferResponseComplete(
+    TRITONSERVER_InferenceResponse* response, const uint32_t flags, void* userp)
+{
+  // Same pattern as InferRequestClass::InferResponseComplete
+  HTTPAPIServer::V1InferRequestClass* infer_request =
+      reinterpret_cast<HTTPAPIServer::V1InferRequestClass*>(userp);
+
+  if (response != nullptr) {
+    ++infer_request->response_count_;
+  }
+
+  TRITONSERVER_Error* err = nullptr;
+  if (infer_request->response_count_ != 1) {
+    err = TRITONSERVER_ErrorNew(
+        TRITONSERVER_ERROR_INTERNAL,
+        std::string(
+            "expected a single response, got " +
+            std::to_string(infer_request->response_count_))
+            .c_str());
+  } else if (response != nullptr) {
+    err = infer_request->FinalizeResponse(response);
+#ifdef TRITON_ENABLE_TRACING
+    if (infer_request->trace_ != nullptr) {
+      infer_request->trace_->CaptureTimestamp(
+          "INFER_RESPONSE_COMPLETE", TraceManager::CaptureTimestamp());
+    }
+#endif  // TRITON_ENABLE_TRACING
+  }
+
+  LOG_TRITONSERVER_ERROR(
+      TRITONSERVER_InferenceResponseDelete(response),
+      "deleting inference response");
+
+  if (err != nullptr) {
+    EVBufferAddErrorJson(infer_request->req_->buffer_out, err);
+    infer_request->response_code_ = HttpCodeFromError(err);
+    TRITONSERVER_ErrorDelete(err);
+  }
+
+  if ((flags & TRITONSERVER_RESPONSE_COMPLETE_FINAL) == 0) {
+    return;
+  }
+
+  evthr_defer(
+      infer_request->thread_, InferRequestClass::ReplyCallback, infer_request);
+}
+
+TRITONSERVER_Error*
+HTTPAPIServer::V1InferRequestClass::FinalizeResponse(
+    TRITONSERVER_InferenceResponse* response)
+{
+  RETURN_IF_ERR(TRITONSERVER_InferenceResponseError(response));
+
+  triton::common::TritonJson::Value response_json(
+      triton::common::TritonJson::ValueType::OBJECT);
+
+  triton::common::TritonJson::Value predictions_json(
+      response_json, triton::common::TritonJson::ValueType::ARRAY);
+
+  uint32_t output_count;
+  RETURN_IF_ERR(
+      TRITONSERVER_InferenceResponseOutputCount(response, &output_count));
+
+  const size_t N = batch_size_;
+
+  if (N <= 1) {
+    // Single instance — original unbatched path
+    triton::common::TritonJson::Value prediction_json(
+        response_json, triton::common::TritonJson::ValueType::OBJECT);
+
+    for (uint32_t idx = 0; idx < output_count; ++idx) {
+      const char* cname;
+      TRITONSERVER_DataType datatype;
+      const int64_t* shape;
+      uint64_t dim_count;
+      const void* base;
+      size_t byte_size;
+      TRITONSERVER_MemoryType memory_type;
+      int64_t memory_type_id;
+      void* userp;
+
+      RETURN_IF_ERR(TRITONSERVER_InferenceResponseOutput(
+          response, idx, &cname, &datatype, &shape, &dim_count, &base,
+          &byte_size, &memory_type, &memory_type_id, &userp));
+
+      size_t element_count = 1;
+      for (uint64_t j = 0; j < dim_count; j++) {
+        element_count *= shape[j];
+      }
+
+      // For TYPE_BYTES in the V1 protocol, base64-encode and wrap as
+      // {"b64": "..."}
+      if (datatype == TRITONSERVER_TYPE_BYTES) {
+        const char* cbase_bytes = reinterpret_cast<const char*>(base);
+        if (element_count == 1) {
+          // Single element: extract the bytes blob and b64-encode it
+          if (byte_size >= sizeof(uint32_t)) {
+            const size_t blob_len =
+                *(reinterpret_cast<const uint32_t*>(cbase_bytes));
+            const char* blob_data = cbase_bytes + sizeof(uint32_t);
+            if (sizeof(uint32_t) + blob_len <= byte_size) {
+              std::string b64_str;
+              RETURN_IF_ERR(EncodeBase64(blob_data, blob_len, b64_str));
+              triton::common::TritonJson::Value b64_obj(
+                  response_json, triton::common::TritonJson::ValueType::OBJECT);
+              RETURN_IF_ERR(b64_obj.AddString("b64", b64_str));
+              RETURN_IF_ERR(prediction_json.Add(cname, std::move(b64_obj)));
+            }
+          }
+        } else {
+          // Multiple elements: array of {"b64": "..."} objects
+          triton::common::TritonJson::Value data_json(
+              response_json, triton::common::TritonJson::ValueType::ARRAY);
+          size_t offset = 0;
+          for (size_t e = 0; e < element_count; ++e) {
+            if (offset + sizeof(uint32_t) > byte_size) break;
+            const size_t blob_len =
+                *(reinterpret_cast<const uint32_t*>(cbase_bytes + offset));
+            offset += sizeof(uint32_t);
+            if (offset + blob_len > byte_size) break;
+            std::string b64_str;
+            RETURN_IF_ERR(EncodeBase64(cbase_bytes + offset, blob_len, b64_str));
+            triton::common::TritonJson::Value b64_obj(
+                response_json, triton::common::TritonJson::ValueType::OBJECT);
+            RETURN_IF_ERR(b64_obj.AddString("b64", b64_str));
+            RETURN_IF_ERR(data_json.Append(std::move(b64_obj)));
+            offset += blob_len;
+          }
+          RETURN_IF_ERR(prediction_json.Add(cname, std::move(data_json)));
+        }
+      } else {
+        triton::common::TritonJson::Value data_json(
+            response_json, triton::common::TritonJson::ValueType::ARRAY);
+        RETURN_IF_ERR(WriteDataToJson(
+            &data_json, cname, datatype, base, byte_size, element_count));
+
+        if (element_count == 1) {
+          triton::common::TritonJson::Value el;
+          RETURN_IF_ERR(data_json.At(0, &el));
+          RETURN_IF_ERR(prediction_json.Add(cname, std::move(el)));
+        } else {
+          RETURN_IF_ERR(prediction_json.Add(cname, std::move(data_json)));
+        }
+      }
+    }
+
+    RETURN_IF_ERR(predictions_json.Append(std::move(prediction_json)));
+  } else {
+    // Batched — split the outputs across N prediction objects.
+    // First, collect the output metadata.
+    struct OutputInfo {
+      const char* name;
+      TRITONSERVER_DataType datatype;
+      const int64_t* shape;
+      uint64_t dim_count;
+      const void* base;
+      size_t byte_size;
+      size_t total_elements;
+      size_t per_instance_elements;
+      // For fixed-size types: uniform byte size per instance.
+      // For TYPE_BYTES: 0 (offsets are computed by walking the data).
+      size_t per_instance_bytes;
+      // Per-instance byte offsets for TYPE_BYTES (variable-length).
+      // For fixed-size types this is empty; offset = inst * per_instance_bytes.
+      std::vector<size_t> instance_offsets;
+    };
+    std::vector<OutputInfo> outputs(output_count);
+
+    for (uint32_t idx = 0; idx < output_count; ++idx) {
+      TRITONSERVER_MemoryType memory_type;
+      int64_t memory_type_id;
+      void* userp;
+
+      RETURN_IF_ERR(TRITONSERVER_InferenceResponseOutput(
+          response, idx, &outputs[idx].name, &outputs[idx].datatype,
+          &outputs[idx].shape, &outputs[idx].dim_count, &outputs[idx].base,
+          &outputs[idx].byte_size, &memory_type, &memory_type_id, &userp));
+
+      outputs[idx].total_elements = 1;
+      for (uint64_t j = 0; j < outputs[idx].dim_count; j++) {
+        outputs[idx].total_elements *= outputs[idx].shape[j];
+      }
+      outputs[idx].per_instance_elements = outputs[idx].total_elements / N;
+
+      if (outputs[idx].datatype == TRITONSERVER_TYPE_BYTES) {
+        // TYPE_BYTES is length-prefixed (4-byte length + variable blob),
+        // so we must walk the data to find the per-instance boundaries.
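+        // Illustrative layout for the two elements "ab" and "c":
+        //   02 00 00 00 'a' 'b' 01 00 00 00 'c'   (little-endian lengths)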
+        outputs[idx].per_instance_bytes = 0;  // not used for BYTES
+        outputs[idx].instance_offsets.resize(N);
+        const char* walk = static_cast<const char*>(outputs[idx].base);
+        size_t pos = 0;
+        for (size_t inst = 0; inst < N; ++inst) {
+          outputs[idx].instance_offsets[inst] = pos;
+          for (size_t e = 0; e < outputs[idx].per_instance_elements; ++e) {
+            if (pos + sizeof(uint32_t) > outputs[idx].byte_size) break;
+            uint32_t blob_len = *(reinterpret_cast<const uint32_t*>(walk + pos));
+            pos += sizeof(uint32_t) + blob_len;
+          }
+        }
+      } else {
+        outputs[idx].per_instance_bytes = outputs[idx].byte_size / N;
+      }
+    }
+
+    // Build N prediction objects
+    for (size_t inst = 0; inst < N; ++inst) {
+      triton::common::TritonJson::Value prediction_json(
+          response_json, triton::common::TritonJson::ValueType::OBJECT);
+
+      for (uint32_t idx = 0; idx < output_count; ++idx) {
+        const auto& oi = outputs[idx];
+
+        // For TYPE_BYTES in the V1 protocol, base64-encode and wrap as
+        // {"b64": "..."}
+        if (oi.datatype == TRITONSERVER_TYPE_BYTES) {
+          const char* cbase_bytes =
+              static_cast<const char*>(oi.base) + oi.instance_offsets[inst];
+          // Compute this instance's byte span by looking at the next offset
+          size_t inst_byte_span =
+              (inst + 1 < N)
+                  ? (oi.instance_offsets[inst + 1] - oi.instance_offsets[inst])
+                  : (oi.byte_size - oi.instance_offsets[inst]);
+
+          if (oi.per_instance_elements == 1) {
+            if (inst_byte_span >= sizeof(uint32_t)) {
+              const size_t blob_len =
+                  *(reinterpret_cast<const uint32_t*>(cbase_bytes));
+              const char* blob_data = cbase_bytes + sizeof(uint32_t);
+              if (sizeof(uint32_t) + blob_len <= inst_byte_span) {
+                std::string b64_str;
+                RETURN_IF_ERR(EncodeBase64(blob_data, blob_len, b64_str));
+                triton::common::TritonJson::Value b64_obj(
+                    response_json,
+                    triton::common::TritonJson::ValueType::OBJECT);
+                RETURN_IF_ERR(b64_obj.AddString("b64", b64_str));
+                RETURN_IF_ERR(prediction_json.Add(oi.name, std::move(b64_obj)));
+              }
+            }
+          } else {
+            triton::common::TritonJson::Value data_json(
+                response_json, triton::common::TritonJson::ValueType::ARRAY);
+            size_t off = 0;
+            for (size_t e = 0; e < oi.per_instance_elements; ++e) {
+              if (off + sizeof(uint32_t) > inst_byte_span) break;
+              const size_t blob_len =
+                  *(reinterpret_cast<const uint32_t*>(cbase_bytes + off));
+              off += sizeof(uint32_t);
+              if (off + blob_len > inst_byte_span) break;
+              std::string b64_str;
+              RETURN_IF_ERR(EncodeBase64(cbase_bytes + off, blob_len, b64_str));
+              triton::common::TritonJson::Value b64_obj(
+                  response_json,
+                  triton::common::TritonJson::ValueType::OBJECT);
+              RETURN_IF_ERR(b64_obj.AddString("b64", b64_str));
+              RETURN_IF_ERR(data_json.Append(std::move(b64_obj)));
+              off += blob_len;
+            }
+            RETURN_IF_ERR(prediction_json.Add(oi.name, std::move(data_json)));
+          }
+        } else {
+          size_t offset = inst * oi.per_instance_bytes;
+          const void* inst_base = static_cast<const char*>(oi.base) + offset;
+
+          triton::common::TritonJson::Value data_json(
+              response_json, triton::common::TritonJson::ValueType::ARRAY);
+          RETURN_IF_ERR(WriteDataToJson(
+              &data_json, oi.name, oi.datatype, inst_base,
+              oi.per_instance_bytes, oi.per_instance_elements));
+
+          if (oi.per_instance_elements == 1) {
+            triton::common::TritonJson::Value el;
+            RETURN_IF_ERR(data_json.At(0, &el));
+            RETURN_IF_ERR(prediction_json.Add(oi.name, std::move(el)));
+          } else {
+            RETURN_IF_ERR(prediction_json.Add(oi.name, std::move(data_json)));
+          }
+        }
+      }
+
+      RETURN_IF_ERR(predictions_json.Append(std::move(prediction_json)));
+    }
+  }
+
+  RETURN_IF_ERR(response_json.Add("predictions", std::move(predictions_json)));
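+
+  // Resulting document shape (illustrative), for N=2 with a 3-element
+  // OUTPUT0 per instance:
+  //   {"predictions": [{"OUTPUT0": [1, 2, 3]}, {"OUTPUT0": [4, 5, 6]}]}
+  // Single-element outputs are unwrapped to bare scalars.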
+  triton::common::TritonJson::WriteBuffer buffer;
+  RETURN_IF_ERR(response_json.Write(&buffer));
+  evbuffer_add(req_->buffer_out, buffer.Base(), buffer.Size());
+
+  return nullptr;  // success
+}
+
+TRITONSERVER_Error*
+HTTPAPIServer::V1InferRequestClass::MapInstanceField(
+    const std::string& name, triton::common::TritonJson::Value& instance,
+    std::map<std::string, triton::common::TritonJson::Value>& input_metadata,
+    size_t& consumed_input_byte_size)
+{
+  auto it = input_metadata.find(name);
+  if (it == input_metadata.end()) {
+    // Not a known input — treat it as a parameter
+    RETURN_IF_ERR(SetTritonParameterFromJsonParameter(
+        name, instance, triton_request_.get()));
+  } else {
+    // Parse the data type and shape
+    std::string value;
+    it->second.MemberAsString("datatype", &value);
+    auto dtype = TRITONSERVER_StringToDataType(value.c_str());
+
+    triton::common::TritonJson::Value tensor_data;
+    if (!instance.Find(name.c_str(), &tensor_data)) {
+      return TRITONSERVER_ErrorNew(
+          TRITONSERVER_ERROR_INVALID_ARG,
+          (std::string("expected key '") + name +
+           "' not found in V1 instance")
+              .c_str());
+    }
+
+    size_t byte_size{0};
+    size_t element_cnt = tensor_data.IsArray() ? tensor_data.ArraySize() : 1;
+
+    if (dtype == TRITONSERVER_TYPE_BYTES) {
+      RETURN_IF_ERR(JsonBytesArrayByteSize(tensor_data, &byte_size));
+    } else {
+      size_t element_size = TRITONSERVER_DataTypeByteSize(dtype);
+      if (element_size == 0) {
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INVALID_ARG,
+            (std::string("input '") + name + "' has unsupported datatype " +
+             value)
+                .c_str());
+      }
+
+      if (element_cnt > (SIZE_MAX / element_size)) {
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INVALID_ARG,
+            (std::string("input '") + name +
+             "' has too many elements of datatype " + value)
+                .c_str());
+      }
+
+      byte_size = element_cnt * element_size;
+    }
+
+    if (byte_size == 0) {
+      RETURN_IF_ERR(TRITONSERVER_InferenceRequestAddInput(
+          triton_request_.get(), name.c_str(), dtype, nullptr, 0));
+      return nullptr;
+    }
+
+    if (byte_size + consumed_input_byte_size > max_input_size_ ||
+        byte_size + consumed_input_byte_size < consumed_input_byte_size) {
+      return TRITONSERVER_ErrorNew(
+          TRITONSERVER_ERROR_INVALID_ARG,
+          (std::string("request size of ") +
+           std::to_string(consumed_input_byte_size) + " bytes with input '" +
+           name + "' of size " + std::to_string(byte_size) +
+           " bytes exceeds the maximum allowed input size of " +
+           std::to_string(max_input_size_) +
+           ". Use --http-max-input-size to increase the limit.")
+              .c_str());
+    }
+
+    consumed_input_byte_size += byte_size;
+
+    // Shape padding — two-pass process to match the model input shape
+    std::vector<int64_t> shape_vec;
+    {
+      triton::common::TritonJson::Value shape_value;
+      if (!it->second.Find("shape", &shape_value)) {
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INTERNAL,
+            (std::string("unexpected 'shape' not found in model metadata "
                         "for input '") +
+             name + "'")
+                .c_str());
+      }
+
+      for (size_t i = 0; i < shape_value.ArraySize(); ++i) {
+        int64_t d = 0;
+        RETURN_IF_ERR(shape_value.IndexAsInt(i, &d));
+        shape_vec.push_back(d);
+      }
+
+      // Pass 1: divide element_cnt by the fixed dimensions
+      for (auto rit = shape_vec.rbegin(); rit != shape_vec.rend(); ++rit) {
+        if (*rit != -1) {
+          if (element_cnt % *rit) {
+            return TRITONSERVER_ErrorNew(
+                TRITONSERVER_ERROR_INVALID_ARG,
+                (std::string("Cannot convert V1 input '") + name +
+                 "' to tensor with proper shape")
+                    .c_str());
+          }
+          element_cnt /= *rit;
+        }
+      }
+
+      // Pass 2: fill in the dynamic dimensions
+      for (auto rit = shape_vec.rbegin(); rit != shape_vec.rend(); ++rit) {
+        if (*rit == -1) {
+          *rit = element_cnt;
+          element_cnt = 1;
+        }
+      }
+
+      if (element_cnt != 1) {
+        return TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INVALID_ARG,
+            (std::string("Cannot convert V1 input '") + name +
+             "' to tensor with proper shape")
+                .c_str());
+      }
+    }
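+
+    // Worked example (illustrative): 16 JSON elements against model shape
+    // [-1, 4] divide out the fixed 4, leaving 4 for the dynamic dim, so the
+    // resolved shape is [4, 4]; 16 elements against [16] resolve to [16].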
+
+    // Restore the original element count for data reading
+    element_cnt = tensor_data.IsArray() ? tensor_data.ArraySize() : 1;
+    serialized_data_.emplace_back();
+    std::vector<char>& serialized = serialized_data_.back();
+    serialized.resize(byte_size);
+
+    RETURN_IF_ERR(ReadDataFromJson(
+        name.c_str(), tensor_data, &serialized[0], dtype,
+        dtype == TRITONSERVER_TYPE_BYTES ? byte_size : element_cnt));
+    RETURN_IF_ERR(TRITONSERVER_InferenceRequestAddInput(
+        triton_request_.get(), name.c_str(), dtype, &shape_vec[0],
+        shape_vec.size()));
+    RETURN_IF_ERR(TRITONSERVER_InferenceRequestAppendInputData(
+        triton_request_.get(), name.c_str(), &serialized[0], serialized.size(),
+        TRITONSERVER_MEMORY_CPU, 0 /* memory_type_id */));
+  }
+
+  return nullptr;  // success
+}
+
+void
+HTTPAPIServer::HandleV1Infer(
+    evhtp_request_t* req, const std::string& model_name,
+    const std::string& model_version_str)
+{
+  RETURN_AND_RESPOND_IF_RESTRICTED(
+      req, RestrictedCategory::INFERENCE, restricted_apis_);
+
+  AddContentTypeHeader(req, "application/json");
+  if (req->method != htp_method_POST) {
+    RETURN_AND_RESPOND_WITH_ERR(
+        req, EVHTP_RES_METHNALLOWED, "Method Not Allowed");
+  }
+
+  int64_t requested_model_version;
+  RETURN_AND_RESPOND_IF_ERR(
+      req,
+      GetModelVersionFromString(model_version_str, &requested_model_version));
+
+  // If tracing is enabled, see if this request should be traced.
+  TRITONSERVER_InferenceTrace* triton_trace = nullptr;
+  std::shared_ptr<TraceManager::Trace> trace;
+  if (trace_manager_) {
+    trace = StartTrace(req, model_name, &triton_trace);
+  }
+
+  std::map<std::string, triton::common::TritonJson::Value> input_metadata;
+  triton::common::TritonJson::Value meta_data_root;
+  RETURN_AND_RESPOND_IF_ERR(
+      req, ModelInputMetadata(
+               model_name, requested_model_version, &input_metadata,
+               &meta_data_root));
+
+  if (GetRequestCompressionType(req) != DataCompressor::Type::IDENTITY) {
+    RETURN_AND_RESPOND_IF_ERR(
+        req,
+        TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INVALID_ARG,
+            "Unsupported content-encoding, only 'identity' is supported."));
+  }
+
+  // Create the inference request
+  TRITONSERVER_InferenceRequest* irequest = nullptr;
+  RETURN_AND_RESPOND_IF_ERR(
+      req, TRITONSERVER_InferenceRequestNew(
+               &irequest, server_.get(), model_name.c_str(),
+               requested_model_version));
+
+  std::shared_ptr<TRITONSERVER_InferenceRequest> irequest_shared = {
+      irequest, [](TRITONSERVER_InferenceRequest* request) {
+        LOG_TRITONSERVER_ERROR(
+            TRITONSERVER_InferenceRequestDelete(request),
+            "deleting HTTP/REST inference request");
+      }};
+
+  // Create the V1 request class for custom response handling
+  std::unique_ptr<V1InferRequestClass> v1_request(new V1InferRequestClass(
+      server_.get(), req, GetResponseCompressionType(req), irequest_shared,
+      shm_manager_, max_input_size_));
+  v1_request->trace_ = trace;
+
+  const char* request_id = "";
+  auto error_callback = [&, trace](TRITONSERVER_Error* error) {
+    if (error != nullptr) {
+      if (irequest != nullptr) {
+        LOG_TRITONSERVER_ERROR(
+            TRITONSERVER_InferenceRequestId(irequest, &request_id),
+            "unable to retrieve request ID string");
+      }
+      if (!strncmp(request_id, "", 1)) {
+        request_id = "<id_unknown>";
+      }
+
+      LOG_VERBOSE(1) << "[request id: " << request_id << "] "
+                     << "V1 infer failed: " << TRITONSERVER_ErrorMessage(error);
+      AddContentTypeHeader(req, "application/json");
+      EVBufferAddErrorJson(req->buffer_out, error);
+      evhtp_send_reply(req, HttpCodeFromError(error));
+      evhtp_request_resume(req);
+
+#ifdef TRITON_ENABLE_TRACING
+      if ((trace != nullptr) && (trace->trace_ != nullptr)) {
+        TraceManager::TraceRelease(trace->trace_, trace->trace_userp_);
+      }
+#endif  // TRITON_ENABLE_TRACING
+    }
+  };
+
+  // Parse the request body
+  triton::common::TritonJson::Value request_json;
+  size_t buffer_len = 0;
+  RETURN_AND_CALLBACK_IF_ERR(
+      EVRequestToJson(req, "v1 predict", &request_json, &buffer_len),
+      error_callback);
+
+  // Check for the "instances" (row) or "inputs" (column) format
+  triton::common::TritonJson::Value instances_json;
+  triton::common::TritonJson::Value inputs_json;
+  bool use_instances = request_json.Find("instances", &instances_json);
+  bool use_inputs = request_json.Find("inputs", &inputs_json);
+
+  if (!use_instances && !use_inputs) {
+    RETURN_AND_CALLBACK_IF_ERR(
+        TRITONSERVER_ErrorNew(
+            TRITONSERVER_ERROR_INVALID_ARG,
+            "V1 predict request must have an 'instances' or 'inputs' field"),
+        error_callback);
+  }
+
+  if (use_instances) {
+    // Row-oriented "instances" format — batch all instances into one request
+    size_t batch_size = instances_json.ArraySize();
+    if (batch_size == 0) {
+      RETURN_AND_CALLBACK_IF_ERR(
+          TRITONSERVER_ErrorNew(
+              TRITONSERVER_ERROR_INVALID_ARG,
+              "V1 predict request must have at least one instance"),
+          error_callback);
+    }
+
+    // For models with max_batch_size==0 (no batching support), the V1
+    // instances path must NOT prepend a batch dimension. When there is
+    // exactly one instance, unwrap it and route through the single-
+    // instance MapInstanceField path, which produces the correct shape.
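+    // e.g. a non-batching model with input shape [16]: one V1 instance must
+    // map to shape [16], not [1, 16] (illustrative shapes).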
+    bool no_batch_model = false;
+    {
+      TRITONSERVER_Message* config_msg = nullptr;
+      auto cfg_err = TRITONSERVER_ServerModelConfig(
+          server_.get(), model_name.c_str(), requested_model_version,
+          1 /* config_version */, &config_msg);
+      if (cfg_err == nullptr && config_msg != nullptr) {
+        const char* cfg_buf;
+        size_t cfg_sz;
+        cfg_err =
+            TRITONSERVER_MessageSerializeToJson(config_msg, &cfg_buf, &cfg_sz);
+        if (cfg_err == nullptr) {
+          triton::common::TritonJson::Value cfg_json;
+          TRITONSERVER_Error* parse_err = cfg_json.Parse(cfg_buf, cfg_sz);
+          if (parse_err == nullptr) {
+            int64_t max_bs = -1;
+            TRITONSERVER_Error* mbs_err =
+                cfg_json.MemberAsInt("max_batch_size", &max_bs);
+            if (mbs_err == nullptr) {
+              no_batch_model = (max_bs == 0);
+            } else {
+              TRITONSERVER_ErrorDelete(mbs_err);
+            }
+          } else {
+            TRITONSERVER_ErrorDelete(parse_err);
+          }
+        }
+        TRITONSERVER_MessageDelete(config_msg);
+      }
+      if (cfg_err != nullptr) {
+        TRITONSERVER_ErrorDelete(cfg_err);
+      }
+    }
+
+    if (no_batch_model && batch_size == 1) {
+      // Unwrap the single instance and process it without a batch dimension
+      LOG_VERBOSE(1) << "V1 instances: max_batch_size=0 with 1 instance, "
+                        "using non-batched path";
+      v1_request->SetBatchSize(1);
+
+      triton::common::TritonJson::Value single_instance;
+      RETURN_AND_CALLBACK_IF_ERR(
+          instances_json.IndexAsObject(0, &single_instance), error_callback);
+
+      std::vector<std::string> members;
+      RETURN_AND_CALLBACK_IF_ERR(
+          single_instance.Members(&members), error_callback);
+
+      size_t consumed_input_size{0};
+      for (const auto& field : members) {
+        RETURN_AND_CALLBACK_IF_ERR(
+            v1_request->MapInstanceField(
+                field, single_instance, input_metadata, consumed_input_size),
+            error_callback);
+      }
+    } else {
+      v1_request->SetBatchSize(batch_size);
+
+      // Get the field names from the first instance
+      triton::common::TritonJson::Value first_instance;
+      RETURN_AND_CALLBACK_IF_ERR(
+          instances_json.IndexAsObject(0, &first_instance), error_callback);
+
+      std::vector<std::string> members;
+      RETURN_AND_CALLBACK_IF_ERR(
+          first_instance.Members(&members), error_callback);
+
+      size_t consumed_input_size{0};
+
+      for (const auto& field : members) {
+        auto it = input_metadata.find(field);
+        if (it == input_metadata.end()) {
+          // Not a known input — treat it as a parameter from the first
+          // instance only
+          RETURN_AND_CALLBACK_IF_ERR(
+              SetTritonParameterFromJsonParameter(
+                  field, first_instance, irequest),
+              error_callback);
+          continue;
+        }
+
+        // This field is a model input — batch it across all instances
+        std::string dtype_str;
+        it->second.MemberAsString("datatype", &dtype_str);
+        auto dtype = TRITONSERVER_StringToDataType(dtype_str.c_str());
+
+        // Compute the per-instance shape from the model metadata
+        std::vector<int64_t> per_instance_shape;
+        {
+          triton::common::TritonJson::Value shape_value;
+          if (!it->second.Find("shape", &shape_value)) {
+            RETURN_AND_CALLBACK_IF_ERR(
+                TRITONSERVER_ErrorNew(
+                    TRITONSERVER_ERROR_INTERNAL,
+                    (std::string("'shape' not found in model metadata for "
+                                 "input '") +
+                     field + "'")
+                        .c_str()),
+                error_callback);
+          }
+          for (size_t si = 0; si < shape_value.ArraySize(); ++si) {
+            int64_t d = 0;
+            RETURN_AND_CALLBACK_IF_ERR(
+                shape_value.IndexAsInt(si, &d), error_callback);
+            per_instance_shape.push_back(d);
+          }
+        }
+
+        // Process the first instance to determine the element count and
+        // shape resolution
+        triton::common::TritonJson::Value tensor_data_0;
+        if (!first_instance.Find(field.c_str(), &tensor_data_0)) {
+          RETURN_AND_CALLBACK_IF_ERR(
+              TRITONSERVER_ErrorNew(
+                  TRITONSERVER_ERROR_INVALID_ARG,
+                  (std::string("key '") + field +
+                   "' not found in V1 instance")
+                      .c_str()),
+              error_callback);
+        }
+        size_t per_element_cnt =
+            tensor_data_0.IsArray() ? tensor_data_0.ArraySize() : 1;
+        size_t per_byte_size{0};
+
+        // For TYPE_BYTES the serialized size is variable per instance
+        // (4-byte length prefix + blob). Compute each instance's size
+        // individually so we can allocate the right total and write at the
+        // correct offsets.
+        std::vector<size_t> inst_byte_sizes;  // only used for TYPE_BYTES
+        size_t total_byte_size{0};
+
+        if (dtype == TRITONSERVER_TYPE_BYTES) {
+          inst_byte_sizes.resize(batch_size);
+          for (size_t inst = 0; inst < batch_size; ++inst) {
+            triton::common::TritonJson::Value cur_inst;
+            RETURN_AND_CALLBACK_IF_ERR(
+                instances_json.IndexAsObject(inst, &cur_inst), error_callback);
+            triton::common::TritonJson::Value cur_tensor;
+            if (!cur_inst.Find(field.c_str(), &cur_tensor)) {
+              RETURN_AND_CALLBACK_IF_ERR(
+                  TRITONSERVER_ErrorNew(
+                      TRITONSERVER_ERROR_INVALID_ARG,
+                      (std::string("key '") + field + "' missing in instance " +
+                       std::to_string(inst))
+                          .c_str()),
+                  error_callback);
+            }
+            RETURN_AND_CALLBACK_IF_ERR(
+                JsonBytesArrayByteSize(cur_tensor, &inst_byte_sizes[inst]),
+                error_callback);
+            total_byte_size += inst_byte_sizes[inst];
+          }
+          // per_byte_size is unused for TYPE_BYTES in the write loop below,
+          // but set it from instance 0 for the shape-resolution logic.
+          per_byte_size = inst_byte_sizes[0];
+        } else {
+          size_t element_size = TRITONSERVER_DataTypeByteSize(dtype);
+          if (element_size == 0) {
+            RETURN_AND_CALLBACK_IF_ERR(
+                TRITONSERVER_ErrorNew(
+                    TRITONSERVER_ERROR_INVALID_ARG,
+                    (std::string("input '") + field +
+                     "' has unsupported datatype " + dtype_str)
+                        .c_str()),
+                error_callback);
+          }
+          per_byte_size = per_element_cnt * element_size;
+          total_byte_size = per_byte_size * batch_size;
+        }
+
+        // Resolve dynamic dimensions using the per-instance element count
+        std::vector<int64_t> resolved_shape = per_instance_shape;
+        {
+          size_t tmp_cnt = per_element_cnt;
+          for (auto rit = resolved_shape.rbegin();
+               rit != resolved_shape.rend(); ++rit) {
+            if (*rit != -1) {
+              if (tmp_cnt % *rit) {
+                RETURN_AND_CALLBACK_IF_ERR(
+                    TRITONSERVER_ErrorNew(
+                        TRITONSERVER_ERROR_INVALID_ARG,
+                        (std::string("Cannot convert V1 input '") + field +
+                         "' to tensor with proper shape")
+                            .c_str()),
+                    error_callback);
+              }
+              tmp_cnt /= *rit;
+            }
+          }
+          for (auto rit = resolved_shape.rbegin();
+               rit != resolved_shape.rend(); ++rit) {
+            if (*rit == -1) {
+              *rit = tmp_cnt;
+              tmp_cnt = 1;
+            }
+          }
+        }
+
+        // Build the batched shape.
+        //
+        // Per the Triton docs (model_configuration.html#maximum-batch-size):
+        //   max_batch_size > 0  → full shape = [-1] + dims (implicit batch dim)
+        //   max_batch_size == 0 → full shape = dims (no batch dim)
+        //
+        // For batched models (max_batch_size > 0): prepend [N, ...dims] so
+        // Triton's scheduler sees the batch dimension it expects.
+        // For unbatched models (max_batch_size == 0): multiply the first dim
+        // by N so the data concatenates along the existing axis without
+        // adding a dimension the model doesn't expect.
+        std::vector<int64_t> batched_shape;
+        if (no_batch_model) {
+          // No implicit batch dimension — concatenate along the first dim
+          batched_shape = resolved_shape;
+          batched_shape[0] *= static_cast<int64_t>(batch_size);
+        } else {
+          // Standard: prepend the batch dimension (existing behavior)
+          batched_shape.push_back(static_cast<int64_t>(batch_size));
+          batched_shape.insert(
+              batched_shape.end(), resolved_shape.begin(),
+              resolved_shape.end());
+        }
+
+        // total_byte_size was already computed above for TYPE_BYTES;
+        // for fixed-size types it was set to per_byte_size * batch_size.
+        if (total_byte_size + consumed_input_size > max_input_size_ ||
+            total_byte_size + consumed_input_size < consumed_input_size) {
+          RETURN_AND_CALLBACK_IF_ERR(
+              TRITONSERVER_ErrorNew(
+                  TRITONSERVER_ERROR_INVALID_ARG,
+                  (std::string("batched input '") + field +
+                   "' exceeds --http-max-input-size")
+                      .c_str()),
+              error_callback);
+        }
+        consumed_input_size += total_byte_size;
+
+        if (total_byte_size == 0) {
+          RETURN_AND_CALLBACK_IF_ERR(
+              TRITONSERVER_InferenceRequestAddInput(
+                  irequest, field.c_str(), dtype, &batched_shape[0],
+                  batched_shape.size()),
+              error_callback);
+          continue;
+        }
+
+        // Allocate the batched buffer and fill it from each instance
+        v1_request->serialized_data_.emplace_back();
+        std::vector<char>& batched_buf = v1_request->serialized_data_.back();
+        batched_buf.resize(total_byte_size);
+
+        // For TYPE_BYTES, build cumulative offsets from the per-instance
+        // sizes computed earlier. For fixed-size types, the offset is simply
+        // inst * per_byte_size.
+        std::vector<size_t> inst_offsets;
+        if (dtype == TRITONSERVER_TYPE_BYTES) {
+          inst_offsets.resize(batch_size);
+          inst_offsets[0] = 0;
+          for (size_t i = 1; i < batch_size; ++i) {
+            inst_offsets[i] = inst_offsets[i - 1] + inst_byte_sizes[i - 1];
+          }
+        }
+
+        for (size_t inst = 0; inst < batch_size; ++inst) {
+          triton::common::TritonJson::Value cur_instance;
+          RETURN_AND_CALLBACK_IF_ERR(
+              instances_json.IndexAsObject(inst, &cur_instance),
+              error_callback);
+
+          triton::common::TritonJson::Value cur_tensor;
+          if (!cur_instance.Find(field.c_str(), &cur_tensor)) {
+            RETURN_AND_CALLBACK_IF_ERR(
+                TRITONSERVER_ErrorNew(
+                    TRITONSERVER_ERROR_INVALID_ARG,
+                    (std::string("key '") + field + "' missing in instance " +
+                     std::to_string(inst))
+                        .c_str()),
+                error_callback);
+          }
+
+          size_t write_offset = (dtype == TRITONSERVER_TYPE_BYTES)
+                                    ? inst_offsets[inst]
+                                    : inst * per_byte_size;
+          size_t write_limit = (dtype == TRITONSERVER_TYPE_BYTES)
+                                   ? inst_byte_sizes[inst]
+                                   : per_element_cnt;
+
+          RETURN_AND_CALLBACK_IF_ERR(
+              ReadDataFromJson(
+                  field.c_str(), cur_tensor, &batched_buf[write_offset], dtype,
+                  write_limit),
+              error_callback);
+        }
+
+        RETURN_AND_CALLBACK_IF_ERR(
+            TRITONSERVER_InferenceRequestAddInput(
+                irequest, field.c_str(), dtype, &batched_shape[0],
+                batched_shape.size()),
+            error_callback);
+        RETURN_AND_CALLBACK_IF_ERR(
+            TRITONSERVER_InferenceRequestAppendInputData(
+                irequest, field.c_str(), &batched_buf[0], batched_buf.size(),
+                TRITONSERVER_MEMORY_CPU, 0),
+            error_callback);
+      }
+    }  // end of the batched (else) path for instances with max_batch_size > 0
+  } else {
+    // Column-oriented "inputs" format — each key maps directly to a tensor
+    v1_request->SetBatchSize(1);
+
+    std::vector<std::string> members;
+    RETURN_AND_CALLBACK_IF_ERR(inputs_json.Members(&members), error_callback);
+
+    size_t consumed_input_size{0};
+    for (const auto& field : members) {
+      RETURN_AND_CALLBACK_IF_ERR(
+          v1_request->MapInstanceField(
+              field, inputs_json, input_metadata, consumed_input_size),
+          error_callback);
+    }
+  }
+
+  // Handle top-level parameters (if present)
+  triton::common::TritonJson::Value params_json;
+  if (request_json.Find("parameters", &params_json)) {
+    std::vector<std::string> param_keys;
+    RETURN_AND_CALLBACK_IF_ERR(
+        params_json.Members(&param_keys), error_callback);
+    for (const auto& pk : param_keys) {
+      RETURN_AND_CALLBACK_IF_ERR(
+          SetTritonParameterFromJsonParameter(pk, params_json, irequest),
+          error_callback);
+    }
+  }
+
+  // Set the callbacks and submit the inference request
+  auto request_release_payload =
+      std::make_unique<RequestReleasePayload>(irequest_shared, nullptr);
+  RETURN_AND_CALLBACK_IF_ERR(
+      TRITONSERVER_InferenceRequestSetReleaseCallback(
+          irequest, InferRequestClass::InferRequestComplete,
+          request_release_payload.get()),
+      error_callback);
+  RETURN_AND_CALLBACK_IF_ERR(
+      TRITONSERVER_InferenceRequestSetResponseCallback(
+          irequest, allocator_,
+          reinterpret_cast<void*>(&v1_request->alloc_payload_),
+          V1InferRequestClass::InferResponseComplete,
+          reinterpret_cast<void*>(v1_request.get())),
+      error_callback);
+
+  RETURN_AND_CALLBACK_IF_ERR(
+      TRITONSERVER_ServerInferAsync(server_.get(), irequest, triton_trace),
+      error_callback);
+
+#ifdef TRITON_ENABLE_TRACING
+  if (trace != nullptr) {
+    trace->trace_ = nullptr;
+  }
+#endif  // TRITON_ENABLE_TRACING
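+
+  // Ownership of v1_request and request_release_payload passes to the
+  // response/release callbacks now that the request has been submitted.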
+  v1_request.release();
+  request_release_payload.release();
+}
+
+void
+HTTPAPIServer::HandleV1ListModels(evhtp_request_t* req)
+{
+  AddContentTypeHeader(req, "application/json");
+
+  if (req->method != htp_method_GET) {
+    RETURN_AND_RESPOND_WITH_ERR(
+        req, EVHTP_RES_METHNALLOWED, "Method Not Allowed");
+  }
+
+  TRITONSERVER_Message* message = nullptr;
+  TRITONSERVER_Error* err = TRITONSERVER_ServerModelIndex(
+      server_.get(), TRITONSERVER_INDEX_FLAG_READY, &message);
+  if (err != nullptr) {
+    EVBufferAddErrorJson(req->buffer_out, err);
+    evhtp_send_reply(req, HttpCodeFromError(err));
+    TRITONSERVER_ErrorDelete(err);
+    return;
+  }
+
+  const char* buffer;
+  size_t byte_size;
+  err = TRITONSERVER_MessageSerializeToJson(message, &buffer, &byte_size);
+  if (err != nullptr) {
+    TRITONSERVER_MessageDelete(message);
+    EVBufferAddErrorJson(req->buffer_out, err);
+    evhtp_send_reply(req, HttpCodeFromError(err));
+    TRITONSERVER_ErrorDelete(err);
+    return;
+  }
+
+  // Parse the index array and re-format it in the V1 style
+  triton::common::TritonJson::Value index_json;
+  err = index_json.Parse(buffer, byte_size);
+  TRITONSERVER_MessageDelete(message);
+  if (err != nullptr) {
+    EVBufferAddErrorJson(req->buffer_out, err);
+    evhtp_send_reply(req, HttpCodeFromError(err));
+    TRITONSERVER_ErrorDelete(err);
+    return;
+  }
+
+  triton::common::TritonJson::Value response_json(
+      triton::common::TritonJson::ValueType::OBJECT);
+  triton::common::TritonJson::Value models_json(
+      response_json, triton::common::TritonJson::ValueType::ARRAY);
+
+  for (size_t i = 0; i < index_json.ArraySize(); ++i) {
+    triton::common::TritonJson::Value entry;
+    index_json.IndexAsObject(i, &entry);
+
+    std::string name;
+    entry.MemberAsString("name", &name);
+
+    std::string state;
+    entry.MemberAsString("state", &state);
+
+    triton::common::TritonJson::Value model_obj(
+        response_json, triton::common::TritonJson::ValueType::OBJECT);
+    model_obj.AddString("name", name);
+    model_obj.AddBool("ready", state == "READY");
+    models_json.Append(std::move(model_obj));
+  }
+
+  response_json.Add("models", std::move(models_json));
+
+  triton::common::TritonJson::WriteBuffer wbuf;
+  response_json.Write(&wbuf);
+  evbuffer_add(req->buffer_out, wbuf.Base(), wbuf.Size());
+  evhtp_send_reply(req, EVHTP_RES_OK);
+}
+
+void
+HTTPAPIServer::HandleV1ModelMetadata(
+    evhtp_request_t* req, const std::string& model_name,
+    const std::string& model_version_str)
+{
+  AddContentTypeHeader(req, "application/json");
+
+  if (req->method != htp_method_GET) {
+    RETURN_AND_RESPOND_WITH_ERR(
+        req, EVHTP_RES_METHNALLOWED, "Method Not Allowed");
+  }
+
+  if (model_name.empty()) {
+    RETURN_AND_RESPOND_WITH_ERR(
+        req, EVHTP_RES_BADREQ, "Missing model name in ModelMetadata request");
+  }
+
+  TRITONSERVER_Message* message = nullptr;
+
+  int64_t requested_model_version;
+  auto err =
+      GetModelVersionFromString(model_version_str, &requested_model_version);
+  if (err == nullptr) {
+    err = TRITONSERVER_ServerModelMetadata(
+        server_.get(), model_name.c_str(), requested_model_version, &message);
+    if (err == nullptr) {
+      const char* buffer;
+      size_t byte_size;
+      err = TRITONSERVER_MessageSerializeToJson(message, &buffer, &byte_size);
+      if (err == nullptr) {
+        evbuffer_add(req->buffer_out, buffer, byte_size);
+        evhtp_send_reply(req, EVHTP_RES_OK);
+      }
+      TRITONSERVER_MessageDelete(message);
+    }
+  }
+
+  RETURN_AND_RESPOND_IF_ERR(req, err);
+}
+
+void
+HTTPAPIServer::HandleV1ModelReady(
+    evhtp_request_t* req, const std::string& model_name,
+    const std::string& model_version_str)
+{
+  AddContentTypeHeader(req, "application/json");
+
+  if (req->method != htp_method_GET) {
+    RETURN_AND_RESPOND_WITH_ERR(
+        req, EVHTP_RES_METHNALLOWED, "Method Not Allowed");
+  }
+
+  int64_t requested_model_version;
+  RETURN_AND_RESPOND_IF_ERR(
+      req,
+      GetModelVersionFromString(model_version_str, &requested_model_version));
+
+  bool is_ready = false;
+  TRITONSERVER_Error* err = TRITONSERVER_ServerModelIsReady(
+      server_.get(), model_name.c_str(), requested_model_version, &is_ready);
+  if (err != nullptr) {
+    EVBufferAddErrorJson(req->buffer_out, err);
+    evhtp_send_reply(req, HttpCodeFromError(err));
+    TRITONSERVER_ErrorDelete(err);
+    return;
+  }
+
+  triton::common::TritonJson::Value response_json(
+      triton::common::TritonJson::ValueType::OBJECT);
+  response_json.AddString("name", std::string(model_name));
+  response_json.AddBool("ready", is_ready);
+
+  triton::common::TritonJson::WriteBuffer buffer;
+  response_json.Write(&buffer);
+  evbuffer_add(req->buffer_out, buffer.Base(), buffer.Size());
+
+  evhtp_send_reply(req, EVHTP_RES_OK);
+}
+
+void
+HTTPAPIServer::Handle(evhtp_request_t* req)
+{
+  LOG_VERBOSE(1) << "HTTP request: " << req->method << " "
+                 << req->uri->path->full;
+
+  // KServe V1 Protocol routes
+  std::string path(req->uri->path->full);
+
+  // V1 health endpoints: /v1/health/live, /v1/health/ready
+  if (path == "/v1/health/live") {
+    HandleServerHealth(req, "live");
+    return;
+  }
+  if (path == "/v1/health/ready") {
+    HandleServerHealth(req, "ready");
+    return;
+  }
+
+  // V1 list models: GET /v1/models
+  if (path == "/v1/models") {
+    HandleV1ListModels(req);
+    return;
+  }
+
+  // V1 model-specific routes
+  std::string v1_model_name, v1_version, v1_metadata, v1_action;
+  if (RE2::FullMatch(
+          path, v1_model_regex_, &v1_model_name, &v1_version, &v1_metadata,
+          &v1_action)) {
+    if (v1_action == "predict") {
+      HandleV1Infer(req, v1_model_name, v1_version);
+      return;
+    } else if (v1_metadata == "metadata") {
+      HandleV1ModelMetadata(req, v1_model_name, v1_version);
+      return;
+    } else if (v1_action.empty() && v1_metadata.empty()) {
+      // GET /v1/models/{name} or /v1/models/{name}/versions/{ver}
+      HandleV1ModelReady(req, v1_model_name, v1_version);
+      return;
+    }
+  }
 
   if (std::string(req->uri->path->full) == "/v2/models/stats") {
     // model statistics
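For reference, the management endpoints added above can be exercised like this (a sketch; the host, port, and model name are illustrative):

```python
# Sketch of the V1 management endpoints (host, port, and model name are
# illustrative, not part of this PR).
import requests

base = "http://localhost:8000"

print(requests.get(f"{base}/v1/models").json())
# e.g. {"models": [{"name": "mymodel", "ready": True}]}

print(requests.get(f"{base}/v1/models/mymodel").json())
# e.g. {"name": "mymodel", "ready": True}

print(requests.get(f"{base}/v1/models/mymodel/metadata").json())
# V2-style metadata document for the model
```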
diff --git a/src/http_server.h b/src/http_server.h
index a3f4e6b93a..3b91b6377a 100644
--- a/src/http_server.h
+++ b/src/http_server.h
@@ -384,6 +384,51 @@ class HTTPAPIServer : public HTTPServer {
     evhtp_res response_code_{EVHTP_RES_OK};
   };
 
+  // KServe V1 protocol request class — translates the V1
+  // (instances/predictions) format to/from Triton's native V2 typed
+  // tensor format.
+  class V1InferRequestClass : public InferRequestClass {
+   public:
+    explicit V1InferRequestClass(
+        TRITONSERVER_Server* server, evhtp_request_t* req,
+        DataCompressor::Type response_compression_type,
+        const std::shared_ptr<TRITONSERVER_InferenceRequest>& triton_request,
+        const std::shared_ptr<SharedMemoryManager>& shm_manager,
+        const size_t max_input_size)
+        : InferRequestClass(
+              server, req, response_compression_type, triton_request,
+              shm_manager),
+          max_input_size_(max_input_size)
+    {
+    }
+
+    TRITONSERVER_Error* FinalizeResponse(
+        TRITONSERVER_InferenceResponse* response) override;
+
+    void SetResponseHeader(
+        const bool has_binary_data, const size_t header_length) override;
+
+    static void InferResponseComplete(
+        TRITONSERVER_InferenceResponse* response, const uint32_t flags,
+        void* userp);
+
+    // Map a V1 instance field to a Triton input tensor, following the
+    // ExactMappingInput pattern from GenerateRequestClass.
+    TRITONSERVER_Error* MapInstanceField(
+        const std::string& name, triton::common::TritonJson::Value& instance,
+        std::map<std::string, triton::common::TritonJson::Value>&
+            input_metadata,
+        size_t& consumed_input_byte_size);
+
+    // Set the batch size (number of V1 instances) for response splitting
+    void SetBatchSize(size_t n) { batch_size_ = n; }
+    size_t GetBatchSize() const { return batch_size_; }
+
+   private:
+    const size_t max_input_size_{0};
+    size_t batch_size_{1};
+  };
+
   class GenerateRequestClass : public InferRequestClass {
    public:
     explicit GenerateRequestClass(
@@ -609,6 +654,18 @@ class HTTPAPIServer : public HTTPServer {
   void HandleTrace(evhtp_request_t* req, const std::string& model_name = "");
   void HandleLogging(evhtp_request_t* req);
 
+  // KServe V1 protocol
+  void HandleV1Infer(
+      evhtp_request_t* req, const std::string& model_name,
+      const std::string& model_version_str);
+  void HandleV1ModelReady(
+      evhtp_request_t* req, const std::string& model_name,
+      const std::string& model_version_str);
+  void HandleV1ListModels(evhtp_request_t* req);
+  void HandleV1ModelMetadata(
+      evhtp_request_t* req, const std::string& model_name,
+      const std::string& model_version_str);
+
   // Text Generation / LLM format
   //'streaming' selects the schema pair to convert request / response.
   // 'streaming' also controls the response convention, if true,
@@ -702,6 +759,7 @@ class HTTPAPIServer : public HTTPServer {
   re2::RE2 systemsharedmemory_regex_;
   re2::RE2 cudasharedmemory_regex_;
   re2::RE2 trace_regex_;
+  re2::RE2 v1_model_regex_;
 
   // [DLIS-5551] currently always performs basic conversion, only maps schema
   // of EXACT_MAPPING kind. MAPPING_SCHEMA and upcoming kinds are for
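`MapInstanceField` (declared above, implemented in `src/http_server.cc`) resolves tensor shapes in two passes. A minimal Python sketch of the same algorithm (not the shipping code):

```python
# Two-pass shape resolution: divide the flat JSON element count by the
# fixed dims (right to left), then let the dynamic (-1) dim absorb the
# remainder; fail if anything is left over.
def resolve_shape(element_cnt, model_shape):
    shape = list(model_shape)
    for i in reversed(range(len(shape))):      # pass 1: fixed dims
        if shape[i] != -1:
            if element_cnt % shape[i]:
                raise ValueError("cannot convert to tensor with proper shape")
            element_cnt //= shape[i]
    for i in reversed(range(len(shape))):      # pass 2: dynamic dims
        if shape[i] == -1:
            shape[i] = element_cnt
            element_cnt = 1
    if element_cnt != 1:
        raise ValueError("cannot convert to tensor with proper shape")
    return shape

print(resolve_shape(16, [-1, 4]))  # [4, 4]
print(resolve_shape(16, [16]))     # [16]
```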
diff --git a/src/vertex_ai_server.cc b/src/vertex_ai_server.cc
index ec4fd6802f..bbac677047 100644
--- a/src/vertex_ai_server.cc
+++ b/src/vertex_ai_server.cc
@@ -25,7 +25,10 @@
 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 #include "vertex_ai_server.h"
+#include 
 #include 
+#include 
+#include 
 
 #include "common.h"
@@ -120,8 +123,8 @@ VertexAiAPIServer::Handle(evhtp_request_t* req)
   const char* redirect_c_str =
       evhtp_kv_find(req->headers_in, redirect_header_.c_str());
   if (redirect_c_str == nullptr) {
-    // Infer the default model
-    HandleInfer(req, model_name_, model_version_str_);
+    // Auto-detect V1 vs V2 format and route accordingly
+    HandleVertexAiPredict(req);
     return;
   } else {
     // Endpoint redirection is requested
@@ -241,6 +244,54 @@ VertexAiAPIServer::Handle(evhtp_request_t* req)
   evhtp_send_reply(req, EVHTP_RES_BADREQ);
 }
 
+void
+VertexAiAPIServer::HandleVertexAiPredict(evhtp_request_t* req)
+{
+  if (req->method != htp_method_POST) {
+    evhtp_send_reply(req, EVHTP_RES_METHNALLOWED);
+    return;
+  }
+
+  // Non-destructive peek at the request body to detect V1 vs V2 format.
+  // evbuffer_copyout copies without draining, so HandleInfer/HandleV1Infer
+  // can still read the full body.
+  //
+  // V1 requests have "instances" as a top-level JSON key.
+  // V2 requests have "inputs" as a top-level JSON key.
+  // We parse the full body as JSON and check for the key to avoid false
+  // positives from substring matching (e.g. "instances" appearing inside
+  // a data value).
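+  // e.g. a V2 body {"inputs": [{"name": "INPUT0", "data": ["instances"]}]}
+  // must still route to V2 (illustrative payload).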
+  evbuffer* input_buffer = req->buffer_in;
+  size_t buffer_len = evbuffer_get_length(input_buffer);
+
+  bool is_v1 = false;
+  if (buffer_len > 0) {
+    std::vector<char> body_buf(buffer_len);
+    ev_ssize_t copied =
+        evbuffer_copyout(input_buffer, body_buf.data(), buffer_len);
+    if (copied > 0) {
+      triton::common::TritonJson::Value body_json;
+      auto parse_err = body_json.Parse(body_buf.data(), copied);
+      if (parse_err == nullptr) {
+        triton::common::TritonJson::Value dummy;
+        is_v1 = body_json.Find("instances", &dummy);
+      } else {
+        TRITONSERVER_ErrorDelete(parse_err);
+      }
+    }
+  }
+
+  if (is_v1) {
+    LOG_VERBOSE(1) << "Vertex AI: detected V1 (instances) format, routing to "
+                      "HandleV1Infer";
+    HandleV1Infer(req, model_name_, model_version_str_);
+  } else {
+    LOG_VERBOSE(1) << "Vertex AI: detected V2 (inputs) format, routing to "
+                      "HandleInfer";
+    HandleInfer(req, model_name_, model_version_str_);
+  }
+}
+
 void
 VertexAiAPIServer::HandleMetrics(evhtp_request_t* req)
 {
diff --git a/src/vertex_ai_server.h b/src/vertex_ai_server.h
index 3221c5ccd6..be608e547f 100644
--- a/src/vertex_ai_server.h
+++ b/src/vertex_ai_server.h
@@ -53,6 +53,9 @@ class VertexAiAPIServer : public HTTPAPIServer {
 
   void Handle(evhtp_request_t* req) override;
 
+  // Auto-detect V1 (instances) vs V2 (inputs) format and route accordingly
+  void HandleVertexAiPredict(evhtp_request_t* req);
+
   void HandleMetrics(evhtp_request_t* req);
 
   TRITONSERVER_Error* GetInferenceHeaderLength(