从零实现 Rust FTP Server

27 minute read Published: 2022-05-28

在看《代码整洁之道》的同时写本项目,希望通过一个小但五脏俱全的工程项目实践书中原则,实践 TDD 及熟悉 Rust。FTP 本身足够简单,且无需关注太多底层细节,在工程上容易设计,因此选择该协议为实现目标

在此记录开发过程以理清思路,项目 Repo


Day0: Echo Server

从最简单的功能开始:Echo server,期望实现一个服务器,可以同时接受多个 Client 的 TCP 连接,并对每个 Client 执行一样的逻辑————接受他们发送的整行输入 (\r\n 结尾的字符串),然后原样返回这行

在 Rust 中,TCP 通信可以通过 std::net 进行,std::net::TcpListener 提供了监听端口并接受连接的方法;std::net::TcpStream 则抽象了一个 socket 连接(的一端),可以通过 TcpStream::connect(addr) 新建一个连接到某 addr 的连接,TcpListener 在成功建立一个连接后也会返回一个 TcpStream

因此这个 Echo server 的代码就像:

fn client_handler(mut stream: TcpStream) {
    let mut line = String::new();
    let mut reader = BufReader::new(stream.try_clone().unwrap());
    while let Ok(_) = reader.read_line(&mut line) {
        stream.write(format!("{}", line.trim()).as_bytes()).unwrap();
        line.clear();
    }
}

fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080").unwrap();
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                thread::spawn(move || {
                    client_handler(stream);
                });
            },
            Err(e) => {
                println!("Error: {}", e);
            }
        }
    }
}

它只是监听到 8080 端口的所有 TCP 连接,对每个 Client 都生成一个线程,并且用一个 BufReader 来简化 “读取一行” 的操作。我们可以用 nc 127.0.0.1 8080 来测试这个服务器

Day1: Command, Response and Session

考虑 FTP 协议的组成,它要求客户端和服务端保有一个专门用来发送指令的 TCP 连接,并在需要时建立数据连接。先考虑指令连接,一个最简单的模式就是客户端发送一行 \r\n 结尾的指令,然后服务端解析、执行指令对应的操作,并可能给客户端一个类似 220 server is ready\r\n 的回复,接下来考虑如何抽象这两者并实现它们

Response

先从服务器的回复开始说起,服务器对于命令连接的回复往往非常简单,格式是返回码 + 一段描述性的文本,返回码有类似操作成功、操作失败的语义,是协议规定的,描述性的文本可能是默认的(比如服务器的问候语),也可能需要包含一些特定信息(如打开连接的地址)。同时,在我们需要给用户发送回复的地方,我们可能会调用类似 TcpStream::write 的接口来发送回复,因此我们希望这个回复类能方便地转换成字符串

因此我们有了这个 Response 结构该有的两个要素:code 和 message,而将它转换成字符串的格式则是 {code:} {message:}\r\n。为了使用方便,我希望有很多个这种 Response 结构,它们的 code 是固定的,而 message 可变,这样我就能随时取用一个有我期望的 code 结构并加上我想要的 message 来产生一个回复,而不用每次都手写一个 code。当然我们也可以不为某个 Response 设置默认回复,这样就能强制使用者填充它的 message

遵循 TDD 的原则,我们先看看如果要编写一个上述功能的模块,使用场景会是如何,假设我们现在只需要两类回复消息,一个是用户新连接时的问候 Greeting220,一个是命令错误的回复 SyntaxErr500,我把它们的状态码都写到末尾,以供方便地选择回复类型

下面的测试非常简单,仅仅是描述了我们的 Response 结构的几个功能

  1. 可能可以调用 ::default() 来获取一个默认消息
  2. 可能可以调用 ::new() 来自定义消息体
  3. 实现了 to_string() 或是相关的 trait 使其能转换到字符串
#[cfg(test)]
mod response_test {
    use super::*;

    fn assert_response_equal_str<T: ResponseMessage>(resp: T, s: &str) {
        assert_eq!(resp.to_string(), format!("{s:}\r\n"));
    }

    #[test]
    fn test_resp_default_message() {
        assert_response_equal_str(Greeting220::default(), "220 Welcome to the rust FTP Server");
        assert_response_equal_str(
            SyntaxErr500::default(),
            "500 Command not executed: syntax error",
        );
    }

