微言Netty:手写分布式服务框架
public class NettyServer {/*** 服务端带参构造* @param serverAddress* @param serviceRegistry* @param serverBeans*/public NettyServer(String serverAddress, ServerRegistry serviceRegistry, Map<String, Object> serverBeans) {this.serverAddress = serverAddress;this.serviceRegistry = serviceRegistry;this.serverBeans = serverBeans;}/*** 日志记录*/private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);/*** 服务端绑定地址*/private String serverAddress;/*** 服务注册*/private ServerRegistry serviceRegistry;/*** 服务端加载的bean列表*/private Map<String, Object> serverBeans;/*** 主事件池*/private EventLoopGroup bossGroup = new NioEventLoopGroup();/*** 副事件池*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/*** 服务端通道*/private Channel serverChannel;/*** 绑定本机监听** @throws Exception*/public void bind() throws Exception {//启动器ServerBootstrap serverBootstrap = new ServerBootstrap();//为Acceptor设置事件池,为客户端接收设置事件池serverBootstrap.group(bossGroup, workerGroup)//工厂模式,创建NioServerSocketChannel类对象.channel(NioServerSocketChannel.class)//等待队列大小.option(ChannelOption.SO_BACKLOG, 100)//地址复用.option(ChannelOption.SO_REUSEADDR, true)//开启Nagle算法,//网络好的时候:对响应要求比较高的业务,不建议开启,比如玩游戏,键盘数据,鼠标响应等,需要实时呈现;// 对响应比较低的业务,建议开启,可以有效减少小数据包传输。//网络差的时候:不建议开启,否则会导致整体效果更差。.option(ChannelOption.TCP_NODELAY, true)//日志记录组件的level.handler(new LoggingHandler(LogLevel.INFO))//各种业务处理handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//空闲检测handler,用于检测通道空闲状态channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));//编码器channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024, 4, 4));//解码器channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());//心跳包业务处理,一般需要配置idleStateHandler一起使用channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());//服务端先进行鉴权,然后处理业务channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler());//业务处理handlerchannel.pipeline().addLast("nettyHandler", new ServerHandler(serverBeans));}});//获取ip和端口String[] array = serverAddress.split(":");String host = array[];int port = Integer.parseInt(array[1]);//绑定端口,同步等待成功ChannelFuture future = serverBootstrap.bind(host, port).sync();//注册连接事件监听器future.addListener(cfl -> {if (cfl.isSuccess()) {logger.info("服务端[" + host + ":" + port + "]已上线...");serverChannel = future.channel();}});//注册关闭事件监听器future.channel().closeFuture().addListener(cfl -> {//关闭服务端close();logger.info("服务端[" + host + ":" + port + "]已下线...");});//注册服务地址if (serviceRegistry != null) {serviceRegistry.register(serverBeans.keySet(), host, port);}}/*** 关闭server*/public void close() {//关闭套接字if(serverChannel!=null){serverChannel.close();}//关闭主线程组if (bossGroup != null) {bossGroup.shutdownGracefully();}//关闭副线程组if (workerGroup != null) {workerGroup.shutdownGracefully();}}}
public class NettyClient { /** * 日志记录 */ private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); /** * 客户端请求Future列表 */ private Map<String, TinyWhaleFuture> clientFutures = new ConcurrentHashMap<>(); /** * 客户端业务处理handler */ private ClientHandler clientHandler = new ClientHandler(clientFutures); /** * 事件池 */ private EventLoopGroup group = new NioEventLoopGroup(); /** * 启动器 */ private Bootstrap bootstrap = new Bootstrap(); /** * 客户端通道 */ private Channel clientChannel; /** * 客户端连接 * @param host * @param port * @throws InterruptedException */ public NettyClient(String host, int port) throws InterruptedException { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //通道空闲检测 channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120)); //解码器 channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4)); //编码器 channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder()); //心跳处理 channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler()); //业务处理 channel.pipeline().addLast("clientHandler", clientHandler); //鉴权处理 channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler()); } }); //发起同步连接操作 ChannelFuture channelFuture = bootstrap.connect(host, port); //注册连接事件 channelFuture.addListener((ChannelFutureListener)future -> { //如果连接成功 if (future.isSuccess()) { logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接..."); clientChannel = channelFuture.channel(); } //如果连接失败,尝试重新连接 else{ logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中..."); future.channel().close(); bootstrap.connect(host, port); } }); //注册关闭事件 channelFuture.channel().closeFuture().addListener(cfl -> { close(); logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开..."); }); } /** * 客户端关闭 */ private void close() { //关闭客户端套接字 if(clientChannel!=null){ clientChannel.close(); } //关闭客户端线程组 if (group != null) { group.shutdownGracefully(); } } /** * 客户端发送消息,将获取的Future句柄保存到clientFutures列表 * @return * @throws InterruptedException * @throws ExecutionException */ public TinyWhaleFuture send(NettyMessage<NettyRequest> request) { TinyWhaleFuture rpcFuture = new TinyWhaleFuture(request); rpcFuture.addCallback(new TinyWhaleAsyncCallback() { @Override public void success(Object result) { } @Override public void fail(Exception e) { logger.error("发送失败", e); } }); clientFutures.put(request.getBody().getRequestId(), rpcFuture); clientHandler.sendMessage(request); return rpcFuture; }}朕已阅
相关文章