代码结构 文件介绍
InetAddress.h
InetAddress类 ip和端口设置
Socket.h
Socket类 设置fd
Epoll.h
epollfd 管理类
Channel.h
Channel类 管理epoll以及对应回调函数实现
EventLoop.h
EventLoop事件循环类
TcpServer.h
服务器类
tcpepoll.cpp 主函数
InetAddress.h
#ifndef _INETADDRESS_H
#define _INETADDRESS_H
#pragma on_INETADDRESS_He
#include
#include
#include
class InetAddress
{
private:
sockaddr_in addr_;
public:
InetAddress(const std::string &ip, uint16_t port);
InetAddress(const sockaddr_in addr);
InetAddress();
~InetAddress();
const char *ip()const;
uint16_t port()const;
const sockaddr *addr()const;
void setaddr(sockaddr_in clientaddr);
};
#endif // _INETADDRESS_H
InetAddress.cpp
#include "InetAddress.h"
InetAddress::InetAddress()
{
}
InetAddress::InetAddress(const std::string &ip, uint16_t port)
{
addr_.sin_family = AF_INET;
addr_.sin_addr.s_addr = inet_addr(ip.c_str());
addr_.sin_port = htons(port);
}
InetAddress::InetAddress(const sockaddr_in addr):addr_(addr)
{
}
InetAddress::~InetAddress()
{
}
const char* InetAddress::ip()const
{
return inet_ntoa(addr_.sin_addr);
}
uint16_t InetAddress::port()const
{
return ntohs(addr_.sin_port);
}
const sockaddr* InetAddress::addr()const
{
return (sockaddr*)&addr_;
}
void InetAddress::setaddr(sockaddr_in clientaddr)
{
addr_ = clientaddr;
}
Socket.h
#ifndef SOCKET_H
#define SOCKET_H
#include
#include
#include
#include
#include
#include
#include // TCP_NODELAY
#include "InetAddress.h"
int createnonblocking();
class Socket
{
public:
Socket(int fd);
~Socket();
int fd() const;
void setreuseaddr(bool on);
void setreuseport(bool on);
void settcpnodelay(bool on);
void setkeepalive(bool on);
void bind(const InetAddress &servaddr);
void listen(int n=128);
int accept(InetAddress &clientaddr);
private:
const int fd_;
};
#endif // !SOCKET_H
Socket.cpp
#include "Socket.h"
int createnonblocking()
{
int listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
if(listenfd (sizeof(optval)));
}
void Socket::setreuseport(bool on)
{
int optval = on ? 1 : 0;
setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &optval, static_cast(sizeof(optval)));
}
void Socket::settcpnodelay(bool on)
{
int optval = on ? 1 : 0;
setsockopt(fd_, SOL_SOCKET, TCP_NODELAY, &optval, static_cast(sizeof(optval)));
}
void Socket::setkeepalive(bool on)
{
int optval = on ? 1 : 0;
setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &optval, static_cast(sizeof(optval)));
}
void Socket::bind(const InetAddress &servaddr)
{
if(::bind(fd_, servaddr.addr(), sizeof(sockaddr))
Epoll.h
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include // TCP_NODELAY
#include
#include "Channel.h"
class Channel;
class Epoll
{
private:
static const int MaxEvents = 100;
int epollfd_;
epoll_event events_[MaxEvents];
public:
Epoll();
~Epoll();
//void addfd(int fd, uint32_t op);
void updatechannel(Channel *ch);
//std::vector loop(int timeout=-1);
std::vector loop(int timeout=-1);
};
Epoll.cpp
#include "Epoll.h"
/*
class Epoll
{
private:
static const int MaxEvents 服务器托管网= 100;
int epollfd;
epoll_event events_[MaxEvents];
public:
Epoll();
~Epoll();
void addfd(int fd, uint32_t op);
std::vector loop(int timeout=-1);
}
*/
Epoll::Epoll()
{
if((epollfd_ = epoll_create(1)) == -1)
{
printf("epoll_create() failed(%d).n", errno);
exit(-1);
}
}
Epoll::~Epoll()
{
close(epollfd_);
}
/*
void Epoll::addfd(int fd, uint32_t op)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = op; //水平
if(epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &ev) == -1)
{
printf("epoll_ctl() failed(%d).n", errno);
exit(-1);
}
}
*/
void Epoll::updatechannel(Channel *ch)
{
epoll_event ev;
ev.data.ptr = ch;
ev.events = ch->events();
if(ch->inpoll())
{
if(epoll_ctl(epollfd_, EPOLL_CTL_MOD, ch->fd(),&ev) == -1)
{
printf("epoll_ctl() failed(%d).n", errno);
exit(-1);
}
printf("epoll_ctl() EPOLL_CTL_MOD success. %dn", ch->fd());
}
else
{
if(epoll_ctl(epollfd_, EPO服务器托管网LL_CTL_ADD, ch->fd(),&ev) == -1)
{
printf("epoll_ctl() failed(%d).n", errno);
exit(-1);
}
ch->setinepoll();
printf("epoll_ctl() EPOLL_CTL_ADD success. %dn", ch->fd());
}
}
/*
std::vector Epoll::loop(int timeout)
{
std::vector evs;
bzero(events_, sizeof(events_));
int infds = epoll_wait(epollfd_, events_, MaxEvents, timeout);
if(infds Epoll::loop(int timeout)
{
std::vector channles;
bzero(events_, sizeof(events_));
int infds = epoll_wait(epollfd_, events_, MaxEvents, timeout);
if(infds setrevents(events_[i].events);
channles.push_back(ch);
}
return channles;
}
Channel.h
#ifndef CHANNEL_H
#define CHANNEL_H
#pragma once
#include
#include
#include "Epoll.h"
#include "InetAddress.h"
#include "Socket.h"
class Epoll;
class Channel
{
private:
int fd_=-1;
Epoll *ep_ = nullptr; //channle 对应的红黑树
bool inepoll_=false; // epoll_ctl add mod
uint32_t events_=0; //fd_需要监视的事件
uint32_t revents_=0; //fd 已发生的事件
std::function readcallback_;
public:
Channel(Epoll *ep, int fd);
~Channel();
int fd();
void useet(); //采用边缘触发
void enablereading(); //让epoll_wait()监视fd_的读事件
void setinepoll();
void setrevents(uint32_t ev);
bool inpoll();
uint32_t events();
uint32_t revents(); //返回revents_成员
void handleevent();
void newconnection(Socket* servsock);
void onmessage();
void setreadcallback(std::function fn);
};
#endif // ! CHANNEL_H
Channel.cpp
#include "Channel.h"
/*
class Channel
{
private:
int fd_=-1;
Epoll *ep_ = nullptr; //channle 对应的红黑树
bool inepoll_=false; // epoll_ctl add mod
uint32_t events_=0; //fd_需要监视的事件
uint32_t revents_0; //fd 已发生的事件
public:
Channel(Epoll *ep, intfd);
~Channel();
int fd();
void useet(); //采用边缘触发
void enablereading(); //让epoll_wait()监视fd_的读事件
void setinepoll();
void setrevents(uint32_t ev);
bool inpoll();
uint32_t events();
uint32_t revents(); //返回revents_成员
};*/
Channel::Channel(Epoll *ep, int fd)
:ep_(ep),fd_(fd)
{
}
Channel::~Channel()
{
//在析构函数中,不要销毁ep_ 也不能关闭fd_ 不属于channel类
}
int Channel::fd()
{
return fd_;
}
void Channel::useet()
{
events_ = events_ | EPOLLET;
}
void Channel::enablereading()
{
events_ |= EPOLLIN;
ep_->updatechannel(this);
}
void Channel::setinepoll()
{
inepoll_ = true;
}
void Channel::setrevents(uint32_t ev)
{
revents_= ev;
}
bool Channel::inpoll()
{
return inepoll_;
}
uint32_t Channel::events()
{
return events_;
}
uint32_t Channel::revents()
{
return revents_;
}
//事件处理函数, epoll_wait返回的时候执行它。
void Channel::handleevent()
{
if(revents_ & EPOLLRDHUP)
{
printf("cilent fd =%d disconnectionn", fd_);
close(fd_);
}
else if (revents_ & EPOLLIN|EPOLLPRI)
{
readcallback_();
}
else if (revents_ & EPOLLOUT)
{
}
else
{
printf("cilent fd =%dn", fd_);
close(fd_);
}
}
void Channel::newconnection(Socket* servsock)
{
InetAddress clientaddr;
Socket *clientsock = new Socket(servsock->accept(clientaddr));
printf("FILE(%s)FUNCTION(%s)LINE(%d) accept client fd=%d, ip=%s,port=%d ok.n",
__FILE__, __func__, __LINE__,
clientsock->fd(), clientaddr.ip(), clientaddr.port());
Channel *clientchannel = new Channel(ep_, clientsock->fd());
clientchannel->setreadcallback(std::bind(&Channel::onmessage, clientchannel));
clientchannel->useet();
clientchannel->enablereading();
}
void Channel::onmessage()
{
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
size_t nread = recv(fd_, buffer, sizeof(buffer), 0);
if(nread > 0)
{
printf("tcpepoll Recv:%sn", buffer);
send(fd_, buffer, strlen(buffer), 0);
}else if (nread == -1 && errno == EINTR)
{
}else if(nread == -1 && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
{
}else if (nread == 0)
{
printf("clientfd:%d disconnectedn", fd_);
close(fd_);
}
}
//设置fd的回调函数
void Channel::setreadcallback(std::function fn)
{
readcallback_ = fn;
}
EventLoop.h
#pragma once
#include "Epoll.h"
class EventLoop
{
private:
Epoll *ep_;
public:
EventLoop();
~EventLoop();
void run();
Epoll* ep();
};
EventLoop.cpp
#include "EventLoop.h"
/*
class EventLoop
{
private:
Epoll *ep_;
public:
EventLoop();
~EventLoop();
void run();
};
*/
EventLoop::EventLoop():ep_(new Epoll)
{
}
EventLoop::~EventLoop()
{
delete ep_;
}
void EventLoop::run()
{
while(true)
{
std::vector channles = ep_->loop();
for(auto &ch:channles)
{
ch->handleevent();
}
}
}
Epoll* EventLoop::ep()
{
return ep_;
}
TcpServer.h
#pragma once
#include "EventLoop.h"
#include "Socket.h"
#include "Channel.h"
class TcpServer
{
private:
EventLoop loop_;
public:
TcpServer(const std::string &ip, const uint16_t port);
~TcpServer();
void start();
};
TcpServer.cpp
#include "TcpServer.h"
/*
class TcpServer
{
private:
EventLoop loop_;
public:
TcpServer(const std::string &ip, const uint16_t port);
~TcpServer();
};
*/
TcpServer::TcpServer(const std::string &ip, const uint16_t port)
{
Socket *servsock = new Socket(createnonblocking());
InetAddress servaddr(ip, port);
servsock->setreuseaddr(true);
servsock->setreuseport(true);
servsock->settcpnodelay(true);
servsock->setkeepalive(true);
servsock->bind(servaddr);
servsock->listen();
Channel *servchannel = new Channel(loop_.ep(), servsock->fd());
servchannel->setreadcallback(std::bind(&Channel::newconnection, servchannel, servsock));
servchannel->enablereading();
}
TcpServer::~TcpServer()
{
}
void TcpServer::start()
{
loop_.run();
}
tcpepoll.cpp
#include "TcpServer.h"
int main(int argc, char *argv[])
{
if(argc !=3)
{
printf("usage: ./tcpepoll ip portn");
printf("examples ./tcpepoll 127.0.0.1 6666n");
return -1;
}
TcpServer tcpserver(argv[1], atoi(argv[2]));
tcpserver.start(); //运行事件循环
return 0;
}
client.cpp
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include // TCP_NODELAY
#include
int main(int argc, char *argv[])
{
if(argc !=3)
{
printf("usage: ./client ip port");
return -1;
}
int sockfd;
struct sockaddr_in servaddr;
char buf[1024];
if((sockfd=socket(AF_INET,SOCK_STREAM, 0))
makefile
all: client tcpepoll
client: client.cpp
g++ -g -o client client.cpp
tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp TcpServer.cpp
g++ -g -o tcpepoll tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp TcpServer.cpp
clean:
rm -f client tcpepoll
运行
服务器
客户端
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
4.Ribbon负载均衡 上一节中,我们添加了@LoadBalanced注解,即可实现负载均衡功能,这是什么原理呢? 4.1.负载均衡原理 SpringCloud底层其实是利用了一个名为Ribbon的组件,来实现负载均衡功能的。 那么我们发出的请求明明是htt…