    #[test]
    fn test_resp_custom_message() {
        assert_response_equal_str(
            UnknownRespWithoutDefaultMessage999::new("unknown"),
            "999 unknown",
        );
    }
}

实现的思路是,有一个 ResponseMessage trait 提了两个接口,分别获取 code 和 message。所有其他具体的 Response 类都实现它。再针对每个类实现其 Display trait,用于提供 to_string 方法。每个 struct 可能会保存传入的字符串,也可能不用任何输入,直接采用默认消息,前者类比测试代码中的 ::new 后者类比 ::default。所以我们还要为每个 struct 实现 new 方法,为某些有默认消息体的 struct 实现 Default trait

一个完整的例子是

use std::fmt::Display;

/// all response has a response code and a message
pub trait ResponseMessage: Sized + Display {
    fn code(&self) -> u16;
    fn message(&self) -> &str;
}

pub struct Greeting220(Option<String>);
impl ResponseMessage for $structname {
    fn code(&self) -> u16 {
        220
    }
    fn message(&self) -> &str {
        if let Some(s) = &self.0 {
            return s;
        }
        "default message"
    }
}
impl Display for Greeting220 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} {}\r\n", self.code(), self.message())
    }
}
impl Greeting220 {
    pub fn new<S: AsRef<str>>(s: S) -> Self {
        Self(Some(s.as_ref().to_string()))
    }
}

impl Default for Greeting220 {
    fn default() -> Self {
        Self(None)
    }
}

真是个艰巨的任务,因为最后这类 struct 会有几十个,上述代码片段需要对每个 struct 都写一遍,马上就会产生一坨复制粘贴的屎。Don't repeat yourself. 不过宏可以解决这个小问题:上述代码在重复出现时,其基本结构是一样的,只不过 struct 名,code 和 message 会不同,因此可以构造这样的宏:

macro_rules! impl_display {
    ($($structname: ty), *) => {
        $(
            impl Display for $structname {
                fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                    write!(f, "{} {}\r\n", self.code(), self.message())
                }
            }
        )*
    };
}

macro_rules! response {
    ($structname: ident, $code: literal) => {
        pub struct $structname(String);
        impl_display!($structname);
        impl ResponseMessage for $structname {
            fn code(&self) -> u16 {
                $code
            }
            fn message(&self) -> &str {
                &self.0
            }
        }

        impl $structname {
            /// Create response with custom body
            #[allow(dead_code)]
            pub fn new<S: AsRef<str>>(s: S) -> Self {
                Self(s.as_ref().to_string())
            }
        }
    };
    ($structname: ident, $code: literal, $default_message: literal) => {
        pub struct $structname(Option<String>);
        impl_display!($structname);
        impl ResponseMessage for $structname {
            fn code(&self) -> u16 {
                $code
            }
            fn message(&self) -> &str {
                if let Some(s) = &self.0 {
                    return s;
                }
                $default_message
            }
        }

        impl $structname {
            /// Create response with custom body
            #[allow(dead_code)]
            pub fn new<S: AsRef<str>>(s: S) -> Self {
                Self(Some(s.as_ref().to_string()))
            }
        }

        impl Default for $structname {
            fn default() -> Self {
                Self(None)
            }
        }
    };
}

核心在于 response 宏,它的用法是:

response!(Greeting220, 220, "Welcome to the rust FTP Server");
response!(SyntaxErr500, 500, "Command not executed: syntax error");
response!(UnknownRespWithoutDefaultMessage999, 999);

我们声明了两个消息码分别为 220 和 500 的、有默认消息体的 ResponseMessage,又声明了一个没有默认消息体的 ResponseMessage。在后续使用这些 ResponseMessage 的时候,有默认消息体的可以通过 default() 来获取默认消息体,也可以通过 new("custom message body") 来生成自定义消息体,而没有默认消息体的结构则只能使用 new

Command

再看看 Command 的实现,看看我们期望用什么方式来使用它

  1. Command 总是解析自用户发来的一行输入,因此至少要提供一个 parse 方法,这个方法返回一个解析后的 Command,也可以在命令不合法的时候告知调用者
  2. Command 之后需要在某处被执行,因此其内部需要保存诸如命令类型及命令相关的参数等信息
  3. 2 中执行的派发逻辑是每个 Command 类型对应一个 handler,可以用 match + enum 实现这一点,但也可以思考能不能减少这种样板代码

