CS144-check3

本文最后更新于:2024年3月23日 下午

本次代码存在面向测试开发的嫌疑,亟待重构

作为端到端的双工协议,有了接收器当然要有对等端的发送器了。本节课我们试着写一个TCP协议的发送端。

总览

你的TCPSender的责任包括:

  • 跟踪接收方的窗口(接收带有它们的确认号和窗口大小的TCPReceiverMessages)。
  • 尽可能填充窗口,通过从ByteStream读取数据,创建新的TCP段(必要时包括SYN和FIN标志),并发送它们。发送方应该继续发送段,直到窗口被填满或者ByteStream没有更多数据可以发送。
  • 跟踪已发送但尚未被接收方确认的段——我们称这些为“未确认”段。
  • 如果发送后过了足够的时间且它们还没有被确认,则重新发送未确认的段。

TCPSender如何知道一个段是否丢失

你的TCPSender将发送一系列的TCPSenderMessages。每个消息将包含一个(可能为空的)来自出站ByteStream的子字符串,该字符串通过序列号进行索引,以指示其在流中的位置,并在流的开始处用SYN标志标记,在结束处用FIN标志标记。

除了发送这些段之外,TCPSender还必须跟踪其未确认的段,直到它们所占用的序列号被完全确认。定期地,TCPSender的所有者将调用TCPSender的tick方法,表示时间的流逝。TCPSender负责检查其未确认的TCPSenderMessages集合,并决定最早发送的段是否已经等待了太长时间而没有收到确认(即,其所有序列号都未被确认)。如果是这样,它需要被重传(再次发送)。

以下是“等待了太长时间”的规则。你将实现这个逻辑,它有些细节,但我们不希望你担心隐藏的测试用例会使你犯错,或者把这看作SAT考试中的文字题。我们将在本周给你一些合理的单元测试,并在实验室4中提供更完整的集成测试,一旦你完成了整个TCP实现。只要你100%通过这些测试,并且你的实现是合理的,你就会没问题。

我为什么要做这个?

总体目标是让发送方及时检测到丢失的段,并需要重新发送。等待重发前的时间长度很重要:你不希望发送方等待太久再发送一个段(因为这会延迟字节流向接收应用程序的传输),但你也不希望它重发一个本来如果发送方再等一会儿就会被确认的段——那会浪费互联网的宝贵容量。

  1. 每隔几毫秒,你的TCPSender的tick方法将被调用,并带有一个参数,告诉它自上次调用该方法以来已经过了多少毫秒。使用这个来维护TCPSender已经存活的总毫秒数。请不要尝试从操作系统或CPU调用任何“时间”或“时钟”功能——tick方法是你唯一接触时间流逝的方式。这使得事情保持确定性和可测试性。

  2. 当TCPSender被构造时,它被给定一个参数,告诉它重传超时(RTO)的“初始值”。RTO是在重发未确认的TCP段之前等待的毫秒数。RTO的值会随着时间变化,但“初始值”保持不变。启动代码将RTO的“初始值”保存在一个名为initial RTO ms的成员变量中。

  3. 你将实现重传计时器:一个可以在特定时间启动的闹钟,一旦RTO时间流逝,闹钟就会响起(或“到期”)。我们强调,这个时间流逝的概念来自于tick方法的调用——而不是获取实际的时间。

  4. 每次发送包含数据的段(在序列空间中长度非零)时(无论是第一次发送还是重传),如果计时器没有运行,就启动它,使它在RTO毫秒后到期(对于当前的RTO值)。所谓的“到期”,我们的意思是在未来的某个时间点,时间将会用完。

  5. 当所有未确认的数据都已经得到确认时,停止重传计时器。

  6. 如果调用了tick并且重传计时器已经到期:
    (a) 重传最早的(序列号最低的)还没有被TCP接收方完全确认的段。你需要在某种内部数据结构中存储未确认的段,以便能够做到这一点。
    (b) 如果窗口大小非零:
    i. 跟踪连续重传的次数,并且因为你刚刚重传了某些东西,所以将其增加。你的TCPConnection将使用这个信息来决定连接是否无望(连续重传次数太多)并需要中止。
    ii. 将RTO的值加倍。这称为“指数退避”——它通过在糟糕的网络上减慢重传速度来避免进一步添乱。
    (c) 重置重传计时器,并启动它,使其在RTO毫秒后到期(考虑到你可能刚刚加倍了RTO的值!)。

  7. 当接收方给发送方一个ackno,确认成功接收到新数据(ackno反映的绝对序列号比任何之前的ackno都要大)时:
    (a) 将RTO重置为其“初始值”。
    (b) 如果发送方有任何未确认的数据,重新启动重传计时器,使其在RTO毫秒后到期(对于当前的RTO值)。
    (c) 将“连续重传”的计数重置为零。

实现

好的!我们已经讨论了TCP发送方所做的基本工作(给定一个出站的ByteStream,将其分割成多个段,发送给接收方,如果这些段没有被及时确认,就继续重发它们)。我们还讨论了如何判断一个未确认的段丢失并需要重发。

