博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty网络聊天(一) 聊天室的实战(最易懂)
阅读量:6267 次
发布时间:2019-06-22

本文共 11703 字,大约阅读时间需要 39 分钟。

之前做过一个IM的项目,里面涉及了基本的聊天功能,所以注意这系列的文章不是练习,不含基础和逐步学习的部分,直接开始实战和思想引导,基础部分需要额外的去补充,我有精力的话可以后续出一系列的文章。

为什么第一篇是聊天室,聊天室是最容易实现的部分。也是IM结构最简单的一部分,其次作单聊和群聊,业务逻辑层层递增,彻底的拿下聊天室的代码,进阶单聊和群聊就很简单了,后续我还会推出直播间的实现。

如果单纯想实现聊天室很简单,但是我尽量会把流程都走全,为了方便理解。

主要由两个功能类实现:初始化类+响应处理类

0. 准备工作

添加pom.xml

io.netty
netty-all
4.1.2.Final
复制代码

1. 辅助接口实现

辅助接口让服务架构更加清晰,这里有两个接口,一个用来处理Http请求,一个处理Webocket请求。

MyHttpService.java

/** * 处理 http请求 */public interface MyHttpService {    void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request);}复制代码

MyWebSocketService.java

/** * 处理 WebSocket 请求中的frame */public interface MyWebSocketService {    void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame);}复制代码

那么问题来了,谁来实现这两个类呢,谁来处理这两种请求的分发呢。

下面来看服务响应处理类:WebSocketServerHandler.java

2. 请求处理类

继承SimpleChannelInboundHandler类,实现channelRead0() handlerAdded() handlerRemoved() exceptionCaught()等方法,第一个是必选方法,其他方法供我们做一些标记和后续处理。

WebSocketServerHandler.java

@Slf4jpublic class WebSocketServerHandler extends SimpleChannelInboundHandler {    private MyHttpService httpService;    private MyWebSocketService webSocketService;    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    public WebSocketServerHandler(MyHttpService httpService, MyWebSocketService webSocketService) {        super();        this.httpService = httpService;        this.webSocketService = webSocketService;    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof FullHttpRequest) {            httpService.handleHttpRequest(ctx, (FullHttpRequest) msg);        } else if (msg instanceof WebSocketFrame) {            webSocketService.handleFrame(ctx, (WebSocketFrame) msg);        }    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        channels.add(ctx.channel());        channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"上线了"));    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        channels.remove(ctx.channel());        channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"下线了"));    }    /**     * 发生异常时处理     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        channels.remove(ctx.channel());        ctx.close();        log.info("异常信息:{}",cause.getMessage());    }}复制代码
  1. 创建ChannelGroup,来保存每个已经建立连接的channel,在handlerAdded()方法中channels.add(ctx.channel());,相应的在handlerRemoved方法中remove
  2. channelRead0()方法中,实现了对请求的识别和分别处理。
  3. exceptionCaught()方法为发生异常时处理。

3. 初始化类实现

@Slf4jpublic class WebSocketServer implements MyHttpService, MyWebSocketService {    /**     * 握手用的 变量     */    private static final AttributeKey
ATTR_HAND_SHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNEL_ID"); private static final int MAX_CONTENT_LENGTH = 65536; /** * 请求类型常量 */ private static final String WEBSOCKET_UPGRADE = "websocket"; private static final String WEBSOCKET_CONNECTION = "Upgrade"; private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d"; /** * 用户字段 */ private String host; private int port; /** * 保存 所有的连接 */ private Map
channelMap = new HashMap<>(); private final String WEBSOCKET_URI_ROOT; public WebSocketServer(String host, int port) { this.host = host; this.port = port; // 将 ip 和端口 按照格式 赋值给 uri WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port); } public void start(){ // 实例化 nio监听事件池 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 实例化 nio工作线程池 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 启动器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pl = channel.pipeline(); // 保存 该channel 到map中 channelMap.put(channel.id(),channel); log.info("new channel {}",channel); channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> { log.info("channel close future {}",channelFuture); //关闭后 从map中移除 channelMap.remove(channelFuture.channel().id()); }); //添加 http 编解码 pl.addLast(new HttpServerCodec()); // 聚合器 pl.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)); // 支持大数据流 pl.addLast(new ChunkedWriteHandler()); // 设置 websocket 服务处理方式 pl.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this)); } }); /** * 实例化完毕后,需要完成端口绑定 */ try { ChannelFuture channelFuture = bootstrap.bind(host,port).addListener((ChannelFutureListener) channelFuture1 -> { if (channelFuture1.isSuccess()){ log.info("webSocket started"); } }).sync(); channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture12 -> log.info("server channel {} closed.", channelFuture12.channel())).sync(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("绑定端口失败"); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } log.info("webSocket shutdown"); } @Override public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { //判断是不是 socket 请求 if (isWebSocketUpgrade(request)){ //如果是webSocket请求 log.info("请求是webSocket协议"); // 获取子协议 String subProtocols = request.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL); //握手工厂 设置 uri+协议+不允许扩展 WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT,subProtocols,false); // 从工厂中实例化一个 握手请求 WebSocketServerHandshaker handshaker = handshakerFactory.newHandshaker(request); if (handshaker == null){ //握手失败:不支持的协议 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else { //响应请求:将 握手转交给 channel处理 handshaker.handshake(ctx.channel(),request); //将 channel 与 handshaker 绑定 ctx.channel().attr(ATTR_HAND_SHAKER).set(handshaker); } return; }else { // 不处理 HTTP 请求 log.info("不处理 HTTP 请求"); } } @Override public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { /** * text frame handler */ if (frame instanceof TextWebSocketFrame){ String text = ((TextWebSocketFrame) frame).text(); TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(text); log.info("receive textWebSocketFrame from channel: {} , 目前一共有{}个在线",ctx.channel(),channelMap.size()); //发给其它的 channel (群聊功能) for (Channel ch: channelMap.values()){ if (ch.equals(ctx.channel())){ continue; } //将 text frame 写出 ch.writeAndFlush(textWebSocketFrame); log.info("消息已发送给{}",ch); log.info("write text: {} to channel: {}",textWebSocketFrame,ctx.channel()); } return; } /** * ping frame , 回复 pong frame */ if (frame instanceof PingWebSocketFrame){ log.info("receive pingWebSocket from channel: {}",ctx.channel()); ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); return; } /** * pong frame, do nothing */ if (frame instanceof PongWebSocketFrame){ log.info("receive pongWebSocket from channel: {}",ctx.channel()); return; } /** * close frame, close */ if (frame instanceof CloseWebSocketFrame){ log.info("receive closeWebSocketFrame from channel: {}", ctx.channel()); //获取到握手信息 WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HAND_SHAKER).get(); if (handshaker == null){ log.error("channel: {} has no handShaker", ctx.channel()); return; } handshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain()); return; } /** * 剩下的都是 二进制 frame ,忽略 */ log.warn("receive binary frame , ignore to handle"); } /** * 判断是否是 webSocket 请求 */ private boolean isWebSocketUpgrade(FullHttpRequest req) { HttpHeaders headers = req.headers(); return req.method().equals(HttpMethod.GET) && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE) && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION); }}复制代码
  1. l.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this)); 添加自己的响应处理。WebSocketServerHandler是第二点实现的请求处理类.
  2. private Map<ChannelId, Channel> channelMap = new HashMap<>();来将ChannelId和CHannel对应保存。方便后来对应获取。
  3. bootstrap.bind(host,port)也可以替换成仅bind端口。
