CS144-check2

本文最后更新于:2024年3月23日 下午

在上一课我们完成了对乱序到达字符串的重组器,接下来我们要完成TCP的接收端。要实现的功能同样很简单:接收数据,抽出数据将有效信息塞到重组器里,返回响应信息。

看一下本次我们实现的部分吧

Overview

Wrap32

从这一课开始,我们也正式见到了TCP协议。和我们上节课理想的reassembler相比,遇到的第一个问题是:TCP的数据寸土寸金,在数据包的头部我们只有一个32位的uint来索引当前数据的位置,
而我们的reassembler是64bit,所以呢,我们需要在64位的数据索引(stream index)和32位的seqno(sequence number)之间转换。

为了实现TCP的索引表示方式,我们需要处理以下三个难题:

  1. 32位整数的转换:TCP流可以非常长,没有长度限制。但是$2^{32}$字节只有4 GiB,这并不大。一旦32位序列号计数到$2^{32} - 1$,流中的下一个字节序列号将重新回到0。

  2. 序列号从随机值开始:为了提高鲁棒性并避免与早期连接的旧段混淆,TCP尝试确保序列号不能被猜测且不太可能重复。因此,流的序列号不是从零开始的。流中的第一个序列号是一个随机的32位数,称为初始序列号(ISN)。这是表示“零点”或SYN(流的开始)的序列号。之后的序列号表现正常:数据的第一个字节将有序列号$ISN+1 (\mod 2^{32})$,第二个字节将有$ISN+2 (\mod 2^{32})$,等等。

  3. 逻辑开始和结束各占用一个序列号:除了确保数据的所有字节都被接收外,TCP还确保流的开始和结束可靠地接收。因此,在TCP中,SYN(流开始)和FIN(流结束)控制标志被分配了序列号(seqno)。每个都占用一个序列号。(SYN标志占用的序列号是ISN。)流中的每个数据字节也占用一个序列号。请记住,SYN和FIN不是流本身的一部分,它们不是“字节”——它们代表字节流本身的开始和结束。

考虑只包含三个字母字符串‘cat’的字节流。如果SYN恰好有序列号$2^{32} - 2$,则每个字节的序列号、绝对序列号和流索引分别是:

SYN标志的序列号是$2^{32} - 2$。

字符‘c’的序列号是$2^{32} - 1 (ISN+1,\mod 2^{32})$。

字符‘a’的序列号是$0 (ISN+2,\mod 2^{32})$。

字符‘t’的序列号是$1 (ISN+3,\mod 2^{32})$。

绝对序列号始终从0开始,没有周期,但是包括了SYN和FIN的占用,而流索引是绝对序列号去除SYN和FIN影响,只表示流传输的数据,也就是传入Reassembler中每个字节的索引。如图:

seqno,abs seqno,stream idx

注意对于32bit到64bit的转换而言,在零点zeropoint以外需要一个检查点,因为任何给定的序列号对应多个绝对序列号。例如,假设初始序列号(ISN)为零,那么序列号 “17” 对应的绝对序列号为 17,但也可以是 $2^{32} + 17$,或者 $2^{33} + 17$,或者 $2^{33} + 2^{32} + 17$,或者 $2^{34} + 17$,或者 $2^{34} + 2^{32} + 17$,等等。检查点有助于解决这种歧义:它是一个绝对序列号,这个类的用户知道它在正确答案的“附近”。在你的 TCP 实现中,你会使用第一个未组装的索引作为检查点。

读懂了上面的内容以后,接下来我们就可以开始实现了。64bit->32bit的转换是显然的,而32bit->64bit的转换我们需要考虑到底多了几个$2^{32}$,考虑到使zeropoint最接近,我们需要一个$2^31$作为补正。例如zeropoint = 1, seqno = 1, checkpoint = $2^{32}-1$,此时虽然checkpoint与前面的差小于$2^32$,但是显然已经走过了一个周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
return Wrap32 { static_cast<uint32_t>( ( ( n + zero_point.raw_value_ ) % ( 1ul << 32 ) ) ) };
}

uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
uint64_t tmp = raw_value_ >= zero_point.raw_value_ ? 0 : ( 1ul << 32 );
uint64_t base = raw_value_ + tmp - zero_point.raw_value_;
if (base + ( 1ul << 31 ) < checkpoint){
base += ((checkpoint - base + (1ul << 31)) / (1ul << 32)) * (1ul << 32);
}
return base;
}

Receiver

首先,祝贺你正确实现了包装和解包的逻辑!如果你在实验室环节完成了这项任务,我们会握手表示祝贺(或者在疫情后,用肘部碰你)。在这个实验的剩余部分,你将实现TCP接收器。它将

  1. 从其对等方的发送器接收消息,并使用Reassembler重新组装ByteStream
  2. 向对等方的发送器发送包含确认号(ackno)和窗口大小的消息。

我们预计这部分总共需要大约15行代码。(我自己就是没有理解这句话然后犯病了w)

首先,让我们回顾一下TCP“发送器消息”的格式,它包含了关于ByteStream的信息。这些消息是从TCPSender发送到其对等方的TCPReceiver的:

TCPSenderMessage 结构体包含五个字段(在 minnow/util/tcp_sender_message.hh 中):

  1. 段的起始序列号(seqno)。如果设置了 SYN 标志,这就是 SYN 标志的序列号。否则,它就是 payload 的起始序列号。

  2. SYN 标志。如果设置了,这个段就是字节流的开始,seqno 字段包含了初始序列号(ISN)– zeropoint。

  3. 载荷:字节流的一个子串(可能为空)。

  4. FIN 标志。如果设置了,载荷表示字节流的结束。

  5. RST(重置)标志。如果设置了,流已经发生错误,连接应该被中止。

1
2
3
4
5
6
7
8
9
10
11
struct TCPSenderMessage
{
Wrap32 seqno { 0 };
bool SYN {};
std::string payload {};
bool FIN {};
bool RST {};
// 这个段使用了多少个序列号?
size_t sequence_length() const { return SYN + payload.size() + FIN; }
};
TCPReceiver 会向对等方的 TCPSender 生成自己的消息:

TCPReceiverMessage 结构体包含三个字段(在 minnow/util/tcp_receiver_message.hh 中):

  1. 确认号(ackno):TCP 接收器需要的下一个序列号。这是一个可选字段,如果 TCPReceiver 还没有收到初始序列号,它会为空。

  2. 窗口大小。这是 TCP 接收器希望接收的,从 ackno(如果存在)开始的序列号的数量。最大值是 65535。

  3. RST(重置)标志。如果设置了,流已经发生错误,连接应该被中止。

1
2
3
4
5
6
struct TCPReceiverMessage
{
std::optional<Wrap32> ackno {};
uint16_t window_size {};
bool RST {};
};

你的 TCPReceiver 的任务是接收这两种消息中的一种,并发送另一种:

receive()方法会在每次从对等方的发送器接收到新的段时被调用。这个方法需要:

  1. 如果需要,设置初始序列号。第一个到达并设置了SYN标志的段的序列号就是初始序列号。需要存储这个ISN,以便继续在32位的seqno和64位的index中转换。(注意,SYN标志只是头部中的一个标志。同一消息也可能携带数据或设置FIN标志。)

  2. 将任何数据推送到Reassembler。如果在TCPSegment的头部设置了FIN标志,那么意味着载荷的最后一个字节是整个流的最后一个字节。记住,Reassembler期望得到从零开始的流索引;你将需要解包序列号来生成这些。

让我们开始吧。首先,我们需要维护checkpoint。一个包括SYN和FIN的,absolute index意义上的first_unassembled_index,作为32位转64位的checkpoint和TCPReceiverMessage的ackno。

那么我们的checkpoint什么时候会增加呢,首先会在stream index意义上的first_unassembled_index增加时增加相同的长度。其次,SYN到来时增加1。事实上我们可以认为SYN一定是最先到来的,毕竟没握手哪里来的TCP。最后,在FIN有效时增加1,也就是说last_byte被成功的reassemble时加1。

前面两种情况都是好处理的,维护first_unassembled_index,也就是被重组好推入到writer的字节数。

1
2
3
4
5
auto old_cnt = reassembler_.writer().bytes_pushed();
reassembler_.insert( stream_idx, message.payload, message.FIN );
auto offset = reassembler_.writer().bytes_pushed() - old_cnt;
cp += offset;
cp += message.SYN;

第三种情况因为很难处理FIN是否有效,于是一开始写了一个很复杂的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
// 不必要的!! 但是没有bug
auto old_pending = reassembler_.bytes_pending();
if ( message.FIN
&& ( ( reassembler_.bytes_pending() != old_pending ) || ( offset != 0 )
|| message.payload.size() == 0 ) ) {
FIN_idx = stream_idx + message.payload.size() + ( message.SYN || message.FIN );
}
if ( FIN_idx.has_value() ) {
if ( cp == FIN_idx.value() ) {
cp += 1;
}
}

后来重新考虑了那个大概15行的提示,后来发现FIN是否有效完全等价于reassembler_.writer().is_closed(),所以我为什么要把逻辑再重复一遍呢(

接下来再处理一下边角的逻辑,就有了我们的最终代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void TCPReceiver::receive( TCPSenderMessage message )
{
if ( message.RST ) {
reassembler_.reader().set_error();
}
if ( message.SYN ) {
ISN = message.seqno;
}
if ( ISN.has_value() ) {
auto stream_idx = message.seqno.unwrap( ISN.value(), cp );
if ( !message.SYN ) {
stream_idx -= 1;
}
auto old_cnt = reassembler_.writer().bytes_pushed();
reassembler_.insert( stream_idx, message.payload, message.FIN );
auto offset = reassembler_.writer().bytes_pushed() - old_cnt;
cp += offset;
cp += message.SYN;
if (reassembler_.writer().is_closed()){
cp += 1;
}
}
}

TCPReceiverMessage TCPReceiver::send() const
{
TCPReceiverMessage message;
if ( ISN.has_value() ) {
message.ackno = Wrap32::wrap( cp, ISN.value() );
}
if ( reassembler_.reader().has_error() || reassembler_.writer().has_error()) {
message.RST = true;
}
message.window_size = min( reassembler_.writer().available_capacity(), 65535ul );
return message;
}

CS144-check2
http://tzr.icu/20240315/CS144-check2/
发布于
2024年3月15日
更新于
2024年3月23日
许可协议