现在是时候讨论你的TCPSender将提供的具体接口了。它需要处理四个重要的事件:

1
void push(const TransmitFunction& transmit);

TCPSender被要求填满来自出站字节流的窗口:它从流中读取,并发送尽可能多的TCPSenderMessages,只要有新的字节可读并且窗口中有可用空间。它通过在它们上调用提供的transmit()函数来发送它们。

你将要确保你发送的每个TCPSenderMessage完全适合接收方的窗口。使每个单独的消息尽可能大,但不要超过TCPConfig::MAX_PAYLOAD_SIZE(1452字节)的值。

你可以使用TCPSenderMessage::sequence_length()方法来计算一个段所占用的序列号总数。记住,SYN和FIN标志也各占用一个序列号,这意味着它们在窗口中也占用空间。

  • 如果窗口大小为零我应该怎么做?

    如果接收方宣布了零大小的窗口,push方法应该假装窗口大小为1。发送方可能最终发送一个被接收方拒绝(并且不被确认)的单个字节,但这也可以促使接收方发送一个新的确认段,其中它透露更多空间已经在其窗口中打开。没有这个,发送方将永远不知道它被允许开始再次发送。这是你的实现应该有的零大小窗口情况的唯一特殊行为。TCPSender实际上不应该记住一个假的窗口大小。特殊情况只存在于push方法内部。另外,请注意,即使窗口大小是1(或20,或200),窗口可能仍然是满的。一个“满”的窗口和一个“零大小”的窗口不是一回事。

1
void receive(const TCPReceiverMessage& msg);

从接收方收到一条消息,传达窗口的新左右边界。TCPSender应该检查其未确认段的集合,并移除现在已经完全确认的任何段(ackno大于段中的所有序列号)。

1
void tick(uint64_t ms_since_last_tick, const TransmitFunction& transmit);

时间已经过去了——自上次调用此方法以来的一定数量的毫秒。发送方可能需要重传一个未确认的段;它可以调用transmit()函数来执行此操作。(提醒:请不要在代码中尝试使用现实世界的“时钟”或“gettimeofday”函数;对时间流逝的唯一参考来自于ms_since_last_tick参数。)

1
TCPSenderMessage make_empty_message() const;

TCPSender应该生成并发送一个零长度的消息,序列号设置正确。如果对端想要发送一个TCPReceiverMessage(例如,因为它需要确认来自对端发送方的某些内容)并且需要生成一个TCPSenderMessage来配合它,这会很有用。
注意:像这样的段占用没有序列号,不需要作为“未确认”的跟踪,并且永远不会被重传。

Q&A

  • 在receive方法通知它之前,我的TCPSender应该假设接收方的窗口大小是多少?

    1

  • 如果一个确认只部分确认了一些未确认的段,我应该尝试剪掉已经确认的字节吗?

    一个TCP发送方可以这样做,但是出于这个类的目的,没有必要变得那么复杂。将每个段视为完全未确认,直到它被完全确认——它占用的所有序列号都小于ackno。

  • 如果我发送了包含“a”、“b”和“c”的三个单独段,并且它们从未被确认,我可以稍后在一个包含“abc”的大段中重新传输它们吗?还是我必须单独重传每个段?

    同样:一个TCP发送方可以这样做,但是出于这个类的目的,没有必要变得那么复杂。只需单独跟踪每个未确认的段,当重传计时器到期时,再次发送最早的未确认段。

  • 我应该在我的“未确认”的数据结构中存储空段并在必要时重传它们吗?

    不——只有那些传递了一些数据的段——即消耗了序列空间中的一些长度的段,才应该被跟踪为未确认,并可能被重传。一个不占用序列号(没有SYN、有效载荷或FIN)的段不需要被记住或重传。

Code

正如文章开头提到的,本次完成作业的过程是先写了个大概然后根据check报的错去修补corner cases,所以代码看起来会比较丑陋。

首先是添加的私有变量

1
2
3
4
5
6
7
8
9
10
11
12
uint64_t RTO_ms_ ;
std::queue<TCPSenderMessage> Send_queue {};
std::queue<TCPSenderMessage> Retransmit_queue {};
uint16_t window_size_ {1};
uint64_t last_ack_ {};
uint64_t last_send_ {};
uint64_t consecutive_retransmissions_ {};
uint64_t timer_ {};
bool timer_running_ {};
bool has_SYN {false};
bool has_FIN {false};
bool zero_windowsize_flag_ {false};

zero_windowsize_flag_ 用来处理“如果接收方宣布了零大小的窗口,push方法应该假装窗口大小为1”的情况。为什么需要记录呢,因为这个假装的结果是携带了1大小的数据,在多次调用push时,如果接收方没有更新窗口大小,那么我们不应该再向后读1,而应该一直用这个同样的数据进行试探。所以需要用一个flag来维护我们这一试探行为的状态。

想好了我们要维护的信息以后,帮助函数的实现是简单的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
uint64_t TCPSender::sequence_numbers_in_flight() const
{
return last_send_ - last_ack_;
}

