基于epoll的多线程网络服务程序设计(C语言):采用C语言设计了一个基于epoll的多线程网络服务程序。每个线程都有一个epoll来捕获处于这个线程的socket事件。当子线程数量为0,即只有一个线程,...

基于epoll的多线程网络服务程序设计——C语言

采用C语言设计了一个基于epoll的多线程网络服务程序。每个线程都有一个自己的epoll实例,用来捕获属于该线程的socket事件。当子线程数量为0,即只有一个线程时,网络监听服务与socket消息处理共用同一个epoll。当子线程数量大于0时,主线程负责监听socket连接,当有新的连接到来时,将其加入到活跃socket数量最少的子线程的epoll中。

server.h

#ifndef EPOLL_C_SERVER_H
#define EPOLL_C_SERVER_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Status codes returned by the server API (see serverRun). */
#define RESULT_OK 0
/* Parenthesized so the negative literal stays a single operand in any expression. */
#define RESULT_ERROR (-1)

/**************************************************************************
* Function  : *MSG_HANDLE
* Input    : socket_fd --> socket file descriptor that has data to process
*       : arg --> opaque void* argument
* Output   :
* Return   : 1 handled successfully, keep waiting for the next message;
*       : 0 finished with this connection; -1 an error occurred
* Description : function pointer type of the message handler
****************************************************************************/
typedef int (*MSG_HANDLE)(int socket_fd,void* arg) ; 

//Per-thread state of one socket-handling sub thread.
typedef struct
{
  //epoll instance this thread waits on
  int epoll_fd;
  //thread handle filled in by pthread_create
  pthread_t thd_fd;
 
  //message handler; each thread calls it to process incoming messages
  MSG_HANDLE data_func;
 
  //number of active sockets owned by this thread
  unsigned int active_conection_cnt;
  //thread mutex used to keep the active socket count up to date
  pthread_mutex_t thd_mutex; 
}socket_thd_struct;  //represents a sub thread that handles sockets

//State of one network service: listening port, epoll instance and sub threads.
typedef struct
{
  //epoll instance used by the thread that runs the server
  int epoll_fd;
  //TCP port the server listens on (host byte order)
  unsigned short ip_port;
 
  //message handler; called for message processing when there is only one thread
  MSG_HANDLE data_func;

  //number of sub threads; may be 0, which means the server and the socket
  //handling run in the same thread
  unsigned int socket_pthread_count;
  //pointer to the array of sub thread structs
  socket_thd_struct* socket_thd; 

}server_struct; //a network service struct

/**************************************************************************
* Function  : initServerStruct
* Input    : param_port --> port number the service listens on
*       : param_thd_count --> number of sub threads that handle connected clients
*       : param_handle --> function pointer of the socket data handler
* Output   :
* Return   : pointer to the initialized, heap-allocated server struct
* Description : initialize the server struct
****************************************************************************/
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle);

/**************************************************************************
* Function  : serverRun
* Input    : param_server --> server struct describing the service
* Output   :
* Return   : RESULT_OK(0): success; RESULT_ERROR(-1): failure
* Description : run the network service and listen on the service port
****************************************************************************/
int serverRun(server_struct *param_server);

#endif //EPOLL_C_SERVER_H

server.c

#include "server.h"

/*
 * Detach fd from the worker's epoll instance, close it and decrement the
 * worker's active socket counter under the worker mutex.
 */
static void socketPthreadDrop(socket_thd_struct* pa_sock_st,int fd)
{
  if(epoll_ctl(pa_sock_st->epoll_fd,EPOLL_CTL_DEL,fd,NULL)!=0)
  {
    fprintf(stderr,"remove socket from epoll failed:errno(%d)-%s\n",errno,strerror(errno));
  }
  close(fd);

  pthread_mutex_lock(&(pa_sock_st->thd_mutex));
  pa_sock_st->active_conection_cnt--;
  pthread_mutex_unlock(&(pa_sock_st->thd_mutex));
}

/*
 * Entry point of a socket-handling sub thread.
 *
 * arg is the socket_thd_struct of this thread. The thread waits on its own
 * epoll instance and, for every ready socket:
 *   - EPOLLERR            : logs the pending socket error and drops the socket;
 *   - EPOLLRDHUP/EPOLLHUP : the peer closed the connection, drops the socket;
 *   - EPOLLIN             : calls data_func(fd,NULL); a return value of -1
 *                           (error) or 0 (finished) drops the socket, any
 *                           other value keeps it registered.
 *
 * Returns NULL, and only when epoll_wait fails with an error other than EINTR.
 */
