Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 23 additions & 231 deletions vmm/src/api/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,22 @@
use std::collections::BTreeMap;
use std::error::Error;
use std::fs::File;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::os::unix::net::UnixListener;
use std::panic::AssertUnwindSafe;
use std::path::PathBuf;
use std::sync::mpsc::{Receiver, Sender, channel, sync_channel};
use std::sync::{Arc, LazyLock, Mutex};
use std::sync::LazyLock;
use std::sync::mpsc::Sender;
use std::thread;

use hypervisor::HypervisorType;
use log::{debug, error};
use log::error;
use micro_http::{
Body, HttpServer, MediaType, Method, Request, Response, ServerError, ServerRequest,
ServerResponse, StatusCode, Version,
Body, HttpServer, MediaType, Method, Request, Response, ServerError, StatusCode, Version,
};
use seccompiler::{SeccompAction, apply_filter};
use serde_json::Error as SerdeError;
use thiserror::Error;
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::EventFd;

use self::http_endpoint::{VmActionHandler, VmCreate, VmInfo, VmmPing, VmmShutdown};
Expand Down Expand Up @@ -320,153 +316,10 @@ fn handle_http_request(
response
}

/// Keeps track of the worker threads, and the resources needed to interact
/// with them.
#[derive(Debug)]
struct HttpWorkerThreads {
// The worker threads themselves.
threads: Vec<thread::JoinHandle<Result<()>>>,
// An MPSC channel to send server requests to the workers. We put it into
// an option so we can easily drop it in the destructor.
request_tx: Option<Sender<ServerRequest>>,
// An MPSC channel that the workers use to send responses to the HTTP
// server thread.
response_rx: Receiver<ServerResponse>,
// Workers signal this eventfd when they have a response for the HTTP
// server thread.
response_event: EventFd,
}

impl HttpWorkerThreads {
fn new(
thread_count: usize,
api_notifier: &EventFd,
api_sender: &Sender<ApiRequest>,
seccomp_action: &SeccompAction,
hypervisor_type: HypervisorType,
landlock_enable: bool,
exit_evt: &EventFd,
) -> Result<Self> {
let response_event = EventFd::new(libc::EFD_NONBLOCK).map_err(VmmError::EventFdCreate)?;
let (response_tx, response_rx) = sync_channel::<ServerResponse>(thread_count);

let mut threads = Vec::new();
let (request_tx, request_rx) = channel::<ServerRequest>();

let request_rx = Arc::new(Mutex::new(request_rx));

// We use the same seccomp filter that we already use for the HTTP server thread.
let api_seccomp_filter =
get_seccomp_filter(seccomp_action, Thread::HttpApi, hypervisor_type)
.map_err(VmmError::CreateSeccompFilter)?;

for n in 0..thread_count {
let response_event = response_event.try_clone().map_err(VmmError::EventFdClone)?;

let response_tx = response_tx.clone();
let request_rx = request_rx.clone();

let api_notifier = api_notifier.try_clone().map_err(VmmError::EventFdClone)?;
let api_sender = api_sender.clone();

let api_seccomp_filter = api_seccomp_filter.clone();
let exit_evt = exit_evt.try_clone().map_err(VmmError::EventFdClone)?;

let thread = thread::Builder::new()
.name(format!("http-worker-{n}").to_string())
.spawn(move || {
debug!("Spawned HTTP worker thread with id {n}",);
if !api_seccomp_filter.is_empty() {
apply_filter(&api_seccomp_filter)
.map_err(VmmError::ApplySeccompFilter)
.map_err(|e| {
error!("Error applying seccomp filter: {e:?}");
exit_evt.write(1).ok();
e
})?;
}

if landlock_enable {
Landlock::new()
.map_err(VmmError::CreateLandlock)?
.restrict_self()
.map_err(VmmError::ApplyLandlock)
.map_err(|e| {
error!("Error applying landlock to http-worker thread: {e:?}");
exit_evt.write(1).ok();
e
})?;
}

std::panic::catch_unwind(AssertUnwindSafe(move || {
let id = n;
loop {
let request = request_rx.lock().unwrap().recv();
match request {
Ok(msg) => {
// Process the server request
let response = msg.process(|request| {
handle_http_request(request, &api_notifier, &api_sender)
});

// Send the response to the HTTP server thread together with this
// threads id.
if let Err(e) = response_tx.send(response) {
error!(
"HTTP worker thread {id}: error sending response {e}"
);
break;
}

// Notify the HTTP server thread.
response_event.write(1).ok();
}
Err(_) => {
// We assume that the other side of the channel
// closed because the VMM received a shutdown request.
break;
}
}
}
}))
.map_err(|_| {
error!("http-worker thread {n} panicked");
exit_evt.write(1).ok()
})
.ok();

Ok(())
})
.map_err(VmmError::HttpThreadSpawn)?;

threads.push(thread);
}

Ok(Self {
threads,
request_tx: Some(request_tx),
response_rx,
response_event,
})
}
}

