1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| #include "TcpServer.h" #include "Logger.h"
#include <strings.h>
static EventLoop *CheckLoopNotNull(EventLoop *loop) { if (loop == nullptr) LOG_FATAL("%s:%s:%d mainLoop is null!", __FILE__, __FUNCTION__, __LINE__); return loop; }
TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string name, Option option) : _mainloop(CheckLoopNotNull(loop)) , _ipPort(listenAddr.toIpPort()) , _name(name) , _acceptor(new Acceptor(loop, listenAddr, option == kReusePort)) , _threadPool(new EventLoopThreadPool(loop, name)) , _connectionCallback() , _messageCallback() , _nextConnId(1) , _started(0) { _acceptor->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }
TcpServer::~TcpServer() { for (auto &item : _connections) { TcpConnectionPtr conn(item.second); item.second.reset();
conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestoryed, conn)); } }
void TcpServer::start() { if (_started++ == 0) { _threadPool->start(_threadInitCallback); _mainloop->runInLoop(std::bind(&Acceptor::listen, _acceptor.get())); } }
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { EventLoop *subLoop = _threadPool->getNextLoop(); char buf[64] = {0}; snprintf(buf, sizeof buf, "-%s#%d", _ipPort.c_str(), _nextConnId); ++_nextConnId; std::string connName = _name + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", _name.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); sockaddr_in local; bzero(&local, sizeof local); socklen_t addrlen = sizeof local; if (getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0) LOG_ERROR("sockets::getLocalAddr");
InetAddress localAddr(local);
TcpConnectionPtr conn(new TcpConnection(subLoop, connName, sockfd, localAddr, peerAddr)); _connections[connName] = conn; conn->setConnectionCallback(_connectionCallback); conn->setMessageCallback(_messageCallback); conn->setWriteCompleteCallback(_writeCompleteCallback); conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); subLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
void TcpServer::removeConnection(const TcpConnectionPtr &conn) { _mainloop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn)); }
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn) { LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n", _name.c_str(), conn->name().c_str());
_connections.erase(conn->name()); EventLoop *subloop = conn->getLoop(); subloop->queueInLoop(std::bind(&TcpConnection::connectDestoryed, conn)); }
|