对于 1.,我们应该会期望 Command 是一个 Enum,而对于 2.,我们期望这个 Enum 总能保存一个参数数组(或者更严格一点,保存固定数量个参数),因此可能会有这样的设计

enum Command {
    CmdA(Vec<String>),
    CmdB(Vec<String>),
    CmdC(Vec<String>),
}

上面的 Vec<String> 重复太多次了,同样可以用一个宏来简化它

macro_rules! commands {
    ($($cmd: ident), *) => {
        pub enum Command {
            $($cmd(Vec<String>),)*
        }
    };
}

commands!(CmdA, CmdB);

接下来为它实现一个 parse 方法,为了方便地从用户输入的字符串转成 Enum,使用 strum 这个 crate,刚才的宏被修改为

macro_rules! commands {
    ($($cmd: ident), *) => {
        #[derive(EnumString, Debug)]
        #[strum(ascii_case_insensitive)]
        pub enum Command {
            $($cmd(Vec<String>),)*
        }

        impl Command {
            /// Parse string to command,
            /// All arguments will be collected as Strings without checking their validity since
            ///     exec should do it
            /// return `None` if command type is unexist
            pub fn parse<S: AsRef<str>>(s: S) -> Option<Self> {
                let tokens = s.as_ref().split_ascii_whitespace().collect::<Vec<_>>();
                if tokens.is_empty() {
                    return None;
                }

                let parse_result = Command::from_str(tokens[0]);
                match parse_result {
                    Ok(command) => {
                        let args = tokens[1..].iter().map(|s| s.to_string()).collect();
                        match command {
                            $($cmd => Some(Self::$cmd(args)),)*
                        }
                    },
                    _ => None
                }
            }

            pub fn get_args(&self) -> &Vec<String> {
                match self {
                    $(Self::$cmd(v) => &v)*
                }
            }
        }
    };
}

commands!(Quit);

注意到我顺便实现了获取 Command 的参数的 get_args 方法。这段宏展开后会变成大量的 pattern matching 语句。至此需求 1., 2. 都完成了,相关的测试代码如下

#[cfg(test)]
mod command_test {
    use super::*;

    #[test]
    fn test_parse_valid() {
        let quit = Command::parse("QUIT\r\n").unwrap();
        assert!(matches!(quit, Command::Quit(_)));
        assert!(quit.get_args().is_empty());

        let case_insensitive_quit = Command::parse("qUiT\r\n").unwrap();
        assert!(matches!(case_insensitive_quit, Command::Quit(_)));
        assert!(case_insensitive_quit.get_args().is_empty());

        let quit_with_arguments = Command::parse("quit a b c 1 2 3\r\n").unwrap();
        assert!(matches!(quit_with_arguments, Command::Quit(_)));
        assert_eq!(quit_with_arguments.get_args().len(), 6);
        for (i, ch) in ["a", "b", "c", "1", "2", "3"].iter().enumerate() {
            assert_eq!(quit_with_arguments.get_args()[i], *ch);
        }
    }

    #[test]
    fn test_parse_unexist() {
        assert!(Command::parse("\r\n").is_none());
        assert!(Command::parse("NONE arg1 arg2 arg3\r\n").is_none());
    }
}

至于需求 3.,由于 Command 的具体执行与服务端与客户端的会话相关,因此暂时不在 Command 中实现。在这个意义上,Command 只是提供了一个解析字符串的接口,具体执行逻辑放到后面的 Session 中

Session

由于 FTP 协议是有状态的:用户可以登陆认证,也可以通过指令切换自己所在的目录。又由于对于每个客户端,我们都需要生成一个线程来进行交互,因此为每个用户维护一组 “状态” 是有必要的,在这里我将它们抽象为一个 Session,它管理了服务端与某一个用户的一切连接信息,包括其 TcpStream、认证状态、数据传输连接等等。考虑我们希望 Session 具有的功能:

  1. 它保管与客户端的连接
  2. 因为 1.,我们与客户端的通信都通过它进行,如
    1. 接收客户端的命令
    2. 执行命令(命令可能改变 Session 中的状态,如用户从未登录变为登录,或是工作目录改变)
    3. 向客户端回复
    4. 打开数据连接并传送数据
    5. ...

我们与每个客户端的交互应该像:

let mut session = Session::new(stream);
session.send_msg(response::Greeting220::default())?; // hello