uint64_t TCPSender::consecutive_retransmissions() const
{
return consecutive_retransmissions_;
}


TCPSenderMessage TCPSender::make_empty_message() const
{
TCPSenderMessage msg = { .seqno = Wrap32::wrap( last_send_, isn_ ),
.SYN = false,
.payload = string(),
.FIN = false,
.RST = input_.has_error() };
return msg;
}

先从简单的接收逻辑开始说起吧,核心部分就是recv_ack > last_ack_的情况,证明了接收方新确认了一些段。所以我们的试探策略是有效的,于是把zero_windowsize_flag_置0。重置RTO和连续重传计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void TCPSender::receive( const TCPReceiverMessage& msg )
{
window_size_ = msg.window_size;
if ( msg.RST ) {
input_.set_error();
}
if ( msg.ackno.has_value() ) {
auto recv_ack = msg.ackno.value().unwrap( isn_, last_ack_ );
if ( recv_ack > last_send_ ) {
return; // 说明数据无效
}
if ( recv_ack > last_ack_ ) {
zero_windowsize_flag_ = false;
last_ack_ = recv_ack;
RTO_ms_ = initial_RTO_ms_;
consecutive_retransmissions_ = 0;
if ( timer_running_ && ( sequence_numbers_in_flight() != 0 ) ) {
timer_ = 0;
}
}
}
}

接下来是发送逻辑。

其实我的push只处理好了一件事情,就是唯一的把需要发的送入queue里,剩下的交由发送和重传逻辑进行。每个if分支的解释写在了注释里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
void TCPSender::push( const TransmitFunction& transmit )
{
if ( input_.reader().is_finished() && ( window_size_ > sequence_numbers_in_flight() ) && ( !has_FIN ) ) {
// 发完了数据但是没发FIN的情况
auto msg = make_empty_message();
msg.SYN = !has_SYN;
msg.FIN = true;
Send_queue.push( msg );
last_send_ += msg.sequence_length();
has_FIN = true;
} else {
auto to_read = min( input_.reader().bytes_buffered(),
static_cast<uint64_t>( window_size_ - sequence_numbers_in_flight() ) );
to_read = min( to_read, static_cast<uint64_t>( window_size_ ) );
// 防止新更新的windowsize过小导致unsigned相减导致溢出
if ( to_read == 0 ) {
auto msg = make_empty_message();
msg.SYN = !has_SYN;
if ( !has_SYN ) {
// 最开始还没有发SYN的情况
has_SYN = true;
Send_queue.push( msg );
last_send_ += msg.sequence_length();
} else {
// 当窗口大小为0时假装为1进行试探发送
if ( window_size_ == 0 && ( !zero_windowsize_flag_ ) ) {
if ( ( last_send_ == last_ack_ ) && input_.reader().is_finished() ) {
msg.FIN = true;
} else {
std::string data( ' ', 1 );
read( input_.reader(), 1, data );
msg.payload = data;
}
Send_queue.push( msg );
last_send_ += msg.sequence_length();
zero_windowsize_flag_ = true;
}
}
}
while ( to_read > 0 ) {
auto length = min( to_read, TCPConfig::MAX_PAYLOAD_SIZE );
std::string data( ' ', length );
read( input_.reader(), length, data );
TCPSenderMessage msg = { .seqno = Wrap32::wrap( last_send_, isn_ ),
.SYN = !has_SYN,
.payload = data,
.FIN = input_.reader().is_finished(),
.RST = input_.has_error() };
if ( !has_SYN )
has_SYN = true;
if ( msg.sequence_length() + sequence_numbers_in_flight() > window_size_ ) {
// 如果窗口大小刚好不够FIN,因为FIN也占用1位size,接下来的行为在push函数的第一行
msg.FIN = false;
}
Send_queue.push( msg );
last_send_ += msg.sequence_length();
to_read -= length;
}
}
// 发送
while ( !Send_queue.empty() ) {
transmit( Send_queue.front() );
if ( !timer_running_ ) {
timer_running_ = true;
timer_ = 0;
}
if ( Send_queue.front().FIN )
has_FIN = true;
Retransmit_queue.push( Send_queue.front() );
Send_queue.pop();
};
}

重传逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
if ( timer_running_ ) {
timer_ += ms_since_last_tick;
}
if ( timer_ >= RTO_ms_ ) {
while ( !Retransmit_queue.empty() ) {
auto msg = Retransmit_queue.front();
if ( msg.seqno.unwrap( isn_, last_send_ ) + msg.sequence_length() > last_ack_ ) {
transmit( msg );
if ( window_size_ != 0 ) { //对于试探行为,不翻倍RTO
RTO_ms_ = 2 * RTO_ms_;
}
timer_ = 0;
consecutive_retransmissions_++;
break;
} else {
Retransmit_queue.pop();
}
}
}
}

CS144-check3
http://tzr.icu/20240323/CS144-check3/
发布于
2024年3月23日
更新于
2024年3月23日
许可协议