public ChannelFuture bind(String inetHost, int inetPort) {        return bind(new InetSocketAddress(inetHost, inetPort));    }复制代码
public synchronized InetAddress anyLocalAddress() {        if (anyLocalAddress == null) {            anyLocalAddress = new Inet4Address(); // {0x00,0x00,0x00,0x00}            anyLocalAddress.holder().hostName = "0.0.0.0";        }        return anyLocalAddress;    }复制代码

它默认会给0.0.0.0端口开放服务。 4. handleHttpRequesthandleFrameMyWebSocketService类的一个实现。 5. 各个细节都有注释,仔细看注释。

4. 启动服务

public class Main {    public static void main(String[] args) {        new WebSocketServer("192.168.1.33",9999).start();    }}复制代码
局域网内如何测试?

我用的是npm 的一个serve 服务来搞局域网。 官网介绍: 我的文章:

这下保证你的手机和电脑都在局域网内,就可以访问你自己的群聊了。

5. 前端页面

要送就送一套。

    
Document
复制代码
  1. socket = new WebSocket("ws://192.168.1.33:9999");注意这里ip和port与服务一一对应。
  2. socket.onmessage()是获取socket信息。socket.onopen是创建连接。socket.onclose是关闭连接。socket.send(message.value);是发送socket信息。

控制台输出:

15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - receive textWebSocketFrame from channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440] , 目前一共有2个在线15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - 消息已发送给[id: 0xacd5c1ad, L:/192.168.1.33:9999 - R:/192.168.1.33:50438]15:12:42.444 [nioEventLoopGroup-3-5] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=515:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - write text: TextWebSocketFrame(data: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 15)) to channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440]复制代码


如果你喜欢我的文章,那麻烦请关注我的公众号,公众号重点分析架构师技术,该公众号还处于初始阶段,谢谢大家的支持。

关注公众号,回复
java架构获取架构视频资源(后期还会分享不同的优质资源噢)。



转载地址:http://uwcpa.baihongyu.com/

你可能感兴趣的文章
Java Gui 设计模式中的事件监听
查看>>
JavaSE-final关键字
查看>>
python自动化开发-1
查看>>
Remote远程特性的使用
查看>>
orm在django中的简单使用
查看>>
poj 2373 Dividing the Path
查看>>
dplyr 数据操作 常用函数(4)
查看>>
A股实时股票行情,指数行情web API 使用方法
查看>>
大整数算法[04] 位操作
查看>>
C# Parsing 类实现的 PDF 文件分析器
查看>>
汇编学习(1)
查看>>
Google招聘 Lead Software Engineer
查看>>
Bzoj1026 windy数
查看>>
Java07
查看>>
mongodb基础知识_4
查看>>
ROP
查看>>
Windows常用网络命令技巧汇总 [转]
查看>>
【noi 2.6_8787】数的划分(DP){附【转】整数划分的解题方法}
查看>>
Win8 app判断网络连接状态
查看>>
CentOS 6.7下nginx SSL证书部署的方法分享
查看>>