// receive command and exec it
loop {
    let cmd = session.get_cmd()?;
    if let Some(cmd) = cmd {
        session.exec_cmd(cmd)?;
    } else {
        // parse failed
        session.send_msg(response::SyntaxErr500::default())?;
    }
}

Session 提供了诸如 send_msg,get_cmd,exec_cmd 这样的接口,因此可以有如下单元测试

#[test]
fn test_create_session() {
    let (_, _) = setup::setup_client_and_session();
}

#[test]
fn test_send_msg() {
    let (mut client, mut session) = setup::setup_client_and_session();

    let msg = "message";
    session.send_msg(msg).unwrap();
    assert_string_trim_eq(client.get_msg_trimed().unwrap(), msg);
}

#[test]
fn test_send_resp() {
    let (mut client, mut session) = setup::setup_client_and_session();

    session.send_msg(response::UnknownRespWithoutDefaultMessage999::new("message")).unwrap();
    assert_string_trim_eq(client.get_msg_trimed().unwrap(), "999 message");
}

#[test]
fn test_get_cmd() {
    let (mut client, mut session) = setup::setup_client_and_session();

    client.send_msg_add_crlf("QUIT arg").unwrap();
    let cmd = session.get_cmd().unwrap();
    assert!(cmd.is_some());
    assert!(matches!(cmd.unwrap(), Command::Quit(_)));
}

#[test]
fn test_exec_quit() {
    let (mut client, mut session) = setup::setup_client_and_session();
    client.get_msg_trimed().unwrap(); // skip hello

    session.exec_cmd(Command::Quit(vec![])).unwrap();
    assert!(client.get_msg_trimed().is_err()); // conn closed
}

其中 send_msg, get_cmd 的实现比较简单,而 exec_cmd 最朴素的思想是 pattern matching,即为每个 Command 类型匹配一个 handler,类似:

match cmd {
    Quit(_) => handle_quit(),
    List(args) => hanlde_list(args),
    _ => {}
}

这里同样可以用宏来减少样板代码(用到了 paste 这个 crate,用于在宏中拼接 ident),注册之后只需要为每个 Command 实现 Session::exec_<commandname> 方法即可

macro_rules! register_command_handlers {
    ($($cmd: ident), *) => {
        impl crate::Session {
            pub fn exec_cmd(&mut self, cmd: Command) -> anyhow::Result<()> {
                match cmd {
                    $(
                        Command::$cmd(arg) => paste!{ self.[<exec_ $cmd:lower>](arg) },
                    )*
                }
            }
        }

    }
}

register_command_handlers!(Quit);

至此便基本完成了 Command, Response, Session 三者的抽象,之后只要根据 FTP 协议添加不同类型的 Command 和 Response,并在 Session 中实现执行 Command 的逻辑及维护相关状态即可。到这里为止的代码在 repo v0.1

Day2: USER and PASS

FTP 是有用户概念的,前面提到需要为每个 Session 维护一个登录状态,用户通过 USER, PASS 两个指令来实现登录。这个过程可以被描述为这样的状态机,状态转移上的数字是给用户的返回码:

            |---------------|
            |               |----
            |   Unloggedin  |   | PASS
            |               |<--- 503
            |---------------|
                | WRONG^                            PASS
            USER|  PASS|                           _______
            331 |   530|                           |     | 230
                v      |                           |     v
            |---------------|                  |---------------|
            |               | CORRECT PASS 230 |               |----
            |    Username   |----------------->|   Loggedin    |   | USER
            |               |                  |               |<--- 530
            |---------------|                  |---------------|

而为了避免 cornercase,仍然遵循 TDD 的原则,先写测试再写代码,一些单元测试如下,基本设计思路是测试每个指令在状态机不同状态下的表现(即试图覆盖所有状态转移):

mod test_user {
    use super::*;
    #[test]
    fn test_exec_user_unlogged() {
        let (_, mut session) = setup::setup_client_and_session();

        session
            .exec_cmd(Command::User(vec![USERNAME.into()]))
            .unwrap();
        assert_eq!(session.login_status, LoginStatus::Username(USERNAME.into()));
    }

    #[test]
    fn test_exec_user_username() {
        let (_, mut session) = setup::setup_client_and_session();

        session.login_status = LoginStatus::Username("oldusername".into());
        session
            .exec_cmd(Command::User(vec!["newusername".into()]))
            .unwrap();

        // can change username
        assert_eq!(
            session.login_status,
            LoginStatus::Username("newusername".into())
        );
    }

