须知
1. 需要的jar包jboss-marshalling-serial-1.3.0.CR9.jar、jboss-marshalling-1.3.0.CR9.jar、netty-all-5.jar
2. 传输对象需要实现Serializable接口
Server
package com.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class Server { public static void main(String[] args) throws Exception { //1.第一个线程是用于接收client连接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2.第二个线程是用于实际的业务处理操作的 EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup). channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(1234).sync(); f.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}
Client
package com.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture f = b.connect("127.0.0.1", 1234).sync(); for(int i=0;i<5;i++){ Req req = new Req(); req.setId(i+""); req.setName("Hello Word"+i); req.setRequestMessage("数据信息"+i); f.channel().writeAndFlush(req); } f.channel().closeFuture().sync(); group.shutdownGracefully(); }}
ClientHandler
package com.netty;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Resp resp = (Resp) msg; System.out.println(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); }}
ServerHandler
package com.netty;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Req req = (Req) msg; System.out.println(req); Resp resp = new Resp(); resp.setId(req.getId()); resp.setName(req.getName()); resp.setResponseMessage("响应消息"+req.getId()); ctx.writeAndFlush(resp); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); }}
MarshallingCodeCFactory
package com.netty;import org.jboss.marshalling.MarshallerFactory;import org.jboss.marshalling.Marshalling;import org.jboss.marshalling.MarshallingConfiguration;import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;import io.netty.handler.codec.marshalling.MarshallerProvider;import io.netty.handler.codec.marshalling.MarshallingDecoder;import io.netty.handler.codec.marshalling.MarshallingEncoder;import io.netty.handler.codec.marshalling.UnmarshallerProvider;public class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * @return */ public static MarshallingDecoder buildMarshallingDecoder(){ //首先通过Marshalling工具类的方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建MarshallingConfiguration对象,配置版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据MarshallerFactory和configuration创建provide UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建netty的marshallingDecoder,两个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider,1024*1024*1); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return */ public static MarshallingEncoder buildMarshallingEncoder(){ //首先通过Marshalling工具类的方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建MarshallingConfiguration对象,配置版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据MarshallerFactory和configuration创建provide MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; }}
Req
package com.netty;import java.io.Serializable;public class Req implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String requestMessage; private byte[] attachment; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } public byte[] getAttachment() { return attachment; } public void setAttachment(byte[] attachment) { this.attachment = attachment; } @Override public String toString() { return "Req [id=" + id + ", name=" + name + ", requestMessage=" + requestMessage + "]"; } }
Resp
package com.netty;import java.io.Serializable;public class Resp implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } @Override public String toString() { return "Resp [id=" + id + ", name=" + name + ", responseMessage=" + responseMessage + "]"; } }
输出的数据
server端
Req [id=0, name=Hello Word0, requestMessage=数据信息0]
Req [id=1, name=Hello Word1, requestMessage=数据信息1]
Req [id=2, name=Hello Word2, requestMessage=数据信息2]
Req [id=3, name=Hello Word3, requestMessage=数据信息3]
Req [id=4, name=Hello Word4, requestMessage=数据信息4]
client端
Resp [id=0, name=Hello Word0, responseMessage=响应消息0]
Resp [id=1, name=Hello Word1, responseMessage=响应消息1]
Resp [id=2, name=Hello Word2, responseMessage=响应消息2]
Resp [id=3, name=Hello Word3, responseMessage=响应消息3]
Resp [id=4, name=Hello Word4, responseMessage=响应消息4]