The thundering herd problem, put simply, is this: multiple processes wait on the same event (for example, readability of the same socket), and when the event occurs the kernel wakes all of them up, even though only one process is needed to handle it. This is clearly not what we want, because it wastes CPU for nothing. This article tries to reproduce the problem, then walks through how nginx solves it with reference to its code, and finally presents a multi-process echo server built with epoll and reuseport.
Where the thundering herd comes from
nginx's master-worker model
nginx uses a master-worker process model: the master parses the configuration, starts the worker processes, and handles signals (for example, a restart respawns the workers), while the workers do the actual request handling. With multiple worker processes, which worker handles a given request? More concretely, which worker process will the client establish its TCP connection with?
Recall the typical flow of a server-side socket:
int listen_socket = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(8000);
int ret = bind(listen_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
listen(listen_socket, 5);
struct sockaddr_in cli_addr;
socklen_t addr_length = sizeof(cli_addr);
while (1)
{
int cli_sock = accept(listen_socket, (struct sockaddr *) &cli_addr, &addr_length);
// todo
}
The netstat command shows that only the master process is listening on port 80:
$ ps -ef|grep nginx
root 48664 1 0 19:57 ? 00:00:00 nginx: master process /opt/nginx/sbin/nginx
nobody 48665 48664 0 19:57 ? 00:00:00 nginx: worker process
nobody 48666 48664 0 19:57 ? 00:00:00 nginx: worker process
$ netstat -tnlp|grep 80
tcp 0 0 0.0.0.0:80 0.0.0.0:* LISTEN 48664/nginx: master
A simple example
Here is an example that uses this approach (the parent creates the listening socket and then forks children that all call accept on it):
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <strings.h>      /* bzero */
#include <sys/types.h>
#include <sys/socket.h>   /* socket, bind, listen, accept */
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
void error(char *msg)
{
perror(msg);
exit(1);
}
int main( int argc, char *argv[] )
{
int sockfd; /* server socket */
int newsockfd; /* client socket */
int portno; /* port to listen on */
socklen_t clilen; /* byte size of client's address */
int n; /* message byte size */
struct sockaddr_in serv_addr; /* server's addr */
struct sockaddr_in cli_addr; /* client addr */
char buffer[256]; /* message buffer */
if (argc < 2) {
fprintf(stderr,"usage %s port\n", argv[0]);
exit(0);
}
portno = atoi(argv[1]);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
error("ERROR opening socket");
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portno);
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
error("ERROR on binding");
if (listen(sockfd,5) < 0)
error("ERROR on listening");
clilen = sizeof(cli_addr);
int pid = getpid();
printf("parent process %d\n", pid);
for (int i = 0; i < 2; i++)
{
pid = fork();
if (pid < 0)
{
error("failed to fork");
}
else if (pid==0) // child
{
break;
}
else // parent
{
printf("start child process %d\n", pid);
}
}
if (pid > 0 ) // parent
{
int wstatus;
// just suspend until one child process exits
if ((pid = wait(&wstatus)) == -1)
error("Error on wait");
printf("Process %d terminated\n", pid);
return 0;
}
while(1)
{
newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
if (newsockfd < 0)
error("ERROR on accept");
printf("process %d get connection %d\n", getpid(), newsockfd);
bzero(buffer, 256);
n = recv(newsockfd, buffer, 255, 0);
if (n < 0)
error("ERROR reading from socket");
if (n >0 ) // n==0 means client close connection
{
n = write(newsockfd, buffer, strlen(buffer));
if (n < 0)
error("ERROR writing to socket");
}
close(newsockfd);
}
return 0;
}
Let's test it.
Compile and run it in one shell window (on an older Linux system you may need -std=gnu99 for it to compile; the same goes for the later examples):
gcc example1.c
./a.out 8000
In another shell window, use nc to send several requests in a row:
for i in {1..5}; do echo "hello" | nc localhost 8000; done
$ ./a.out 8000
parent process 3327
start child process 3328
start child process 3329
process 3328 get connection 4
process 3329 get connection 4
process 3328 get connection 4
process 3329 get connection 4
process 3328 get connection 4
Stop the program with:
pkill a.out
Each connection is accepted by exactly one child process, so blocking accept() does not show a thundering herd here. The test above gives the same result on Linux kernels 2.6.32 and 4.11.7 (you can check your kernel version with cat /proc/version).
An example using epoll
On Linux, nginx uses epoll for asynchronous I/O. Let's rewrite the example above with epoll as an asynchronous echo server:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>      /* bzero */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>   /* sockaddr_in, INADDR_ANY */
#include <sys/wait.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAXEVENTS 64
void
error(char *msg)
{
perror(msg);
exit(1);
}
static int
create_and_bind(int port)
{
struct sockaddr_in serv_addr;
int s, sfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd < 0)
{
perror("socket failed");
return -1;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(port);
if (bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
{
perror("bind failed");
return -1;
}
return sfd;
}
static int
make_socket_non_blocking(int sfd)
{
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl get");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1)
{
perror("fcntl set");
return -1;
}
return 0;
}
int
main(int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event ep;
struct epoll_event *eps;
if (argc != 2)
{
fprintf(stderr, "Usage: %s port\n", argv[0]);
exit(EXIT_FAILURE);
}
sfd = create_and_bind(atoi(argv[1]));
if (sfd == -1)
exit(1);
s = make_socket_non_blocking(sfd);
if (s == -1)
exit(1);
s = listen(sfd, SOMAXCONN);
if (s == -1)
{
error("listen");
}
int pid = getpid();
printf("parent process %d\n", pid);
for(int i=0; i < 2; i++)
{
pid = fork();
if (pid<0)
{
error("fork");
}
else if (pid == 0) // child
{
break;
}
else // parent
{
printf("start child process %d\n", pid);
}
}
if (pid > 0 ) // parent
{
int wstatus;
// just suspend until one child process exits
if ((pid = wait(&wstatus)) == -1)
error("Error on wait");
printf("Process %d terminated\n", pid);
return 0;
}
efd = epoll_create1(0);
if (efd == -1)
{
error("epoll_create");
}
ep.data.fd = sfd;
ep.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ep);
if (s == -1)
{
error("epoll_ctl");
}
eps = calloc(MAXEVENTS, sizeof ep);
/* The event loop */
while (1)
{
int n, i;
n = epoll_wait(efd, eps, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((eps[i].events & EPOLLERR) ||
(eps[i].events & EPOLLHUP) ||
(!(eps[i].events & EPOLLIN)))
{
if (eps[i].events & EPOLLERR)
fprintf(stderr, "epoll error: EPOLLERR\n");
if (eps[i].events & EPOLLHUP)
fprintf(stderr, "epoll error: EPOLLHUP\n");
close(eps[i].data.fd);
continue;
}
else if (sfd == eps[i].data.fd)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
printf("Process %d accept return %d\n", getpid(), infd);
if (infd == -1)
{
/* EAGAIN here just means another process accepted the connection first */
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
error("accept");
}
continue;
}
s = make_socket_non_blocking(infd);
if (s == -1)
exit(1);
ep.data.fd = infd;
ep.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &ep);
if (s == -1)
{
error("epoll_ctl");
}
}
else
{
int done = 0;
while (1)
{
ssize_t count;
char buf[512];
count = read(eps[i].data.fd, buf, sizeof buf);
if (count == -1)
{
if (errno != EAGAIN) {
perror("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
if (!strncmp(buf, "quit\n", 5))
{
done = 1;
break;
}
s = write(eps[i].data.fd, buf, count);
if (s == -1)
{
error("write");
}
}
if (done)
{
close(eps[i].data.fd);
}
}
}
}
free(eps);
close(sfd);
return EXIT_SUCCESS;
}
Compile and run it as before, and send requests with the same nc loop. This time the thundering herd shows up: a single connection can wake both workers from epoll_wait, and the worker that loses the race gets -1 (EAGAIN) back from accept:
$ ./a.out 8000
parent process 11648
start child process 11649
start child process 11650
Process 11649 accept return 5
Process 11650 accept return -1
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return 5
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return -1
Process 11650 accept return 5
Process 11649 accept return 5
Process 11649 accept return 5
Process 11649 accept return 5
Solving the thundering herd
nginx's accept mutex
Below are the key steps of nginx's startup flow, with indentation indicating the call hierarchy.
main
  ngx_init_cycle
    ngx_open_listening_sockets
      socket
      bind
      listen
  ngx_master_process_cycle
    ngx_start_worker_processes
      for (i = 0; i < worker_processes; i++) {
        ngx_spawn_process
          pid = fork();
          switch (pid) {
          // ...
          case 0:
            ngx_worker_process_cycle
              ngx_worker_process_init
                ngx_event_process_init(=module.init_process)
                  if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
                    ngx_use_accept_mutex = 1;
                    // ...
                  } else {
                    ngx_use_accept_mutex = 0;
                  }
                  ngx_epoll_init (=event_module.actions.init)
                    epoll_create
              for ( ;; ) {
                ngx_process_events_and_timers
                  if (ngx_use_accept_mutex) {
                    ngx_trylock_accept_mutex
                      if (ngx_shmtx_trylock)
                        ngx_enable_accept_events
                          epoll_ctl
                  ngx_epoll_process_events(=ngx_process_events)
                    epoll_wait
                    handler(ev)
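The key point is ngx_trylock_accept_mutex: a worker adds the listening socket to its epoll set only after winning a shared lock, so at any moment only one worker can be woken up by a new connection. Stripped of all nginx machinery, the idea looks roughly like the sketch below; this is a simplified illustration rather than nginx's actual code, and it uses a process-shared pthread mutex and a 500 ms epoll_wait timeout in place of ngx_shmtx_t and ngx_accept_mutex_delay (compile with gcc -std=gnu99 -pthread):
/* accept_mutex_sketch.c (hypothetical file name)
 * Simplified illustration of the accept-mutex idea, NOT nginx's code:
 * only the worker holding a process-shared lock keeps the listening
 * socket in its epoll set, so a new connection wakes at most one worker. */
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
    /* a process-shared mutex in anonymous shared memory stands in for ngx_shmtx_t */
    pthread_mutex_t *accept_mutex = mmap(NULL, sizeof(*accept_mutex),
                                         PROT_READ | PROT_WRITE,
                                         MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(accept_mutex, &attr);

    /* one listening socket created before fork, as in the examples above */
    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(8000);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        exit(1);
    }
    listen(sfd, SOMAXCONN);
    fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);

    int pid = getpid();
    for (int i = 0; i < 2; i++)
        if ((pid = fork()) == 0)
            break;
    if (pid > 0) {          /* parent: just wait, like the examples above */
        wait(NULL);
        return 0;
    }

    int efd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = sfd };
    struct epoll_event events[16];
    int holding = 0;        /* is sfd currently registered in our epoll set? */
    while (1) {
        /* only the worker that wins the trylock watches the listening fd */
        if (!holding && pthread_mutex_trylock(accept_mutex) == 0) {
            epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ev);
            holding = 1;
        }
        /* the 500 ms timeout plays the role of ngx_accept_mutex_delay:
           a worker without the lock wakes up periodically to retry it */
        int n = epoll_wait(efd, events, 16, 500);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == sfd) {
                int c = accept(sfd, NULL, NULL);
                if (c >= 0) {
                    printf("process %d accepted %d\n", getpid(), c);
                    close(c);   /* a real server would now serve the client */
                }
            }
        }
        if (holding) {
            /* give other workers a chance before the next round */
            epoll_ctl(efd, EPOLL_CTL_DEL, sfd, NULL);
            pthread_mutex_unlock(accept_mutex);
            holding = 0;
        }
    }
}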
This locking mechanism is only used when the accept_mutex configuration directive is set to on; its default value is off. The nginx documentation explains it this way:
There is no need to enable accept_mutex on systems that support the EPOLLEXCLUSIVE flag (1.11.3) or when using reuseport.
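For reference, a minimal configuration that enables the lock explicitly could look like this (the accept_mutex directive lives in the events block; the worker count and port here are arbitrary):
worker_processes 2;
events {
    accept_mutex on;
}
http {
    server {
        listen 80;
    }
}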
EPOLLEXCLUSIVE is an epoll flag introduced in Linux 4.5:
When a wakeup event occurs and multiple epoll file descriptors are attached to the same target file using EPOLLEXCLUSIVE, one or more of the epoll file descriptors will receive an event with epoll_wait(2). The default in this scenario (when EPOLLEXCLUSIVE is not set) is for all epoll file descriptors to receive an event. EPOLLEXCLUSIVE is thus useful for avoiding thundering herd problems in certain scenarios.
However, the experiment above on Linux 4.11 seems to contradict this description: even without EPOLLEXCLUSIVE set, not all of the processes were woken up every time.
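To experiment with EPOLLEXCLUSIVE itself, the only change needed in the epoll example above is how the listening socket is registered. The fragment below reuses that example's sfd, efd, ep and error, and assumes Linux 4.5+ with a libc that defines EPOLLEXCLUSIVE:
/* EPOLLEXCLUSIVE asks the kernel to wake only some (typically one) of the
   epoll instances attached to this fd per event. It is only accepted with
   EPOLL_CTL_ADD; level-triggered EPOLLIN is used here so the accept logic
   above can stay unchanged. */
ep.data.fd = sfd;
ep.events = EPOLLIN | EPOLLEXCLUSIVE;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ep);
if (s == -1)
{
    error("epoll_ctl");
}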
SO_REUSEPORT
SO_REUSEPORT is a socket option introduced in Linux 3.9. It was added to solve two problems:
- allow multiple servers (processes or threads) to bind to the same port
- when multiple threads are waiting in the accept() call on a single listening socket, traditional wake-ups are not fair, so that, under high load, incoming connections may be distributed across threads in a very unbalanced fashion.
Simply put, it allows listening sockets in different processes or threads to bind to the same ip:port pair; the kernel then balances incoming connections across all of the processes (threads) and wakes only one of them each time.
Enabling reuseport in nginx
Change nginx's listen directive to
listen 80 reuseport;
With the SO_REUSEPORT option enabled, each worker process creates its own listening socket bound to the same ip and port, and for each incoming connection only one process's accept obtains it. This avoids the cost of locking and improves CPU utilization.
An example using epoll and reuseport
With a small adjustment to the echo server above, each worker process uses the SO_REUSEPORT option to create its own listening socket and listens on the same ip:port pair:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>      /* bzero */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>   /* sockaddr_in, INADDR_ANY */
#include <sys/wait.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#define MAXEVENTS 64
void
error(char *msg)
{
perror(msg);
exit(1);
}
static int
create_and_bind(int port)
{
struct sockaddr_in serv_addr;
int s, sfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd < 0)
{
perror("socket failed");
return -1;
}
int optval = 1;
/* allow every worker's listening socket to bind to the same ip:port pair */
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) < 0)
{
perror("setsockopt(SO_REUSEPORT) failed");
return -1;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(port);
if (bind(sfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
{
perror("bind failed");
return -1;
}
return sfd;
}
static int
make_socket_non_blocking(int sfd)
{
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl get");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1)
{
perror("fcntl set");
return -1;
}
return 0;
}
int
main(int argc, char *argv[])
{
int sfd, s;
int efd;
// typedef union epoll_data {
// void *ptr;
// int fd;
// uint32_t u32;
// uint64_t u64;
// } epoll_data_t;
// struct epoll_event {
// uint32_t events; /* Epoll events */
// epoll_data_t data; /* User data variable */
// };
struct epoll_event ep;
struct epoll_event *eps;
if (argc != 2)
{
fprintf(stderr, "Usage: %s port\n", argv[0]);
exit(EXIT_FAILURE);
}
int pid = getpid();
printf("parent process %d\n", pid);
for(int i=0; i < 2; i++)
{
pid = fork();
if (pid<0)
{
error("fork");
}
else if (pid == 0) // child
{
break;
}
else // parent
{
printf("start child process %d\n", pid);
}
}
if (pid > 0 ) // parent
{
int wstatus;
// just suspend until one child process exits
if ((pid = wait(&wstatus)) == -1)
error("Error on wait");
printf("Process %d terminated\n", pid);
return 0;
}
sfd = create_and_bind(atoi(argv[1]));
if (sfd == -1)
exit(1);
s = make_socket_non_blocking(sfd);
if (s == -1)
exit(1);
s = listen(sfd, SOMAXCONN);
if (s == -1)
{
error("listen");
}
efd = epoll_create1(0);
if (efd == -1)
{
error("epoll_create");
}
ep.data.fd = sfd;
ep.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ep);
if (s == -1)
{
error("epoll_ctl");
}
/* Buffer where events are returned */
eps = calloc(MAXEVENTS, sizeof ep);
/* The event loop */
while (1)
{
int n, i;
n = epoll_wait(efd, eps, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((eps[i].events & EPOLLERR) ||
(eps[i].events & EPOLLHUP) ||
(!(eps[i].events & EPOLLIN)))
{
/* An error has occurred on this fd, or the socket is not
ready for reading (why were we notified then?) */
if (eps[i].events & EPOLLERR)
fprintf(stderr, "epoll error: EPOLLERR\n");
if (eps[i].events & EPOLLHUP)
fprintf(stderr, "epoll error: EPOLLHUP\n");
close(eps[i].data.fd);
continue;
}
else if (sfd == eps[i].data.fd)
{
/* Is it possible that more than one connection results in only one wakeup?
https://stackoverflow.com/questions/41582560/how-does-epolls-epollexclusive-mode-interact-with-level-triggering */
struct sockaddr in_addr;
socklen_t in_len;
int infd;
in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
printf("Process %d accept return %d\n", getpid(), infd);
if (infd == -1)
{
/* EAGAIN here just means there is no pending connection to accept */
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
error("accept");
}
continue;
}
/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking(infd);
if (s == -1)
exit(1);
ep.data.fd = infd;
ep.events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &ep);
if (s == -1)
{
error("epoll_ctl");
}
}
else
{
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;
while (1)
{
ssize_t count;
char buf[512];
count = read(eps[i].data.fd, buf, sizeof buf);
if (count == -1)
{
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN) {
perror("read");
done = 1;
}
break;
}
else if (count == 0)
{
/* End of file. The remote has closed the
connection. */
done = 1;
break;
}
if (!strncmp(buf, "quit\n", 5))
{
done = 1;
break;
}
s = write(eps[i].data.fd, buf, count);
if (s == -1)
{
error("write");
}
}
if (done)
{
// printf("Closed connection on descriptor %d\n", eps[i].data.fd);
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close(eps[i].data.fd);
}
}
}
}
free(eps);
close(sfd);
return EXIT_SUCCESS;
}
The netstat command shows that the two child processes are listening on port 8000 at the same time:
$ ps -ef|grep a.out
yanxurui 69376 61940 0 15:22 pts/3 00:00:00 ./a.out 8000
yanxurui 69377 69376 0 15:22 pts/3 00:00:00 ./a.out 8000
yanxurui 69378 69376 0 15:22 pts/3 00:00:00 ./a.out 8000
$ netstat -tunlp|grep 8000
tcp 0 0 0.0.0.0:8000 0.0.0.0:* LISTEN 69377/./a.out
tcp 0 0 0.0.0.0:8000 0.0.0.0:* LISTEN 69378/./a.out