From de857c9c15f2a03a461f6df42d8038e94592da39 Mon Sep 17 00:00:00 2001 From: Benedikt Schesch Date: Mon, 19 Jan 2026 12:09:33 +0100 Subject: [PATCH] feat: implement APPE (append) command per RFC 959 Add support for the FTP APPE command which appends data to an existing file or creates a new file if it doesn't exist. Changes: - Add Appe variant to Command enum and parser - Add Appe variant to DataChanCmd for data channel communication - Implement Appe command handler - Implement exec_appe in datachan (gets file size, appends at end) - Add integration tests for append to existing and new files --- README.md | 2 +- src/server/chancomms.rs | 5 + src/server/controlchan/command.rs | 4 + src/server/controlchan/commands/appe.rs | 60 +++++++ src/server/controlchan/commands/mod.rs | 2 + src/server/controlchan/control_loop.rs | 1 + src/server/controlchan/line_parser/parser.rs | 8 + src/server/controlchan/line_parser/tests.rs | 35 +++++ src/server/datachan.rs | 57 +++++++ tests/appe.rs | 155 +++++++++++++++++++ 10 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 src/server/controlchan/commands/appe.rs create mode 100644 tests/appe.rs diff --git a/README.md b/README.md index 55e33bb9..f9b45d60 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ possible. Feature highlights: -* 41 Supported FTP commands (see [commands directory](./src/server/controlchan/commands)) and growing +* 42 Supported FTP commands (see [commands directory](./src/server/controlchan/commands)) and growing * Ability to implement own storage back-ends * Ability to implement own authentication back-ends * Explicit FTPS (TLS) diff --git a/src/server/chancomms.rs b/src/server/chancomms.rs index 43e2dd2d..486cfd3f 100644 --- a/src/server/chancomms.rs +++ b/src/server/chancomms.rs @@ -31,6 +31,10 @@ pub enum DataChanCmd { /// The path to the file the client would like to store. path: String, }, + Appe { + /// The path to the file the client would like to append to. + path: String, + }, List { /// Arguments passed along with the list command. options: Option, @@ -53,6 +57,7 @@ impl DataChanCmd { match self { DataChanCmd::Retr { path, .. } => Some(path.clone()), DataChanCmd::Stor { path, .. } => Some(path.clone()), + DataChanCmd::Appe { path, .. } => Some(path.clone()), DataChanCmd::List { path, .. } => path.clone(), DataChanCmd::Mlsd { path } => path.clone(), DataChanCmd::Nlst { path } => path.clone(), diff --git a/src/server/controlchan/command.rs b/src/server/controlchan/command.rs index 69412373..9c7479bf 100644 --- a/src/server/controlchan/command.rs +++ b/src/server/controlchan/command.rs @@ -52,6 +52,10 @@ pub enum Command { /// The path to the file the client would like to store. path: String, }, + Appe { + /// The path to the file the client would like to append to. + path: String, + }, List { /// Arguments passed along with the list command. options: Option, diff --git a/src/server/controlchan/commands/appe.rs b/src/server/controlchan/commands/appe.rs new file mode 100644 index 00000000..55e3c6a1 --- /dev/null +++ b/src/server/controlchan/commands/appe.rs @@ -0,0 +1,60 @@ +//! The RFC 959 Append (`APPE`) command +// +// This command causes the server-DTP to accept the data +// transferred via the data connection and to store the data in +// a file at the server site. If the file specified in the +// pathname exists at the server site, the data shall be +// appended to that file; otherwise the file shall be created. + +use crate::server::chancomms::DataChanCmd; +use crate::{ + auth::UserDetail, + server::controlchan::{ + Reply, ReplyCode, + command::Command, + error::ControlChanError, + handler::{CommandContext, CommandHandler}, + }, + storage::{Metadata, StorageBackend}, +}; +use async_trait::async_trait; + +#[derive(Debug)] +pub struct Appe; + +#[async_trait] +impl CommandHandler for Appe +where + User: UserDetail + 'static, + Storage: StorageBackend + 'static, + Storage::Metadata: Metadata, +{ + #[tracing_attributes::instrument] + async fn handle(&self, args: CommandContext) -> Result { + let mut session = args.session.lock().await; + + let (cmd, path): (DataChanCmd, String) = match args.parsed_command.clone() { + Command::Appe { path } => { + let path_clone = path.clone(); + (DataChanCmd::Appe { path }, path_clone) + } + _ => panic!("Programmer error, expected command to be APPE"), + }; + + let logger = args.logger; + match session.data_cmd_tx.take() { + Some(tx) => { + tokio::spawn(async move { + if let Err(err) = tx.send(cmd).await { + slog::warn!(logger, "APPE: could not notify data channel. {}", err); + } + }); + Ok(Reply::new(ReplyCode::FileStatusOkay, "Ready to receive data")) + } + None => { + slog::warn!(logger, "APPE: no data connection established for APPEing {:?}", path); + Ok(Reply::new(ReplyCode::CantOpenDataConnection, "No data connection established")) + } + } + } +} diff --git a/src/server/controlchan/commands/mod.rs b/src/server/controlchan/commands/mod.rs index a4e7fce6..7c4c1c9a 100644 --- a/src/server/controlchan/commands/mod.rs +++ b/src/server/controlchan/commands/mod.rs @@ -7,6 +7,7 @@ mod abor; mod acct; mod allo; +mod appe; mod auth; mod ccc; mod cdup; @@ -50,6 +51,7 @@ pub use self::md5::Md5; pub use abor::Abor; pub use acct::Acct; pub use allo::Allo; +pub use appe::Appe; pub use auth::{Auth, AuthParam}; pub use ccc::Ccc; pub use cdup::Cdup; diff --git a/src/server/controlchan/control_loop.rs b/src/server/controlchan/control_loop.rs index 5d5406d8..98db384f 100644 --- a/src/server/controlchan/control_loop.rs +++ b/src/server/controlchan/control_loop.rs @@ -472,6 +472,7 @@ where Command::Md5 { file } => Box::new(commands::Md5::new(file)), Command::Mlst { path } => Box::new(commands::Mlst::new(path)), Command::Mlsd { .. } => Box::new(commands::Mlsd), + Command::Appe { .. } => Box::new(commands::Appe), Command::Other { .. } => return Ok(Reply::new(ReplyCode::CommandSyntaxError, "Command not implemented")), }; diff --git a/src/server/controlchan/line_parser/parser.rs b/src/server/controlchan/line_parser/parser.rs index c9bb30f0..a265d76f 100644 --- a/src/server/controlchan/line_parser/parser.rs +++ b/src/server/controlchan/line_parser/parser.rs @@ -114,6 +114,14 @@ where let path = String::from_utf8_lossy(&path); Command::Stor { path: path.to_string() } } + "APPE" => { + let path = parse_to_eol(cmd_params)?; + if path.is_empty() { + return Err(ParseErrorKind::InvalidCommand.into()); + } + let path = String::from_utf8_lossy(&path); + Command::Appe { path: path.to_string() } + } "LIST" => { let line = parse_to_eol(cmd_params)?; let path = line diff --git a/src/server/controlchan/line_parser/tests.rs b/src/server/controlchan/line_parser/tests.rs index d61b02a4..d8a9b2f7 100644 --- a/src/server/controlchan/line_parser/tests.rs +++ b/src/server/controlchan/line_parser/tests.rs @@ -600,3 +600,38 @@ fn parse_site() { assert_eq!(parse(test.input), test.expected); } } + +#[test] +fn parse_appe() { + struct Test { + input: &'static str, + expected: Result, + } + let tests = [ + Test { + input: "APPE\r\n", + expected: Err(ParseErrorKind::InvalidCommand.into()), + }, + Test { + input: "APPE \r\n", + expected: Err(ParseErrorKind::InvalidCommand.into()), + }, + Test { + input: "APPE file.txt\r\n", + expected: Ok(Command::Appe { path: "file.txt".into() }), + }, + Test { + input: "APPE path/to/file.txt\r\n", + expected: Ok(Command::Appe { + path: "path/to/file.txt".into(), + }), + }, + Test { + input: "appe file.txt\r\n", + expected: Ok(Command::Appe { path: "file.txt".into() }), + }, + ]; + for test in tests.iter() { + assert_eq!(parse(test.input), test.expected); + } +} diff --git a/src/server/datachan.rs b/src/server/datachan.rs index 66e95ef6..2bedb22e 100644 --- a/src/server/datachan.rs +++ b/src/server/datachan.rs @@ -149,6 +149,9 @@ where DataChanCmd::Stor { path } => { self.exec_stor(path, start_pos).await; } + DataChanCmd::Appe { path } => { + self.exec_appe(path).await; + } DataChanCmd::List { path, .. } => { self.exec_list_variant(path, ListCommand::List).await; } @@ -308,6 +311,60 @@ where } } + #[tracing_attributes::instrument] + async fn exec_appe(self, path: String) { + let path_copy = path.clone(); + let full_path = self.cwd.join(&path); + let tx = self.control_msg_tx.clone(); + + // Get current file size, or 0 if file doesn't exist + let start_pos = match self.storage.metadata((*self.user).as_ref().unwrap(), &full_path).await { + Ok(meta) => meta.len(), + Err(_) => 0, + }; + + let start_time = Instant::now(); + let put_result = self + .storage + .put( + (*self.user).as_ref().unwrap(), + Self::reader(self.socket, self.ftps_mode, "appe").await, + full_path, + start_pos, + ) + .await; + let duration = start_time.elapsed(); + + match put_result { + Ok(bytes) => { + slog::info!( + self.logger, + "Successful APPE {:?}; Duration {}; Bytes copied {}; Transfer speed {}; start_pos={}", + &path_copy, + HumanDuration(duration), + HumanBytes(bytes), + TransferSpeed(bytes as f64 / duration.as_secs_f64()), + start_pos, + ); + + metrics::inc_transferred("appe", "success"); + + if let Err(err) = tx.send(ControlChanMsg::WrittenData { bytes, path: path_copy }).await { + slog::error!(self.logger, "Could not notify control channel of successful APPE: {:?}", err); + } + } + Err(err) => { + slog::warn!(self.logger, "Error during APPE transfer after {}: {:?}", HumanDuration(duration), err); + + categorize_and_register_error(&self.logger, &err, "appe"); + + if let Err(err) = tx.send(ControlChanMsg::StorageError(err)).await { + slog::error!(self.logger, "Could not notify control channel of error with APPE: {:?}", err); + } + } + } + } + #[tracing_attributes::instrument] async fn exec_list_variant(self, path: Option, command: ListCommand) { let path = self.resolve_path(path); diff --git a/tests/appe.rs b/tests/appe.rs new file mode 100644 index 00000000..98a1c20a --- /dev/null +++ b/tests/appe.rs @@ -0,0 +1,155 @@ +#![allow(missing_docs)] + +pub mod common; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use common::{read_from_server, send_to_server, tcp_connect, tcp_pasv_connect}; +use tokio::io::AsyncWriteExt; + +fn parse_pasv(line: &str) -> Result { + let body = line.split_once('(').and_then(|(_, rest)| rest.split_once(')')).ok_or("bad format")?.0; + let nums: Vec = body.split(',').filter_map(|s| s.trim().parse().ok()).collect(); + if nums.len() != 6 { + return Err("need 6 numbers"); + } + let port = u16::from(nums[4]) * 256 + u16::from(nums[5]); + Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(nums[0], nums[1], nums[2], nums[3])), port)) +} + +fn unique_filename(prefix: &str) -> String { + let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); + format!("{}_{}.txt", prefix, ts) +} + +async fn login(stream: &tokio::net::TcpStream, buffer: &mut [u8]) { + assert_eq!(read_from_server(buffer, stream).await, "220 Welcome test\r\n"); + send_to_server("USER test\r\n", stream).await; + assert_eq!(read_from_server(buffer, stream).await, "331 Password Required\r\n"); + send_to_server("PASS test\r\n", stream).await; + assert_eq!(read_from_server(buffer, stream).await, "230 User logged in, proceed\r\n"); + send_to_server("TYPE I\r\n", stream).await; + assert_eq!(read_from_server(buffer, stream).await, "200 Always in binary mode\r\n"); +} + +async fn read_data_from_server(stream: &tokio::net::TcpStream) -> Vec { + let mut data = Vec::new(); + let mut buffer = [0u8; 1024]; + loop { + stream.readable().await.unwrap(); + match stream.try_read(&mut buffer) { + Ok(0) => break, + Ok(n) => data.extend_from_slice(&buffer[..n]), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{}", e), + } + } + data +} + +#[tokio::test(flavor = "current_thread")] +async fn test_appe_to_existing_file() { + common::initialize().await; + + let stream = tcp_connect().await.unwrap(); + let mut buffer = vec![0_u8; 1024]; + let filename = unique_filename("appe_existing"); + + login(&stream, &mut buffer).await; + + // STOR initial content + send_to_server("PASV\r\n", &stream).await; + let resp = read_from_server(&mut buffer, &stream).await; + assert!(resp.starts_with("227 Entering Passive Mode")); + let addr = parse_pasv(resp).unwrap(); + + send_to_server(&format!("STOR {}\r\n", filename), &stream).await; + assert_eq!(read_from_server(&mut buffer, &stream).await, "150 Ready to receive data\r\n"); + + let mut data_stream = tcp_pasv_connect(addr).await.unwrap(); + send_to_server("Hello", &data_stream).await; + data_stream.shutdown().await.unwrap(); + drop(data_stream); + + assert_eq!(read_from_server(&mut buffer, &stream).await, "226 File successfully written\r\n"); + + // APPE additional content + send_to_server("PASV\r\n", &stream).await; + let resp = read_from_server(&mut buffer, &stream).await; + assert!(resp.starts_with("227 Entering Passive Mode")); + let addr = parse_pasv(resp).unwrap(); + + send_to_server(&format!("APPE {}\r\n", filename), &stream).await; + assert_eq!(read_from_server(&mut buffer, &stream).await, "150 Ready to receive data\r\n"); + + let mut data_stream = tcp_pasv_connect(addr).await.unwrap(); + send_to_server(" World", &data_stream).await; + data_stream.shutdown().await.unwrap(); + drop(data_stream); + + assert_eq!(read_from_server(&mut buffer, &stream).await, "226 File successfully written\r\n"); + + // RETR to verify content + send_to_server("PASV\r\n", &stream).await; + let resp = read_from_server(&mut buffer, &stream).await; + assert!(resp.starts_with("227 Entering Passive Mode")); + let addr = parse_pasv(resp).unwrap(); + + send_to_server(&format!("RETR {}\r\n", filename), &stream).await; + assert_eq!(read_from_server(&mut buffer, &stream).await, "150 Sending data\r\n"); + + let data_stream = tcp_pasv_connect(addr).await.unwrap(); + let content = read_data_from_server(&data_stream).await; + drop(data_stream); + + assert_eq!(content, b"Hello World"); + assert_eq!(read_from_server(&mut buffer, &stream).await, "226 Successfully sent\r\n"); + + common::finalize().await; +} + +#[tokio::test(flavor = "current_thread")] +async fn test_appe_to_new_file() { + common::initialize().await; + + let stream = tcp_connect().await.unwrap(); + let mut buffer = vec![0_u8; 1024]; + let filename = unique_filename("appe_new"); + + login(&stream, &mut buffer).await; + + // APPE to non-existent file (should create it) + send_to_server("PASV\r\n", &stream).await; + let resp = read_from_server(&mut buffer, &stream).await; + assert!(resp.starts_with("227 Entering Passive Mode")); + let addr = parse_pasv(resp).unwrap(); + + send_to_server(&format!("APPE {}\r\n", filename), &stream).await; + assert_eq!(read_from_server(&mut buffer, &stream).await, "150 Ready to receive data\r\n"); + + let mut data_stream = tcp_pasv_connect(addr).await.unwrap(); + send_to_server("New content", &data_stream).await; + data_stream.shutdown().await.unwrap(); + drop(data_stream); + + assert_eq!(read_from_server(&mut buffer, &stream).await, "226 File successfully written\r\n"); + + // RETR to verify content + send_to_server("PASV\r\n", &stream).await; + let resp = read_from_server(&mut buffer, &stream).await; + assert!(resp.starts_with("227 Entering Passive Mode")); + let addr = parse_pasv(resp).unwrap(); + + send_to_server(&format!("RETR {}\r\n", filename), &stream).await; + assert_eq!(read_from_server(&mut buffer, &stream).await, "150 Sending data\r\n"); + + let data_stream = tcp_pasv_connect(addr).await.unwrap(); + let content = read_data_from_server(&data_stream).await; + drop(data_stream); + + assert_eq!(content, b"New content"); + assert_eq!(read_from_server(&mut buffer, &stream).await, "226 Successfully sent\r\n"); + + common::finalize().await; +}