impl Drop for HttpWorkerThreads {
fn drop(&mut self) {
// Dropping the Sender side of the request channels to throw the worker
// threads out of their loops.
drop(self.request_tx.take());
// Now we can join each thread.
self.threads
.drain(..)
.for_each(|thread| thread.join().unwrap().unwrap());
}
}

fn start_http_thread(
mut server: HttpServer,
api_notifier: &EventFd,
api_sender: &Sender<ApiRequest>,
api_notifier: EventFd,
api_sender: Sender<ApiRequest>,
seccomp_action: &SeccompAction,
exit_evt: EventFd,
hypervisor_type: HypervisorType,
Expand All @@ -483,42 +336,6 @@ fn start_http_thread(
.add_kill_switch(api_shutdown_fd_clone)
.map_err(VmmError::CreateApiServer)?;

// We use the epoll mechanism to parallelize this. The epoll tokens are
// attached when registering the FDs with epoll. That way we can later
// check why we were notified.
const HTTP_EPOLL_TOKEN: u64 = 1;
const WORKER_EPOLL_TOKEN: u64 = 2;

// The epoll instance our HTTP server thread will wait on.
let outer_epoll = Epoll::new().unwrap();
let worker_threads = HttpWorkerThreads::new(
2,
api_notifier,
api_sender,
seccomp_action,
hypervisor_type,
landlock_enable,
&exit_evt,
)?;

// Register the fd that the worker threads will signal.
outer_epoll
.ctl(
ControlOperation::Add,
worker_threads.response_event.as_raw_fd(),
EpollEvent::new(EventSet::IN, WORKER_EPOLL_TOKEN),
)
.unwrap();

// Register the HttpServer's fd.
outer_epoll
.ctl(
ControlOperation::Add,
server.epoll().as_raw_fd(),
EpollEvent::new(EventSet::IN, HTTP_EPOLL_TOKEN),
)
.unwrap();

let thread = thread::Builder::new()
.name("http-server".to_string())
.spawn(move || {
Expand Down Expand Up @@ -546,47 +363,24 @@ fn start_http_thread(
}

std::panic::catch_unwind(AssertUnwindSafe(move || {
let mut events = vec![EpollEvent::default(); 32];
server.start_server().unwrap();

loop {
let n = match outer_epoll.wait(-1, &mut events) {
Ok(n) => n,
// Can for example happen when connecting a debugger.
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => panic!("failed to wait for events: {e}"),
};
for ev in events.iter().take(n) {
match ev.data() {
HTTP_EPOLL_TOKEN => {
// The HttpServer got a request, handle that.
match server.requests() {
Ok(request_vec) => {
for server_request in request_vec {
worker_threads.request_tx.as_ref().unwrap().send(server_request).unwrap();
}
}
Err(ServerError::ShutdownEvent) => {
server.flush_outgoing_writes();
return;
}
Err(e) => {
error!(
"HTTP server error on retrieving incoming request. Error: {e}"
);
}
}
}
WORKER_EPOLL_TOKEN => {
// One of the worker threads has a response.
// We clear the eventfd first.
let _ = worker_threads.response_event.read().unwrap();
let response = worker_threads.response_rx.recv().unwrap();
if let Err(e) = server.respond(response){
match server.requests() {
Ok(request_vec) => {
for server_request in request_vec {
if let Err(e) = server.respond(server_request.process(|request| {
handle_http_request(request, &api_notifier, &api_sender)
})) {
error!("HTTP server error on response: {e}");
}
}
_ => { }
}
Err(ServerError::ShutdownEvent) => {
server.flush_outgoing_writes();
return;
}
Err(e) => {
error!("HTTP server error on retrieving incoming request. Error: {e}");
}
}
}
Expand All @@ -604,7 +398,6 @@ fn start_http_thread(
Ok((thread, api_shutdown_fd))
}

#[allow(clippy::needless_pass_by_value)]
pub fn start_http_path_thread(
path: &str,
api_notifier: EventFd,
Expand All @@ -622,16 +415,15 @@ pub fn start_http_path_thread(

start_http_thread(
server,
&api_notifier,
&api_sender,
api_notifier,
api_sender,
seccomp_action,
exit_evt,
hypervisor_type,
landlock_enable,
)
}

#[allow(clippy::needless_pass_by_value)]
pub fn start_http_fd_thread(
fd: RawFd,
api_notifier: EventFd,
Expand All @@ -645,8 +437,8 @@ pub fn start_http_fd_thread(
let server = unsafe { HttpServer::new_from_fd(fd) }.map_err(VmmError::CreateApiServer)?;
start_http_thread(
server,
&api_notifier,
&api_sender,
api_notifier,
api_sender,
seccomp_action,
exit_evt,
hypervisor_type,
Expand Down
1 change: 0 additions & 1 deletion vmm/src/seccomp_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,6 @@ fn http_api_thread_rules() -> Result<Vec<(i64, Vec<SeccompRule>)>, BackendError>
(libc::SYS_rt_sigprocmask, vec![]),
(libc::SYS_getcwd, vec![]),
(libc::SYS_clock_nanosleep, vec![]),
(libc::SYS_read, vec![]),
])
}

Expand Down
Loading