    #[test]
    fn test_exec_user_loggedin() {
        let (_, mut session) = setup::setup_client_and_session();

        session.login_status = LoginStatus::Loggedin("oldusername".into());
        session
            .exec_cmd(Command::User(vec!["newusername".into()]))
            .unwrap();

        // cannot change user
        assert_eq!(
            session.login_status,
            LoginStatus::Loggedin("oldusername".into())
        );
    }
}

同时,在集成测试层级也增加测例,模仿一个客户端做登录操作:

#[test]
fn test_login_success() {
    let mut client = setup_client();

    client.get_msg_trimed().unwrap();

    client.send_msg_add_crlf("USER anonymous").unwrap();
    assert_eq!(client.get_msg_code().unwrap(), 331);

    client.send_msg_add_crlf("PASS anonymous").unwrap();
    assert_eq!(client.get_msg_code().unwrap(), 230);
}

#[test]
fn test_login_fail() {
    let mut client = setup_client();

    client.get_msg_trimed().unwrap();

    client.send_msg_add_crlf("USER anonymous").unwrap();
    assert_eq!(client.get_msg_code().unwrap(), 331);

    client.send_msg_add_crlf("PASS wrong").unwrap();
    assert_eq!(client.get_msg_code().unwrap(), 530);

    client.send_msg_add_crlf("PASS anonymous").unwrap();
    assert_eq!(client.get_msg_code().unwrap(), 503);
}

写好单元测试之后就可以面向单元测试开发了,主要开发工作在于:增加几个 ResponseMessage,增加 PASS, USER Command,在 Session 里维护一个状态,并实现对应的 exec_ 函数,完整的代码在 repo v0.2

Day3: Command 参数检查

之前的版本中,Command::parse 只会解析 Command 的类别,并将命令剩余部分按 ascii_white_space 分割,产生一组参数,没有为每个命令检查参数个数,这样做让每个 Command 在执行时有了自己检查参数的灵活性,但也引入了许多重复代码————每个 exec 函数都需要检查一遍参数。因此我决定重构这一部分:把参数量检查的工作挪到 Command::parse 中进行,并保证如果一个 parse 成功,则它只会返回规定数目个非空参数。这样 exec 就只需要放心地取出参数并检查其值的合法性即可

为达到此目的,有以下几个修改点:

  1. parse 应当从返回 Option 变成返回 Result, 因为我们需要更细节的解析失败的原因,None 无法传递这一信息
  2. parse 里应该增加参数数量的检查,为此还需要在定义 Command 时记录每个 Command 需要几个参数,同时参数解析的规则变为:如果需要 n 个参数,则在前 n-1 个空白符处将参数字符串分割成 n 部分,如 n = 2 且参数字符串为 A B C D,则最后的参数为 AB C D

为此先编写 parse 的单元测试:

#[test]
fn test_parse_right_arguments() {
    let user_with_arguments = Command::parse("user username\r\n").unwrap();
    assert!(matches!(user_with_arguments, Command::User(_)));
    assert_eq!(user_with_arguments.get_args().len(), 1);
    assert_eq!(user_with_arguments.get_args()[0], "username");
}

#[test]
fn test_parse_too_many_arguments() {
    let quit_with_arguments = Command::parse("quit a b c 1 2 3\r\n").unwrap();
    assert!(matches!(quit_with_arguments, Command::Quit(_)));
    assert_eq!(quit_with_arguments.get_args().len(), 0);

    let user_with_arguments = Command::parse("user username1 username2\r\n").unwrap();
    assert!(matches!(user_with_arguments, Command::User(_)));
    // arguments will be concat if they are too many
    assert_eq!(user_with_arguments.get_args().len(), 1);
    assert_eq!(user_with_arguments.get_args()[0], "username1 username2");
}

#[test]
fn test_parse_too_less_arguments() {
    let user_with_no_arguments = Command::parse("user\r\n");
    // is error and response should be 501
    let err = user_with_no_arguments.err().unwrap();
    assert!(err.to_string().starts_with("501"));
}

为了修改 Command 使得其包含参数数目信息,我需要修改一下之前的声明 Command 的宏定义:

// From
macro_rules! commands {
    ($($cmd: ident), *) => {
    }
}
commands!(Quit, User, Pass);

// To
macro_rules! commands {
    ($($cmd: ident ($argc: literal)), *) => {
    }
}
commands!(Quit(0), User(1), Pass(1));

然后就可以添加参数数目检查相关的逻辑:

let parse_result = Command::from_str(tokens[0]);
match parse_result {
    Ok(command) => {
        match command {
            $(
                Self::$cmd(_) => {
                    // TODO: deal with escape char or space in arguments
                    if $argc == 0 {
                        return Ok(Self::$cmd(vec![]));
                    }
                    let mut tokens = tokens.into_iter().skip(1).map(|s| s.to_string());
                    let mut args = Vec::with_capacity($argc);
                    loop {
                        if args.len() + 1 == $argc {
                            let last_argument = tokens.collect::<Vec<String>>().join(" ");
                            if !last_argument.is_empty() {
                                args.push(last_argument);
                            }
                            break;
                        }
                        if let Some(token) = tokens.next() {
                            args.push(token);
                        } else {
                            break;
                        }
                    };

                    #[allow(unused_comparisons)]
                    if args.len() < $argc {
                        return Err(anyhow!(response::InvalidParameter501::new("Invalid number of arguments.").to_string()));
                    }
                    return Ok(Self::$cmd(args));
                }
            )*
        }
    },
    _ => Err(anyhow!(response::SyntaxErr500::new("Command not understood.").to_string())),
}

完整的代码在 repo v0.3

Day4: PASV, PORT and LIST

FTP 的数据传输需要通过另一个 TCP 连接进行,这个连接的建立分成两种模式:PORT 和 PASV

PORT 和 PASV 都是一次性的,每次数据传输前都需要指定。服务端完成数据传输之后就需要关闭相应的数据传输连接,并通过 226 告知告知客户端关闭数据连接。出于简化考虑,暂时先只实现 PASV,在 Session 中添加一个记录当前传输状态的枚举:

#[derive(Debug)]
enum TransferMode {
    NotSpecified,
    // Pasv(port_listening, listener)
    Pasv(u16, TcpListener),
}
pub struct Session {
    // ...
    transfer_mode: TransferMode,
}

只有 PASV 的情况下状态机比较简单————每次接受 PASV 指令时都选择监听一个端口,并建立对该端口的监听,每次数据传输完毕后都将 Transfer Mode 改回 Not Specified。建立 PASV 状态的逻辑如下,在已经是 PASV 状态的时候替换掉 Transfer Mode 会导致之前监听的端口自动关闭(因为 TcpListener 被 drop 掉了):

fn exec_pasv(&mut self, _args: Vec<String>) -> Result<String> {
    check_permission_or_return!(self);

    // Does nothing when is in pasv mode already
    if let Some(port) = portpicker::pick_unused_port() {
        if let Ok(listener) = TcpListener::bind(format!("{LISTENING_HOST:}:{port:}")) {
            debug!("Entering pasv mode, listening client on {port:}");
            self.transfer_mode = TransferMode::Pasv(port, listener);

            let (p1, p2) = (port / 256, port % 256);
            let comma_hostname = hostname_to_comma_hostname(get_local_hostname());
            return Ok(response::PasvMode227::new(format!("({comma_hostname:},{p1:},{p2:})")).to_string());    
        }
    }
    error!("No avalible port for pasv or cannot establish listener.");
    Err(anyhow!(response::ServiceNotAvalible421::default().to_string()))
}

在此基础上,数据传输指令便很容易实现,一个例子是 LIST:

fn exec_list(&mut self, _args: Vec<String>) -> Result<String> {
    check_permission_or_return!(self);
    let transfer_mode = std::mem::replace(&mut self.transfer_mode, TransferMode::NotSpecified);
    match transfer_mode {
        TransferMode::NotSpecified => Ok(response::NoModeSpecified425::default().to_string()),
        TransferMode::Pasv(_, listener) => {
            if let Ok((mut stream, _)) = listener.accept() {
                self.send_msg_check_crlf(response::DataTransferStarts150::default())?;
                
                // LIST LOGIC 
                stream.write_all(".\r\n..\r\nthis\r\noutput\r\nis\r\nfake\r\n".as_bytes())?;
                stream.flush()?;
                // END
                
                return Ok(response::DataTransferFinished226::default().to_string());
            }
            Err(anyhow!(response::ServiceNotAvalible421::default().to_string()))
        },
    }
}