static void* socketPthreadRun(void* arg)
{
  enum { MAX_EVENTS = 5 };
  socket_thd_struct* pa_sock_st=(socket_thd_struct*)arg;
  struct epoll_event events[MAX_EVENTS];
  int active_counts=0;
  int index=0;
  int ret=0;

  while(1)
  {
    //wait for read/close/error events
    active_counts=epoll_wait(pa_sock_st->epoll_fd,events,MAX_EVENTS,-1);
    if(active_counts<0)
    {
      if(errno==EINTR) //interrupted by a signal, just wait again
      {
        continue;
      }
      //any other failure is permanent; leaving avoids a busy loop
      fprintf(stderr,"epoll wait failed:errno(%d)-%s\n",errno,strerror(errno));
      break;
    }
    fprintf(stdout,"active count:%d\n",active_counts);

    for(index=0;index<active_counts;index++)
    {
      int fd=events[index].data.fd;
      unsigned int what=events[index].events;

      if(what&EPOLLERR) //an error is pending on the socket
      {
        //errno is unrelated here; the pending error is stored in SO_ERROR
        int sock_err=0;
        socklen_t err_len=sizeof(sock_err);
        if(getsockopt(fd,SOL_SOCKET,SO_ERROR,&sock_err,&err_len)!=0)
        {
          sock_err=errno;
        }
        fprintf(stderr,"error happened:errno(%d)-%s\n",sock_err,strerror(sock_err));
        socketPthreadDrop(pa_sock_st,fd);
      }
      else if(what&(EPOLLRDHUP|EPOLLHUP)) //peer closed the connection
      {
        fprintf(stdout,"client close this socket\n");
        socketPthreadDrop(pa_sock_st,fd);
      }
      else if(what&EPOLLIN) //data arrived, run the message handler
      {
        ret=pa_sock_st->data_func(fd,NULL);
        if(ret==-1)
        {
          fprintf(stderr,"client socket exception happened\n");
          socketPthreadDrop(pa_sock_st,fd);
        }
        else if(ret==0)
        {
          fprintf(stdout,"client close this socket\n");
          socketPthreadDrop(pa_sock_st,fd);
        }
        //any other value (1 by contract): keep the socket and wait for more data
      }
    }
  }

  return NULL;
}

/*
 * Allocate and initialize a server struct.
 *
 * param_port      : port the service will listen on
 * param_thd_count : number of sub threads handling client sockets; 0 means
 *                   the server thread handles the client sockets itself
 * param_handle    : message handler called when a client socket is readable
 *
 * Returns the new server struct, or NULL when the struct, the sub thread
 * array or the server epoll instance cannot be created.
 *
 * If a sub thread cannot be set up, the function stops creating further
 * threads and lowers socket_pthread_count to the number of threads that are
 * actually running, so no connection is ever assigned to a dead worker. With
 * zero running workers the server falls back to single-thread mode.
 */
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle)
{
  unsigned int index=0;
  int err=0;

  server_struct* serv_st=malloc(sizeof(*serv_st));
  if(serv_st==NULL)
  {
    fprintf(stderr,"alloc server struct failed\n");
    return NULL;
  }

  serv_st->ip_port=param_port;
  serv_st->data_func=param_handle;
  serv_st->socket_pthread_count=param_thd_count;
  serv_st->socket_thd=NULL;
  serv_st->epoll_fd=epoll_create(256);
  if(serv_st->epoll_fd<0)
  {
    fprintf(stderr,"create server epoll failed:errno(%d)-%s\n",errno,strerror(errno));
    free(serv_st);
    return NULL;
  }

  if(param_thd_count==0)
  {
    return serv_st;
  }

  fprintf(stdout,"create client socket sub thread\n");
  //calloc zero-fills the array and checks the size multiplication for overflow
  serv_st->socket_thd=calloc(param_thd_count,sizeof(*serv_st->socket_thd));
  if(serv_st->socket_thd==NULL)
  {
    fprintf(stderr,"alloc sub thread structs failed\n");
    close(serv_st->epoll_fd);
    free(serv_st);
    return NULL;
  }

  for(index=0;index<param_thd_count;index++)
  {
    socket_thd_struct* thd=&(serv_st->socket_thd[index]);

    thd->data_func=param_handle;
    thd->active_conection_cnt=0;
    thd->epoll_fd=epoll_create(256);
    if(thd->epoll_fd<0)
    {
      fprintf(stderr,"create sub thread epoll failed:errno(%d)-%s\n",errno,strerror(errno));
      break;
    }

    //the mutex must exist before the thread that uses it is started
    err=pthread_mutex_init(&(thd->thd_mutex),NULL);
    if(err!=0)
    {
      fprintf(stderr,"init sub thread mutex failed:errno(%d)-%s\n",err,strerror(err));
      close(thd->epoll_fd);
      break;
    }

    //pthread functions return the error number instead of setting errno
    err=pthread_create(&(thd->thd_fd),NULL,socketPthreadRun,(void*)thd);
    if(err!=0)
    {
      fprintf(stderr,"create sub thread failed:errno(%d)-%s\n",err,strerror(err));
      pthread_mutex_destroy(&(thd->thd_mutex));
      close(thd->epoll_fd);
      break;
    }
  }

  if(index<param_thd_count)
  {
    //only the first `index` workers are running; never dispatch to the rest
    fprintf(stderr,"only %u of %u sub threads started\n",index,param_thd_count);
    serv_st->socket_pthread_count=index;
    if(index==0)
    {
      free(serv_st->socket_thd);
      serv_st->socket_thd=NULL;
    }
  }

  return serv_st;
}

