/** * struct pipe_inode_info - a linux kernel pipe * @mutex: mutex protecting the whole thing * @rd_wait: reader wait point in case of empty pipe * @wr_wait: writer wait point in case of full pipe * @head: The point of buffer production * @tail: The point of buffer consumption * @note_loss: The next read() should insert a data-lost message * @max_usage: The maximum number of slots that may be used in the ring * @ring_size: total number of buffers (should be a power of 2) * @nr_accounted: The amount this pipe accounts for in user->pipe_bufs * @tmp_page: cached released page * @readers: number of current readers of this pipe * @writers: number of current writers of this pipe * @files: number of struct file referring this pipe (protected by ->i_lock) * @r_counter: reader counter * @w_counter: writer counter * @fasync_readers: reader side fasync * @fasync_writers: writer side fasync * @bufs: the circular array of pipe buffers * @user: the user who created this pipe * @watch_queue: If this pipe is a watch_queue, this is the stuff for that **/ structpipe_inode_info { structmutexmutex; wait_queue_head_t rd_wait, wr_wait; unsignedint head; unsignedint tail; unsignedint max_usage; unsignedint ring_size; #ifdef CONFIG_WATCH_QUEUE bool note_loss; #endif unsignedint nr_accounted; unsignedint readers; unsignedint writers; unsignedint files; unsignedint r_counter; unsignedint w_counter; structpage *tmp_page; structfasync_struct *fasync_readers; structfasync_struct *fasync_writers; structpipe_buffer *bufs; structuser_struct *user; #ifdef CONFIG_WATCH_QUEUE structwatch_queue *watch_queue; #endif };
/**
 * struct pipe_buffer - a linux kernel pipe buffer
 * @page: the page containing the data for the pipe buffer
 * @offset: offset of data inside the @page
 * @len: length of data inside the @page
 * @ops: operations associated with this buffer. See @pipe_buf_operations.
 * @flags: pipe buffer flags (PIPE_BUF_FLAG_* values, e.g.
 *	PIPE_BUF_FLAG_CAN_MERGE).
 * @private: private data owned by the ops.
 **/
struct pipe_buffer {
	struct page *page;
	unsigned int offset, len;
	const struct pipe_buf_operations *ops;
	unsigned int flags;
	unsigned long private;
};
/* Null write succeeds. */ if (unlikely(total_len == 0)) return0;
__pipe_lock(pipe);
// 确保读者数量不为0 if (!pipe->readers) { send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; }
#ifdef CONFIG_WATCH_QUEUE if (pipe->watch_queue) { ret = -EXDEV; goto out; } #endif
/* * Only wake up if the pipe started out empty, since * otherwise there should be no readers waiting. * * If it wasn't empty we try to merge new data into * the last buffer. * * That naturally merges small writes, but it also * page-aligs the rest of the writes for large writes * spanning multiple pages. */ head = pipe->head; was_empty = pipe_empty(head, pipe->tail); chars = total_len & (PAGE_SIZE-1); // 要写入的数据的大小相对页帧大小的余数 // 如果余数不为0,且pipe不为空 if (chars && !was_empty) { unsignedint mask = pipe->ring_size - 1; // 当前头部的上一个缓冲区,因为要尝试将多余的数据与之前的数据合并 structpipe_buffer *buf = &pipe->bufs[(head - 1) & mask]; int offset = buf->offset + buf->len;
// 如果PIPE_BUF_FLAG_CAN_MERGE被置位,且buf能容下chars大小的数据 if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) && offset + chars <= PAGE_SIZE) { ret = pipe_buf_confirm(pipe, buf); if (ret) goto out;
// 将chars大小的数据写入缓冲区 ret = copy_page_from_iter(buf->page, offset, chars, from); if (unlikely(ret < chars)) { ret = -EFAULT; goto out; }
for (;;) { // 确保对着数量不为0 if (!pipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; }
head = pipe->head; // 如果pipe没被填满 if (!pipe_full(head, pipe->tail, pipe->max_usage)) { unsignedint mask = pipe->ring_size - 1; structpipe_buffer *buf = &pipe->bufs[head & mask]; structpage *page = pipe->tmp_page; // tmp_page用来临时存数据 int copied;
// 如果tmp_page还未分配,则用alloc_page分配一个page并赋值 if (!page) { page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); if (unlikely(!page)) { ret = ret ? : -ENOMEM; break; } pipe->tmp_page = page; }
/* Allocate a slot in the ring in advance and attach an * empty buffer. If we fault or otherwise fail to use * it, either the reader will consume it or it'll still * be there for the next write. */ // 自旋锁锁住读者等待队列 spin_lock_irq(&pipe->rd_wait.lock);
head = pipe->head; // 如果pipe已经被填满则进入下一次循环 if (pipe_full(head, pipe->tail, pipe->max_usage)) { spin_unlock_irq(&pipe->rd_wait.lock); continue; }
// 先让头部指针指向下一个缓冲区 pipe->head = head + 1; spin_unlock_irq(&pipe->rd_wait.lock);// 释放自旋锁
/*
 * Excerpt from the body of a splice routine (the function header is not part
 * of this excerpt).  Shown here is the case where only the output side is a
 * pipe: the input offset is resolved, the code waits for room in the pipe,
 * and do_splice_to() transfers the data.
 */

/* Only 'in' is a pipe. */
if (ipipe) {
	/* ...... (body elided in SOURCE) */
}
/* Only 'out' is a pipe. */
if (opipe) {
	/* Handle the in/out offsets: an offset on the pipe side is rejected. */
	if (off_out)
		return -ESPIPE;
	if (off_in) {
		if (!(in->f_mode & FMODE_PREAD))
			return -EINVAL;
		if (copy_from_user(&offset, off_in, sizeof(loff_t)))
			return -EFAULT;
	} else {
		/* No explicit offset supplied: start from the file's position. */
		offset = in->f_pos;
	}

	if (out->f_flags & O_NONBLOCK)
		flags |= SPLICE_F_NONBLOCK;

	pipe_lock(opipe);
	/* Wait until the pipe has a usable buffer. */
	ret = wait_for_space(opipe, flags);
	if (!ret) {
		unsigned int p_space;

		/* Don't try to read more than the pipe has space for. */
		/* Free slots in the pipe. */
		p_space = opipe->max_usage - pipe_occupancy(opipe->head, opipe->tail);
		/* The length actually read must not exceed the free space. */
		len = min_t(size_t, len, p_space << PAGE_SHIFT);

		/* do_splice_to() does the main work. */
		ret = do_splice_to(in, &offset, opipe, len, flags);
	}
	pipe_unlock(opipe);
	if (ret > 0)
		wakeup_pipe_readers(opipe);
	/* Write the resulting offset back to wherever it came from. */
	if (!off_in)
		in->f_pos = offset;
	else if (copy_to_user(off_in, &offset, sizeof(loff_t)))
		ret = -EFAULT;

/*
 * NOTE(review): the excerpt ends here; the 'if (opipe)' block opened above is
 * not closed in SOURCE.
 */