本文共 16502 字,大约阅读时间需要 55 分钟。
http://simohayha.iteye.com/blog/478025
相比于发送数据,接收数据更复杂一些。接收数据这里和3层的接口是tcp_v4_rcv(我前面的blog有介绍3层和4层的接口的实现).而4层和用户空间,也就是系统调用是socket_recvmsg(其他的读取函数也都会调用这个函数).而这个系统调用会调用__sock_recvmsg.下面我们就先来看下这个函数。
它的主要功能是初始化sock_iocb,以便与将来数据从内核空间拷贝到用户空间。然后调用
recvmsg这个虚函数(tcp协议的话也就是tcp_recvmsg).
- static inline int __sock_recvmsg(struct kiocb *iocb, struct socket *sock,
- struct msghdr *msg, size_t size, int flags)
- {
- int err;
- struct sock_iocb *si = kiocb_to_siocb(iocb);
-
- si->sock = sock;
- si->scm = NULL;
- si->msg = msg;
- si->size = size;
- si->flags = flags;
-
- err = security_socket_recvmsg(sock, msg, size, flags);
- if (err)
- return err;
-
- return sock->ops->recvmsg(iocb, sock, msg, size, flags);
- }
内核对待数据的接收分为2部分,一部分是当用户是阻塞的读取数据时,这时如果有数据则是直接拷贝到用户空间。而另一方面,如果是非阻塞,则会先把数据拷贝到接收队列。 而在内核中这个队列分为3种形式。分别是: 1 sock域结构的 sk_backlog队列。 2 tcp_sock的ucopy.prequeue队列。 3 sock结构的 receive_queue队列。 我们先来看两个主要的结构体,然后再来解释这3各队列的区别,首先是ucopy结构. 这个结构表示将要直接复制到用户空间的数据。 -
- struct {
-
- struct sk_buff_head prequeue;
-
- struct task_struct *task;
-
- struct iovec *iov;
-
- int memory;
-
- int len;
- ........................
- } ucopy;
接下来是sock的sock_lock结构. 内核的注释很详细,这个锁主要是用来对软中断和进程上下文之间提供一个同步。 -
-
-
-
- typedef struct {
-
- spinlock_t slock;
-
- int owned;
-
- wait_queue_head_t wq;
-
-
-
-
-
-
- #ifdef CONFIG_DEBUG_LOCK_ALLOC
- struct lockdep_map dep_map;
- #endif
- } socket_lock_t;
然后来看3个队列的区别。 首先sk_backlog队列是当当前的sock在进程上下文中被使用时,如果这个时候有数据到来,则将数据拷贝到sk_backlog. prequeue则是数据buffer第一站一般都是这里,如果prequeue已满,则会拷贝数据到receive_queue队列种。 最后一个receive_queue也就是进程上下文第一个取buffer的队列。(后面介绍tcp_recvmsg时会再介绍这3个队列). 这里为什么要有prequeue呢,直接放到receive_queue不就好了.这里我是认receive_queue的处理比较繁琐(看tcp_rcv_established的实现就知道了,分为slow path和fast path),而软中断每次只能处理一个数据包(在一个cpu上),因此为了软中断能尽快完成,我们就可以先将数据放到prequeue中(tcp_prequeue),然后软中断就直接返回.而处理prequeue就放到进程上下文去处理了. 最后在分析tcp_v4_rcv和tcp_recvmsg之前,我们要知道tcp_v4_rcv还是处于软中断上下文,而tcp_recvmsg是处于进程上下文,因此比如socket_lock_t才会提供一个owned来锁住对应的sock。而我们也就是需要这3个队列来进行软中断上下文和进程上下文之间的通信。最终当数据拷贝到对应队列,则软中断调用返回。这里要注意的是相同的函数在软中断上下文和进程上下文种调用是不同的,我们下面就会看到(比如tcp_rcv_established函数) ok,现在来看tcp_v4_rcv的源码。这个函数是在软中断上下文中被调用的,我们这里来看下她的代码片断: - int tcp_v4_rcv(struct sk_buff *skb)
- {
-
- const struct iphdr *iph;
- struct tcphdr *th;
- struct sock *sk;
- int ret;
- struct net *net = dev_net(skb->dev);
- ............................
-
-
- sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
- if (!sk)
- goto no_tcp_socket;
-
- process:
-
-
- if (sk->sk_state == TCP_TIME_WAIT)
- goto do_time_wait;
- .................................
-
- bh_lock_sock_nested(sk);
- ret = 0;
-
- if (!sock_owned_by_user(sk)) {
- 。........................
- {
-
- if (!tcp_prequeue(sk, skb))
-
- ret = tcp_v4_do_rcv(sk, skb);
- }
- } else
-
- sk_add_backlog(sk, skb);
-
- bh_unlock_sock(sk);
-
- sock_put(sk);
-
- return ret;
- ...................................................
上面的流程很简单,我们接下来来看几个跳过的函数,第一个是tcp_prequeue。 这里我们可以看到sysctl_tcp_low_latency可以决定我们是否使用prequeue队列. - static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb)
- {
- struct tcp_sock *tp = tcp_sk(sk);
-
-
- if (sysctl_tcp_low_latency || !tp->ucopy.task)
- return 0;
-
- __skb_queue_tail(&tp->ucopy.prequeue, skb);
-
- tp->ucopy.memory += skb->truesize;
-
- if (tp->ucopy.memory > sk->sk_rcvbuf) {
- struct sk_buff *skb1;
-
- BUG_ON(sock_owned_by_user(sk));
-
- while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) {
-
- sk_backlog_rcv(sk, skb1);
- NET_INC_STATS_BH(sock_net(sk),
- LINUX_MIB_TCPPREQUEUEDROPPED);
- }
-
- tp->ucopy.memory = 0;
- } else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
-
- wake_up_interruptible_poll(sk->sk_sleep,
- POLLIN | POLLRDNORM | POLLRDBAND);
-
-
- if (!inet_csk_ack_scheduled(sk))
- inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
- (3 * tcp_rto_min(sk)) / 4,
- TCP_RTO_MAX);
- }
- return 1;
- }
我们这里只关注TCP_ESTABLISHED状态,来看tcp_v4_do_rcv:它主要是通过判断相应的tcp状态来进入相关的处理函数。 - int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
- {
- struct sock *rsk;
- ...................................
- if (sk->sk_state == TCP_ESTABLISHED) {
- TCP_CHECK_TIMER(sk);
-
- if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
- rsk = sk;
- goto reset;
- }
- TCP_CHECK_TIMER(sk);
- return 0;
- }
-
- ........................................
- }
因此我们这里重点要看的函数就是tcp_rcv_established,当它在软中断上下文中被调用时,主要的目的是将skb加入到receive_queue队列中。因此这里我们只看这一部分,等下面分析tcp_recvmsg时,我们再来看进程上下文才会处理的一部分。 -
- if (!eaten) {
-
- if (tcp_checksum_complete_user(sk, skb))
-
- goto csum_error;
- ..................................................
-
- __skb_pull(skb, tcp_header_len);
-
-
- __skb_queue_tail(&sk->sk_receive_queue, skb);
- skb_set_owner_r(skb, sk);
-
- tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
- }
- ............................................
接下来来看tcp_rcvmsg函数。 通过上面我们知道有找个队列可供我们取得skbuf,那么具体的次序是什么呢,我这里摘抄内核的注释,它讲的非常清楚: Look: we have the following (pseudo)queues:
1. packets in flight
2. backlog
3. prequeue
4. receive_queue
Each queue can be processed only if the next ones are empty. At this point we have empty receive_queue.But prequeue _can_ be not empty after 2nd iteration, when we jumped to start of loop because backlog
processing added something to receive_queue. We cannot release_sock(), because backlog containd packets arrived _after_ prequeued ones.
Shortly, algorithm is clear --- to process all the queues in order. We could make it more directly,requeueing packets from backlog to prequeue, if is not empty. It is more elegant, but eats cycles,
由于这个函数比较复杂,因此我们分段来分析这个函数。 首先是处理包之前的一些合法性判断,以及取得一些有用的值。 - int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
- size_t len, int nonblock, int flags, int *addr_len)
- {
- ...................................
-
-
- lock_sock(sk);
-
- TCP_CHECK_TIMER(sk);
-
- err = -ENOTCONN;
- if (sk->sk_state == TCP_LISTEN)
- goto out;
-
-
-
- timeo = sock_rcvtimeo(sk, nonblock);
-
-
- if (flags & MSG_OOB)
- goto recv_urg;
-
-
- seq = &tp->copied_seq;
- if (flags & MSG_PEEK) {
- peek_seq = tp->copied_seq;
- seq = &peek_seq;
- }
-
-
- target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- }
在上面我们看到了lock_sock,这个函数是用来锁住当前的sock, 我们来看它的详细实现,它最终会调用lock_sock_nested: - void lock_sock_nested(struct sock *sk, int subclass)
- {
- might_sleep();
-
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (sk->sk_lock.owned)
- __lock_sock(sk);
-
- sk->sk_lock.owned = 1;
-
- spin_unlock(&sk->sk_lock.slock);
-
-
-
- mutex_acquire(&sk->sk_lock.dep_map, subclass, 0, _RET_IP_);
- local_bh_enable();
- }
我们再来看__lock_sock如何来处理的。 - static void __lock_sock(struct sock *sk)
- {
- DEFINE_WAIT(wait);
-
- for (;;) {
-
- prepare_to_wait_exclusive(&sk->sk_lock.wq, &wait,
- TASK_UNINTERRUPTIBLE);
-
- spin_unlock_bh(&sk->sk_lock.slock);
-
- schedule();
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (!sock_owned_by_user(sk))
- break;
- }
- finish_wait(&sk->sk_lock.wq, &wait);
- }
ok,再回到tcp_recvmsg.接下来我们来看如何处理数据包。 下面这一段主要是用来从receive队列中读取数据。 - do {
- u32 offset;
-
-
- if (tp->urg_data && tp->urg_seq == *seq) {
- if (copied)
- break;
- if (signal_pending(current)) {
- copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
- break;
- }
- }
-
-
-
- skb_queue_walk(&sk->sk_receive_queue, skb) {
-
- if (before(*seq, TCP_SKB_CB(skb)->seq)) {
- printk(KERN_INFO "recvmsg bug: copied %X "
- "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
- break;
- }
-
- offset = *seq - TCP_SKB_CB(skb)->seq;
-
- if (tcp_hdr(skb)->syn)
- offset--;
-
- if (offset < skb->len)
- goto found_ok_skb;
- if (tcp_hdr(skb)->fin)
- goto found_fin_ok;
- WARN_ON(!(flags & MSG_PEEK));
- }
- ....................................
- }while(len > 0)
接下来是对tcp状态做一些校验。这里要注意,copied表示的是已经复制到用户空间的skb的大小。而len表示还需要拷贝多少数据。 -
- if (copied >= target && !sk->sk_backlog.tail)
- break;
-
- if (copied) {
- if (sk->sk_err ||
- sk->sk_state == TCP_CLOSE ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- !timeo ||
- signal_pending(current))
- break;
- } else {
-
-
- if (sock_flag(sk, SOCK_DONE))
- break;
-
- if (sk->sk_err) {
- copied = sock_error(sk);
- break;
- }
-
- if (sk->sk_shutdown & RCV_SHUTDOWN)
- break;
-
- if (sk->sk_state == TCP_CLOSE) {
- if (!sock_flag(sk, SOCK_DONE)) {
- copied = -ENOTCONN;
- break;
- }
- break;
- }
-
- if (!timeo) {
- copied = -EAGAIN;
- break;
- }
-
- if (signal_pending(current)) {
- copied = sock_intr_errno(timeo);
- break;
- }
- }
然后就是根据已经复制的数据大小来清理receive队列中的数据,并且发送ACK给对端。然后就是给tcp_socket的ucopy域赋值,主要是iov域和task域。一个是数据区,一个是当前从属的进程。 - tcp_cleanup_rbuf(sk, copied);
-
- if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
-
-
- if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
-
- user_recv = current;
- tp->ucopy.task = user_recv;
- tp->ucopy.iov = msg->msg_iov;
- }
-
- tp->ucopy.len = len;
-
- WARN_ON(tp->copied_seq != tp->rcv_nxt &&
- !(flags & (MSG_PEEK | MSG_TRUNC)));
-
- if (!skb_queue_empty(&tp->ucopy.prequeue))
- goto do_prequeue;
-
-
- }
-
- if (copied >= target) {
-
- release_sock(sk);
- lock_sock(sk);
- } else
-
- sk_wait_data(sk, &timeo);
上面的分析中有release_sock函数,这个函数用来release这个sock,也就是对这个sock解除锁定。然后唤醒等待队列。 这里要注意,sock一共有两个等待队列,一个是sock的sk_sleep等待队列,这个等待队列用来等待数据的到来。一个是ucopy域的等待队列wq,这个表示等待使用这个sock。 - void release_sock(struct sock *sk)
- {
-
- mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
-
- spin_lock_bh(&sk->sk_lock.slock);
-
- if (sk->sk_backlog.tail)
- __release_sock(sk);
-
- sk->sk_lock.owned = 0;
-
- if (waitqueue_active(&sk->sk_lock.wq))
- wake_up(&sk->sk_lock.wq);
- spin_unlock_bh(&sk->sk_lock.slock);
- }
然后来看主要的处理函数__release_sock,它主要是遍历backlog队列,然后处理skb。这里它有两个循环,外部循环是遍历backlog,而内部循环是遍历skb(也就是数据)。 - static void __release_sock(struct sock *sk)
- {
- struct sk_buff *skb = sk->sk_backlog.head;
-
-
- do {
- sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
- bh_unlock_sock(sk);
-
- do {
- struct sk_buff *next = skb->next;
-
- skb->next = NULL;
-
-
- sk_backlog_rcv(sk, skb);
- cond_resched_softirq();
-
- skb = next;
- } while (skb != NULL);
-
- bh_lock_sock(sk);
- } while ((skb = sk->sk_backlog.head) != NULL);
- }
而当数据tp->ucopy.prequeue为空,并且所复制的数据不能达到所期望的值,此时我们进入sk_wait_data等待数据的到来。 - #define sk_wait_event(__sk, __timeo, condition) \
- ({ int __rc; \
-
- release_sock(__sk); \
- __rc = condition;
-
- if (!__rc) {
-
- *(__timeo) = schedule_timeout(*(__timeo)); \
- } \
- lock_sock(__sk); \
- __rc = condition; \
- __rc; \
- })
-
-
-
- int sk_wait_data(struct sock *sk, long *timeo)
- {
- int rc;
- DEFINE_WAIT(wait);
-
- prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
- set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
-
- rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
- clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
- finish_wait(sk->sk_sleep, &wait);
- return rc;
- }
接下来就是一些域的更新,以及处理prequeue队列: -
-
- if (user_recv) {
- int chunk;
-
-
-
- if ((chunk = len - tp->ucopy.len) != 0) {
- NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
- len -= chunk;
- copied += chunk;
- }
-
- if (tp->rcv_nxt == tp->copied_seq &&
- !skb_queue_empty(&tp->ucopy.prequeue)) {
- do_prequeue:
-
- tcp_prequeue_process(sk);
-
-
- if ((chunk = len - tp->ucopy.len) != 0) {
- NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
- len -= chunk;
- copied += chunk;
- }
- }
- }
- ...................................
- continue;
在分析tcp_prequeue_process之前,我们先来看下什么情况下release_sock会直接复制数据到用户空间。我们知道它最终会调用tcp_rcv_established函数,因此来看tcp_rcv_established的代码片断 内核接收到的数据包有可能不是正序的,可是内核传递给用户空间的数据必须是正序的,只有这样才能拷贝给用户空间。 - else {
- int eaten = 0;
- int copied_early = 0;
-
- if (tp->copied_seq == tp->rcv_nxt &&
- len - tcp_header_len <= tp->ucopy.len) {
- .........................
-
-
- if (tp->ucopy.task == current &&
- sock_owned_by_user(sk) && !copied_early) {
- __set_current_state(TASK_RUNNING);
-
- if (!tcp_copy_to_iovec(sk, skb, tcp_header_len))
- eaten = 1;
- }
-
- ...............................
通过上面的判断条件我们很容易看出前面调用release_sock,为何有时将数据拷贝到用户空间,有时拷贝到receive队列。 ok,最后我们来看下tcp_prequeue_process的实现。它的实现很简单,就是遍历prequeue,然后处理buf。这里要注意,它会处理完所有的prequeue,也就是会清空prequeue. - static void tcp_prequeue_process(struct sock *sk)
- {
- struct sk_buff *skb;
- struct tcp_sock *tp = tcp_sk(sk);
-
- NET_INC_STATS_USER(sock_net(sk), LINUX_MIB_TCPPREQUEUED);
-
-
-
- local_bh_disable();
-
- while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
-
- sk_backlog_rcv(sk, skb);
- local_bh_enable();
-
- tp->ucopy.memory = 0;
- }
最后简要的分析下数据如何复制到用户空间。这里的主要函数是skb_copy_datagram_iovec。最终都是通过这个函数复制到用户空间的。 我们知道内核存储数据有两种形式如果支持S/G IO的网卡,它会保存数据到skb_shinfo(skb)->frags(详见前面的blog),否则则会保存在skb的data区中。 因此这里也是分为两部分处理。 还有一个就是这里遍历frags也是遍历两次,第一次遍历是查找刚好 - int skb_copy_datagram_iovec(const struct sk_buff *skb, int offset,
- struct iovec *to, int len)
- {
- int start = skb_headlen(skb);
- int i, copy = start - offset;
- struct sk_buff *frag_iter;
-
- if (copy > 0) {
- if (copy > len)
- copy = len;
-
- if (memcpy_toiovec(to, skb->data + offset, copy))
- goto fault;
- if ((len -= copy) == 0)
- return 0;
- offset += copy;
- }
-
-
- for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
- int end;
-
- WARN_ON(start > offset + len);
-
- end = start + skb_shinfo(skb)->frags[i].size;
-
- if ((copy = end - offset) > 0) {
- int err;
- u8 *vaddr;
- skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
- struct page *page = frag->page;
-
- if (copy > len)
- copy = len;
-
- vaddr = kmap(page);
-
- err = memcpy_toiovec(to, vaddr + frag->page_offset +
- offset - start, copy);
- kunmap(page);
- if (err)
- goto fault;
-
- if (!(len -= copy))
- return 0;
-
- offset += copy;
- }
-
- start = end;
- }
-
-
- skb_walk_frags(skb, frag_iter) {
- int end;
-
- WARN_ON(start > offset + len);
-
- end = start + frag_iter->len;
- if ((copy = end - offset) > 0) {
- if (copy > len)
- copy = len;
-
- if (skb_copy_datagram_iovec(frag_iter,
- offset - start,
- to, copy))
- goto fault;
- if ((len -= copy) == 0)
- return 0;
- offset += copy;
- }
- start = end;
- }
- if (!len)
- return 0;
-
- fault:
- return -EFAULT;
- }
转载地址:http://urobi.baihongyu.com/