/*
 * Return the index of the sub thread that currently owns the fewest active
 * sockets. Must only be called when socket_pthread_count is greater than 0.
 */
static unsigned int serverLeastLoadedThread(server_struct *param_server)
{
  unsigned int best_index=0;
  unsigned int best_cnt=0;
  unsigned int tmp_index=0;
  unsigned int act_cnts=0;

  for(tmp_index=0;tmp_index<param_server->socket_pthread_count;tmp_index++)
  {
    pthread_mutex_lock(&(param_server->socket_thd[tmp_index].thd_mutex));
    act_cnts=param_server->socket_thd[tmp_index].active_conection_cnt;
    pthread_mutex_unlock(&(param_server->socket_thd[tmp_index].thd_mutex));

    //compare counts with counts (the first thread seeds the minimum)
    if(tmp_index==0 || act_cnts<best_cnt)
    {
      best_index=tmp_index;
      best_cnt=act_cnts;
    }
  }

  return best_index;
}

/*
 * Register a freshly accepted client socket with the epoll instance that
 * will serve it: the server's own epoll when there are no sub threads,
 * otherwise the epoll of the least loaded sub thread.
 * Returns RESULT_OK, or RESULT_ERROR after closing new_socket.
 */
static int serverRegisterClient(server_struct *param_server,int new_socket)
{
  struct epoll_event ev;
  socket_thd_struct *thd=NULL;
  int saved_errno=0;

  memset(&ev,0,sizeof(ev));
  ev.data.fd=new_socket;
  ev.events=EPOLLIN|EPOLLERR|EPOLLRDHUP;

  if(param_server->socket_pthread_count==0)
  {
    if(epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,new_socket,&ev)!=0)
    {
      fprintf(stderr,"client socket add to epoll error:errno(%d)-%s\n",errno,strerror(errno));
      close(new_socket);
      return RESULT_ERROR;
    }
    return RESULT_OK;
  }

  thd=&(param_server->socket_thd[serverLeastLoadedThread(param_server)]);

  //count the socket before registering it: the sub thread may see an event
  //and decrement the counter as soon as the fd is in its epoll set
  pthread_mutex_lock(&(thd->thd_mutex));
  thd->active_conection_cnt++;
  pthread_mutex_unlock(&(thd->thd_mutex));

  if(epoll_ctl(thd->epoll_fd,EPOLL_CTL_ADD,new_socket,&ev)!=0)
  {
    saved_errno=errno;
    pthread_mutex_lock(&(thd->thd_mutex));
    thd->active_conection_cnt--;
    pthread_mutex_unlock(&(thd->thd_mutex));

    fprintf(stderr,"client socket add to epoll error:errno(%d)-%s\n",saved_errno,strerror(saved_errno));
    close(new_socket);
    return RESULT_ERROR;
  }

  return RESULT_OK;
}

/*
 * Run the network service described by param_server.
 *
 * Creates a TCP listening socket on param_server->ip_port (all local
 * addresses), registers it with the server epoll instance and then loops
 * forever accepting clients. With socket_pthread_count == 0 the client
 * sockets are served by this thread through param_server->data_func;
 * otherwise every new client is handed to the least loaded sub thread.
 *
 * The epoll instance stored in param_server->epoll_fd is reused; a new one
 * is created only when that descriptor is negative.
 *
 * Returns RESULT_ERROR when param_server is NULL, when the listening socket
 * cannot be set up, or when epoll_wait fails with an error other than EINTR.
 * The function does not return while the service is running normally.
 */