同时也很容易发现,对于 LIST 而言,只有 LIST LOGIC 那两行代码是 LIST 独有的逻辑,而其他代码都是所有数据传输指令的公共逻辑:建立数据连接。因此可以将外层代码抽象出来(类似装饰器):

/// decorate the data_transfer_logic with data conn management logic, so the inner logic don't need to care about it
fn data_connection_wrapper<F: Fn(&mut TcpStream) -> Result<()>>(&mut self, data_transfer_logic: F) -> Result<String> {
    let transfer_mode = std::mem::replace(&mut self.transfer_mode, TransferMode::NotSpecified);
    match transfer_mode {
        TransferMode::NotSpecified => Ok(response::NoModeSpecified425::default().to_string()),
        TransferMode::Pasv(_, listener) => {
            if let Ok((mut stream, _)) = listener.accept() {
                self.send_msg_check_crlf(response::DataTransferStarts150::default())?;
                data_transfer_logic(&mut stream)?;
                return Ok(response::DataTransferFinished226::default().to_string());
            }
            Err(anyhow!(response::ServiceNotAvalible421::default().to_string()))
        },
    }
}

fn exec_list(&mut self, _args: Vec<String>) -> Result<String> {
    check_permission_or_return!(self);
    self.data_connection_wrapper(|stream| -> Result<()> {
        stream.write_all(".\r\n..\r\nthis\r\noutput\r\nis\r\nfake\r\n".as_bytes())?;
        stream.flush()?;
        Ok(())
    })
}

真正的数据传输逻辑被抽象成了一个 closure,传递给处理数据连接建立的包装函数(本来期望用一个接受数据传输的逻辑块的宏实现,但是由于 macro hygiene 的存在而失败了):

macro_rules! data_connection_wrapper {
    ($self: ident, $logic_block: block) => {
        let transfer_mode = std::mem::replace(&mut self.transfer_mode, TransferMode::NotSpecified);
        match transfer_mode {
            TransferMode::NotSpecified => Ok(response::NoModeSpecified425::default().to_string()),
            TransferMode::Pasv(_, listener) => {
                if let Ok((mut stream, _)) = listener.accept() {
                    self.send_msg_check_crlf(response::DataTransferStarts150::default())?;
                    $logic_block?;
                    return Ok(response::DataTransferFinished226::default().to_string());
                }
                Err(anyhow!(response::ServiceNotAvalible421::default().to_string()))
            },
        }
    };
}

data_connection_wrapper({
    stream.write_all("".as_bytes())?;
    stream.flush()?;
    Ok(()) 
});

这段宏展开后如下,其中 stream 由于 macro hygiene 的存在在传入的代码块中不可见:

let transfer_mode = std::mem::replace(&mut self.transfer_mode, TransferMode::NotSpecified);
match transfer_mode {
    TransferMode::NotSpecified => {
        Ok(response::NoModeSpecified425::default().to_string())
    }
    TransferMode::Pasv(_, listener) => {
        if let Ok((mut stream, _)) = listener.accept() {
            self.send_msg_check_crlf(response::DataTransferStarts150::default())?;
            {
                // seems ok here, 
                // but macro hygiene dis-allow the `stream` to be visible here
                stream.write_all("".as_bytes())?;
                stream.flush()?;
                Ok(())
            }?;
            return Ok(response::DataTransferFinished226::default().to_string());
        }
        Err(::anyhow::private::must_use({
            use ::anyhow::private::kind::*;
            let error = match response::ServiceNotAvalible421::default().to_string() {
                error => (&error).anyhow_kind().new(error),
            };
            error
        }))
    }
};

而上面出现了几次的 check_permission_or_return! 则是一小段检查 Session 的 login_status 是否为已登录的逻辑,如果未登录则直接返回 530:

macro_rules! check_permission_or_return {
    ($self: ident) => {
        match $self.login_status {
            LoginStatus::Username(_) | LoginStatus::Unloggedin => {
                debug!("User not logged in.");
                return Ok(response::NotLoggedin530::default().to_string());
            },
            _ => {}
        };
    };
}

至此就实现了 PASV 和简单的数据传输逻辑(加上一个假的 LIST 指令,用于测试数据连接),这部分代码见 repo v0.4