`
xulang
  • 浏览: 3997 次
文章分类
社区版块
存档分类
最新评论

基于Netty的异步Rpc调用的小框架

阅读更多

基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。

 

客户端与服务端通信的类

package cc.ymsoft.Framework;

import java.io.Serializable;

@SuppressWarnings("serial")
public class MethodAndArgs implements Serializable{
	private String methodName;//调用的方法名称
	private Class<?>[] types;//参数类型
	private Object[] objects;//参数列表
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getTypes() {
		return types;
	}
	public void setTypes(Class<?>[] types) {
		this.types = types;
	}
	public Object[] getObjects() {
		return objects;
	}
	public void setObjects(Object[] objects) {
		this.objects = objects;
	}
	public MethodAndArgs() {
		super();
		// TODO Auto-generated constructor stub
	}
	public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) {
		
		this.methodName = methodName;
		this.types = types;
		this.objects = objects;
	}
	
	
}

 

 框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)

/**
 * @author xulang
 */
package cc.ymsoft.Framework;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * 服务端处理
 * @author hadoop
 *
 */
 class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
    private Object obj;
    private Object response;
    
    public TcpServerHandler(Object obj) {
		super();
		this.obj = obj;
		
	}
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // TODO Auto-generated method stub
		MethodAndArgs methodAndArgs=(MethodAndArgs) msg;
		Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes());
		ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects()));
        ctx.close();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	System.out.println("client die");
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("channelActive>>>>>>>>");
        ctx.writeAndFlush("调用异常");
        ctx.close();
    }
      @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务器异常");
    }
}
/**
 * 客户端处理
 * @author hadoop
 *
 */
 class TcpClientHander extends ChannelInboundHandlerAdapter {
	 private Object response;
	 
		public Object getResponse() {
		return response;
	}

		@Override
	    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			response=msg;
	        System.out.println("client接收到服务器返回的消息:" + msg);
	    }
	    
		@Override
	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	        System.out.println("client exception is general");
	    }
	}
 
public class RpcFramework {

	/**
	 * 服务注册
	 * @param obj 需要注册的服务对象
	 * @param port 端口
	 * @param ip 地址
	 * @throws InterruptedException 
	 */
	public static void regist(final Object obj,int port,String ip) throws InterruptedException {
		int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;
		    
		int BIZTHREADSIZE = 100;
		 EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
		  EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
		if (obj == null)
			throw new IllegalArgumentException("对象不能为null");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("错误的端口" + port);
		 ServerBootstrap bootstrap = new ServerBootstrap();
	        bootstrap.group(bossGroup, workerGroup);
	        bootstrap.channel(NioServerSocketChannel.class);
	        bootstrap.childHandler(new ChannelInitializer<Channel>() {

	            @Override
	            protected void initChannel(Channel ch) throws Exception {
	                // TODO Auto-generated method stub
	                ChannelPipeline pipeline = ch.pipeline();
	                 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
	                    pipeline.addLast(new LengthFieldPrepender(4));
	                    pipeline.addLast("encoder", new ObjectEncoder());  
	                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
	                  //  pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
	                   // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
	                    pipeline.addLast(new TcpServerHandler(obj));
	            }       
	        });
	          ChannelFuture f = bootstrap.bind(ip, port).sync();
	          f.channel().closeFuture().sync();
	          System.out.println("TCP服务器已启动");
	}
	@SuppressWarnings("unchecked")
	public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) {
		if (interfaceClass == null)
			throw new IllegalArgumentException("接口类型不能为空");
		if (!interfaceClass.isInterface())
			throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口");
		if (host == null || host.length() == 0)
			throw new IllegalArgumentException("目标主机不能为空");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("端口错误:" + port);
		
	return (T)	Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
			
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				
				MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args);
				 final TcpClientHander tcpClientHander=new TcpClientHander();
				  EventLoopGroup group = new NioEventLoopGroup();
		          try {
		              Bootstrap b = new Bootstrap();
		              b.group(group);
		         //     b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
		              b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
		             
		              b.handler(new ChannelInitializer<SocketChannel>() {
		                  @Override
		                  protected void initChannel(SocketChannel ch) throws Exception {
		                      ChannelPipeline pipeline = ch.pipeline();
		                      pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
		                      pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
		                      pipeline.addLast("encoder", new ObjectEncoder());  
		                      pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
		                      pipeline.addLast("handler",tcpClientHander);
		                  }
		              });
		              
		                   ChannelFuture f = b.connect(host, port).sync();    
		                
		                  f.channel().writeAndFlush(mArgs).sync();
		                   f.channel().closeFuture().sync();
		                 
		          } catch (Exception e) {

		          } finally {
		        	  
		              group.shutdownGracefully();
		          }
				return tcpClientHander.getResponse();
		     
				
			}
		});
	}
}

  测试

接口

package cc.ymsoft.test;

interface HelloService {
	String SayHello(String name);
}

 接口实现类

package cc.ymsoft.test;

public class HelloImp implements HelloService {

	@Override
	public String SayHello(String name) {
		// TODO Auto-generated method stub
		
		return "你好:"+name;
	}


}

 客户端

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloInvoke {
	
	public static void main(String[] args) throws Exception {
		final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717);
	
		
					System.out.println(helloService.SayHello("XL"));
				
		
	}

}

 服务端

 

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloPro {
	public static void main(String[] args) throws Exception {
		HelloService hello=new HelloImp();
		RpcFramework.regist(hello, 1717, "127.0.0.1");
	}
	
}

 

完整代码在github https://github.com/xulang/NettyRpc

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics