如何让Netty管理任意客户端连接而非oracle服务端和客户端连接

如何让Netty管理任意客户端连接而非服务端连接Netty实现服务端客户端长连接通讯及心跳检测
通过netty实现服务端与客户端的长连接通讯,及心跳检测。
& && & 基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。
& && &&&环境JDK1.8 和netty5
& && &&&以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
& &&&设计消息类型:
public enum&&MsgType {
& & PING,ASK,REPLY,LOGIN
& & Message基类:
//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg&&implements Serializable {
& & private static final long serialVersionUID = 1L;
& & private MsgT
& & //必须唯一,否者会出现channel调用混乱
& & private String clientId;
& & //初始化客户端id
& & public BaseMsg() {
& && &&&this.clientId = Constants.getClientId();
& & public String getClientId() {
& && &&&return clientId;
& & public void setClientId(String clientId) {
& && &&&this.clientId = clientId;
& & public MsgType getType() {
& & public void setType(MsgType type) {
& && &&&this.type =
常量设置:
public class Constants {
& & private static String clientId;
& & public static String getClientId() {
& && &&&return clientId;
& & public static void setClientId(String clientId) {
& && &&&Constants.clientId = clientId;
登录类型消息:
public class LoginMsg extends BaseMsg {
& & private String userN
& & private S
& & public LoginMsg() {
& && &&&super();
& && &&&setType(MsgType.LOGIN);
& & public String getUserName() {
& && &&&return userN
& & public void setUserName(String userName) {
& && &&&this.userName = userN
& & public String getPassword() {
& & public void setPassword(String password) {
& && &&&this.password =
心跳检测Ping类型消息:
public class PingMsg extends BaseMsg {
& & public PingMsg() {
& && &&&super();
& && &&&setType(MsgType.PING);
请求类型消息:
public class AskMsg extends BaseMsg {
& & public AskMsg() {
& && &&&super();
& && &&&setType(MsgType.ASK);
& & private AskP
& & public AskParams getParams() {
& & public void setParams(AskParams params) {
& && &&&this.params =
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
& & private static final long serialVersionUID = 1L;
& & private S
& & public String getAuth() {
& & public void setAuth(String auth) {
& && &&&this.auth =
响应类型消息:
public class ReplyMsg extends BaseMsg {
& & public ReplyMsg() {
& && &&&super();
& && &&&setType(MsgType.REPLY);
& & private ReplyB
& & public ReplyBody getBody() {
& & public void setBody(ReplyBody body) {
& && &&&this.body =
//相应类型body对像
public class ReplyBody implements Serializable {
& & private static final long serialVersionUID = 1L;
public class ReplyClientBody extends ReplyBody {
& & private String clientI
& & public ReplyClientBody(String clientInfo) {
& && &&&this.clientInfo = clientI
& & public String getClientInfo() {
& && &&&return clientI
& & public void setClientInfo(String clientInfo) {
& && &&&this.clientInfo = clientI
public class ReplyServerBody extends ReplyBody {
& & private String serverI
& & public ReplyServerBody(String serverInfo) {
& && &&&this.serverInfo = serverI
& & public String getServerInfo() {
& && &&&return serverI
& & public void setServerInfo(String serverInfo) {
& && &&&this.serverInfo = serverI
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
public class NettyChannelMap {
& & private static Map&String,SocketChannel& map=new ConcurrentHashMap&String, SocketChannel&();
& & public static void add(String clientId,SocketChannel socketChannel){
& && &&&map.put(clientId,socketChannel);
& & public static Channel get(String clientId){
& && & return map.get(clientId);
& & public static void remove(SocketChannel socketChannel){
& && &&&for (Map.Entry entry:map.entrySet()){
& && && && &if (entry.getValue()==socketChannel){
& && && && && & map.remove(entry.getKey());
& && && && &}
public class NettyServerHandler extends SimpleChannelInboundHandler&BaseMsg& {
& & @Override
& & public void channelInactive(ChannelHandlerContext ctx) throws Exception {
& && &&&//channel失效,从Map中移除
& && &&&NettyChannelMap.remove((SocketChannel)ctx.channel());
& & @Override
& & protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
& && &&&if(MsgType.LOGIN.equals(baseMsg.getType())){
& && && && &LoginMsg loginMsg=(LoginMsg)baseM
& && && && &if(&robin&.equals(loginMsg.getUserName())&&&yao&.equals(loginMsg.getPassword())){
& && && && && & //登录成功,把channel存到服务端的map中
& && && && && & NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
& && && && && & System.out.println(&client&+loginMsg.getClientId()+& 登录成功&);
& && && && &}
& && &&&}else{
& && && && &if(NettyChannelMap.get(baseMsg.getClientId())==null){
& && && && && && &&&//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
& && && && && && &&&LoginMsg loginMsg=new LoginMsg();
& && && && && && &&&channelHandlerContext.channel().writeAndFlush(loginMsg);
& && && && &}
& && &&&switch (baseMsg.getType()){
& && && && &case PING:{
& && && && && & PingMsg pingMsg=(PingMsg)baseM
& && && && && & PingMsg replyPing=new PingMsg();
& && && && && & NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
& && && && &}
& && && && &case ASK:{
& && && && && & //收到客户端的请求
& && && && && & AskMsg askMsg=(AskMsg)baseM
& && && && && & if(&authToken&.equals(askMsg.getParams().getAuth())){
& && && && && && &&&ReplyServerBody replyBody=new ReplyServerBody(&server info $$$$ !!!&);
& && && && && && &&&ReplyMsg replyMsg=new ReplyMsg();
& && && && && && &&&replyMsg.setBody(replyBody);
& && && && && && &&&NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
& && && && && & }
& && && && &}
& && && && &case REPLY:{
& && && && && & //收到客户端回复
& && && && && & ReplyMsg replyMsg=(ReplyMsg)baseM
& && && && && & ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
& && && && && & System.out.println(&receive client msg: &+clientBody.getClientInfo());
& && && && &}
& && && && &default:
& && &&&ReferenceCountUtil.release(baseMsg);
ServerBootstrap:
public class NettyServerBootstrap {
& & private SocketChannel socketC
& & public NettyServerBootstrap(int port) throws InterruptedException {
& && &&&this.port =
& && &&&bind();
& & private void bind() throws InterruptedException {
& && &&&EventLoopGroup boss=new NioEventLoopGroup();
& && &&&EventLoopGroup worker=new NioEventLoopGroup();
& && &&&ServerBootstrap bootstrap=new ServerBootstrap();
& && &&&bootstrap.group(boss,worker);
& && &&&bootstrap.channel(NioServerSocketChannel.class);
& && &&&bootstrap.option(ChannelOption.SO_BACKLOG, 128);
& && &&&//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
& && &&&bootstrap.option(ChannelOption.TCP_NODELAY, true);
& && &&&//保持长连接状态
& && &&&bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
& && &&&bootstrap.childHandler(new ChannelInitializer&SocketChannel&() {
& && && && &@Override
& && && && &protected void initChannel(SocketChannel socketChannel) throws Exception {
& && && && && & ChannelPipeline p = socketChannel.pipeline();
& && && && && & p.addLast(new ObjectEncoder());
& && && && && & p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
& && && && && & p.addLast(new NettyServerHandler());
& && && && &}
& && &&&});
& && &&&ChannelFuture f= bootstrap.bind(port).sync();
& && &&&if(f.isSuccess()){
& && && && &System.out.println(&server start---------------&);
& & public static void main(String []args) throws InterruptedException {
& && &&&NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
& && &&&while (true){
& && && && &SocketChannel channel=(SocketChannel)NettyChannelMap.get(&001&);
& && && && &if(channel!=null){
& && && && && & AskMsg askMsg=new AskMsg();
& && && && && & channel.writeAndFlush(askMsg);
& && && && &}
& && && && &TimeUnit.SECONDS.sleep(5);
3 Client端:包含发起登录,发送心跳,及对应消息处理
public class NettyClientHandler extends SimpleChannelInboundHandler&BaseMsg& {
& & //利用写空闲发送心跳检测消息
& & @Override
& & public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
& && &&&if (evt instanceof IdleStateEvent) {
& && && && &IdleStateEvent e = (IdleStateEvent)
& && && && &switch (e.state()) {
& && && && && & case WRITER_IDLE:
& && && && && && &&&PingMsg pingMsg=new PingMsg();
& && && && && && &&&ctx.writeAndFlush(pingMsg);
& && && && && && &&&System.out.println(&send ping to server----------&);
& && && && && && &&&
& && && && && & default:
& && && && && && &&&
& && && && &}
& & @Override
& & protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
& && &&&MsgType msgType=baseMsg.getType();
& && &&&switch (msgType){
& && && && &case LOGIN:{
& && && && && & //向服务器发起登录
& && && && && & LoginMsg loginMsg=new LoginMsg();
& && && && && & loginMsg.setPassword(&yao&);
& && && && && & loginMsg.setUserName(&robin&);
& && && && && & channelHandlerContext.writeAndFlush(loginMsg);
& && && && &}
& && && && &case PING:{
& && && && && & System.out.println(&receive ping from server----------&);
& && && && &}
& && && && &case ASK:{
& && && && && & ReplyClientBody replyClientBody=new ReplyClientBody(&client info **** !!!&);
& && && && && & ReplyMsg replyMsg=new ReplyMsg();
& && && && && & replyMsg.setBody(replyClientBody);
& && && && && & channelHandlerContext.writeAndFlush(replyMsg);
& && && && &}
& && && && &case REPLY:{
& && && && && & ReplyMsg replyMsg=(ReplyMsg)baseM
& && && && && & ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
& && && && && & System.out.println(&receive client msg: &+replyServerBody.getServerInfo());
& && && && &}
& && && && &default:
& && &&&ReferenceCountUtil.release(msgType);
public class NettyClientBootstrap {
& & private S
& & private SocketChannel socketC
& & private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
& & public NettyClientBootstrap(int port, String host) throws InterruptedException {
& && &&&this.port =
& && &&&this.host =
& && &&&start();
& & private void start() throws InterruptedException {
& && &&&EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
& && &&&Bootstrap bootstrap=new Bootstrap();
& && &&&bootstrap.channel(NioSocketChannel.class);
& && &&&bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
& && &&&bootstrap.group(eventLoopGroup);
& && &&&bootstrap.remoteAddress(host,port);
& && &&&bootstrap.handler(new ChannelInitializer&SocketChannel&() {
& && && && &@Override
& && && && &protected void initChannel(SocketChannel socketChannel) throws Exception {
& && && && && & socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
& && && && && & socketChannel.pipeline().addLast(new ObjectEncoder());
& && && && && & socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
& && && && && & socketChannel.pipeline().addLast(new NettyClientHandler());
& && && && &}
& && &&&});
& && &&&ChannelFuture future =bootstrap.connect(host,port).sync();
& && &&&if (future.isSuccess()) {
& && && && &socketChannel = (SocketChannel)future.channel();
& && && && &System.out.println(&connect server&&成功---------&);
& & public static void main(String[]args) throws InterruptedException {
& && &&&Constants.setClientId(&001&);
& && &&&NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,&localhost&);
& && &&&LoginMsg loginMsg=new LoginMsg();
& && &&&loginMsg.setPassword(&yao&);
& && &&&loginMsg.setUserName(&robin&);
& && &&&bootstrap.socketChannel.writeAndFlush(loginMsg);
& && &&&while (true){
& && && && &TimeUnit.SECONDS.sleep(3);
& && && && &AskMsg askMsg=new AskMsg();
& && && && &AskParams askParams=new AskParams();
& && && && &askParams.setAuth(&authToken&);
& && && && &askMsg.setParams(askParams);
& && && && &bootstrap.socketChannel.writeAndFlush(askMsg);
具体的例子和相应pom.xml 见&
没有更多推荐了,
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!所有文章皆为原创,若转载请标明出处,谢谢~
新浪微博,欢迎关注:
28293012345678910111214151617181920212223242526272829303112345678
随笔 - 148
评论 - 310
留言簿(53)
随笔分类(125)
随笔档案(147)
个人一直在使用中,在线观看www.youtube.com上高清视频一点都不卡。
每个月才18块钱,一个季度48,再超值提供一个9折优惠码:freevpnssh。
我平常在Linux下安装/更新软件,压根不用担心GFW阻挠了,
更不用提windows 7了。
阅读排行榜
评论排行榜netty 客户端断开连接服务端如何监听
[问题点数:20分,结帖人youqiong1217]
本版专家分:30
结帖率 66.67%
CSDN今日推荐
本版专家分:30
结帖率 66.67%
本版专家分:0
本版专家分:0
匿名用户不能发表回复!|
CSDN今日推荐如何让Netty管理任意客户端连接而非服务端连接
-爱问知识网

我要回帖

更多关于 客户端连接服务端 的文章

 

随机推荐