int serverRun(server_struct *param_server)
{
  enum { MAX_EVENTS = 5 };
  int server_socket=-1;
  struct sockaddr_in server_addr;
  struct epoll_event ev;
  struct epoll_event events[MAX_EVENTS];
  int active_count=0;
  int index=0;
  int new_socket=0;
  int handle_ret=0;
  struct sockaddr_in client_info;
  socklen_t client_info_len=0;

  if(param_server==NULL)
  {
    fprintf(stderr,"server struct is NULL\n");
    return RESULT_ERROR;
  }

  memset(&server_addr,0,sizeof(server_addr));
  server_addr.sin_family=AF_INET;
  server_addr.sin_addr.s_addr=htonl(INADDR_ANY); //32-bit address needs htonl
  server_addr.sin_port=htons(param_server->ip_port);

  server_socket=socket(PF_INET,SOCK_STREAM,0);
  if(server_socket<0)
  {
    fprintf(stderr,"create socket error:errno(%d)-%s\n",errno,strerror(errno));
    return RESULT_ERROR;
  }
  fprintf(stdout,"create server socket ssuccessful\n");

  //reuse the epoll instance of the server struct instead of leaking it
  if(param_server->epoll_fd<0)
  {
    param_server->epoll_fd=epoll_create(256);
    if(param_server->epoll_fd<0)
    {
      fprintf(stderr,"create server epoll failed:errno(%d)-%s\n",errno,strerror(errno));
      close(server_socket);
      return RESULT_ERROR;
    }
  }

  if(bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr))!=0)
  {
    fprintf(stderr,"server bind failed:errno(%d)-%s\n",errno,strerror(errno));
    close(server_socket);
    return RESULT_ERROR;
  }
  fprintf(stdout,"server socket bind successful\n");

  //the backlog is a queue length, not the port number
  if(listen(server_socket,SOMAXCONN)!=0)
  {
    fprintf(stderr,"server listen failed:errno(%d)-%s\n",errno,strerror(errno));
    close(server_socket);
    return RESULT_ERROR;
  }
  fprintf(stdout,"server socket listen successful\n");

  //level-triggered on purpose: one accept() is done per wakeup, so pending
  //connections must keep the listening socket reported as readable
  memset(&ev,0,sizeof(ev));
  ev.data.fd=server_socket;
  ev.events=EPOLLIN;
  if(epoll_ctl(param_server->epoll_fd,EPOLL_CTL_ADD,server_socket,&ev)!=0)
  {
    fprintf(stderr,"server socket add to epoll error:errno(%d)-%s\n",errno,strerror(errno));
    close(server_socket);
    return RESULT_ERROR;
  }
  fprintf(stdout,"server socket add to epoll successful\n");

  while(1)
  {
    active_count=epoll_wait(param_server->epoll_fd,events,MAX_EVENTS,-1);
    if(active_count<0)
    {
      if(errno==EINTR) //interrupted by a signal, just wait again
      {
        continue;
      }
      fprintf(stderr,"server epoll wait failed:errno(%d)-%s\n",errno,strerror(errno));
      break;
    }
    fprintf(stdout,"active count:%d\n",active_count);

    for(index=0;index<active_count;index++)
    {
      int fd=events[index].data.fd;
      unsigned int what=events[index].events;

      if(fd==server_socket) //a new connection is pending
      {
        const unsigned char *ip=NULL;

        fprintf(stdout,"new socket comming\n");
        client_info_len=sizeof(client_info);
        new_socket=accept(server_socket,(struct sockaddr*)&client_info,&client_info_len);
        if(new_socket<0)
        {
          fprintf(stderr,"server accept failed:errno(%d)-%s\n",errno,strerror(errno));
          continue;
        }

        //sin_addr and sin_port are in network byte order
        ip=(const unsigned char*)&(client_info.sin_addr);
        fprintf(stdout,"new socket:%d.%d.%d.%d:%d-->connected\n",ip[0],ip[1],ip[2],ip[3],ntohs(client_info.sin_port));

        if(serverRegisterClient(param_server,new_socket)==RESULT_OK)
        {
          fprintf(stdout,"add new client socket to epoll\n");
        }
      }
      else if(what&(EPOLLERR|EPOLLRDHUP|EPOLLHUP)) //peer closed or socket error
      {
        fprintf(stdout,"client close this socket\n");
        epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,fd,NULL);
        close(fd);
      }
      else if(what&EPOLLIN) //data arrived, run the message handler
      {
        fprintf(stdout,"begin recv client data\n");
        handle_ret=param_server->data_func(fd,NULL);
        if(handle_ret==-1)
        {
          fprintf(stderr,"client socket exception happened\n");
          epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,fd,NULL);
          close(fd);
        }
        else if(handle_ret==0)
        {
          fprintf(stdout,"client close this socket\n");
          epoll_ctl(param_server->epoll_fd,EPOLL_CTL_DEL,fd,NULL);
          close(fd);
        }
        //any other value (1 by contract): keep the socket and wait for more data
      }
    }
  }

  //only reached when epoll_wait failed permanently
  close(server_socket);
  return RESULT_ERROR;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。