基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。
客户端与服务端通信的类
package cc.ymsoft.Framework; import java.io.Serializable; @SuppressWarnings("serial") public class MethodAndArgs implements Serializable{ private String methodName;//调用的方法名称 private Class<?>[] types;//参数类型 private Object[] objects;//参数列表 public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getObjects() { return objects; } public void setObjects(Object[] objects) { this.objects = objects; } public MethodAndArgs() { super(); // TODO Auto-generated constructor stub } public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) { this.methodName = methodName; this.types = types; this.objects = objects; } }
框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)
/** * @author xulang */ package cc.ymsoft.Framework; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * 服务端处理 * @author hadoop * */ class TcpServerHandler extends ChannelInboundHandlerAdapter { private Object obj; private Object response; public TcpServerHandler(Object obj) { super(); this.obj = obj; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub MethodAndArgs methodAndArgs=(MethodAndArgs) msg; Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes()); ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects())); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("client die"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("channelActive>>>>>>>>"); ctx.writeAndFlush("调用异常"); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常"); } } /** * 客户端处理 * @author hadoop * */ class TcpClientHander extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response=msg; System.out.println("client接收到服务器返回的消息:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is general"); } } public class RpcFramework { /** * 服务注册 * @param obj 需要注册的服务对象 * @param port 端口 * @param ip 地址 * @throws InterruptedException */ public static void regist(final Object obj,int port,String ip) throws InterruptedException { int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; int BIZTHREADSIZE = 100; EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); if (obj == null) throw new IllegalArgumentException("对象不能为null"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("错误的端口" + port); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); // pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler(obj)); } }); ChannelFuture f = bootstrap.bind(ip, port).sync(); f.channel().closeFuture().sync(); System.out.println("TCP服务器已启动"); } @SuppressWarnings("unchecked") public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) { if (interfaceClass == null) throw new IllegalArgumentException("接口类型不能为空"); if (!interfaceClass.isInterface()) throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口"); if (host == null || host.length() == 0) throw new IllegalArgumentException("目标主机不能为空"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("端口错误:" + port); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args); final TcpClientHander tcpClientHander=new TcpClientHander(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group); // b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true); b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler",tcpClientHander); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().writeAndFlush(mArgs).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } return tcpClientHander.getResponse(); } }); } }
测试
接口
package cc.ymsoft.test; interface HelloService { String SayHello(String name); }
接口实现类
package cc.ymsoft.test; public class HelloImp implements HelloService { @Override public String SayHello(String name) { // TODO Auto-generated method stub return "你好:"+name; } }
客户端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloInvoke { public static void main(String[] args) throws Exception { final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717); System.out.println(helloService.SayHello("XL")); } }
服务端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloPro { public static void main(String[] args) throws Exception { HelloService hello=new HelloImp(); RpcFramework.regist(hello, 1717, "127.0.0.1"); } }
完整代码在github https://github.com/xulang/NettyRpc
相关推荐
基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 ZooKeeper的服务注册表/发现支持 高可用性,负载平衡和故障转移 支持不同的负载均衡策略 支持异步/同步调用 支持不同版本的服务 支持...
NettyRPC, 在联网的,但另一个RPC框架基于 NettyRPC另外一个基于网联网的RPC框架。特性简单,小代码库,易于学习 API非常快速,高性能完全非阻塞异步调用,同步调用,单向调用。长期持久连接,自动重新连接到服务器...
RPC服务端采用线程池对RPC调用进行异步回调处理。 服务定义、实现,通过Spring容器进行加载、卸载。 消息网络传输除了JDK原生的对象序列化方式,还支持目前主流的编码解码器:kryo、hessian。 Netty网络模型采用主从...
EasyRpc EasyRpc是基于Netty,ZooKeeper和ProtoStuff开发的一个简单易用,便于学习的RPC框架。 1特性简单易用;注释完善,方便学习;低延迟,基于Netty 4;解决TCP粘包/拆包问题;支持非双向的同步/异步调用;基于...
基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 无阻塞异步调用和同步调用支持 持久的持久连接 高可用性,负载平衡和故障转移 ZooKeeper对服务发现的支持 设计: 如何使用 定义一个...
Prostostuff:它基于Protobuf序列化框架,面向POJO,无需编写.proto文件。 Quartz时间调度 Redis Cluster集群高可用方案(未整合) RPC(远程过程调用) ZooKeeper(未实现):提供服务注册与发现功能,开发分布式系统的...
理论上并发数量接近服务器带宽,客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,泛化调用,页面流量统计,泛化...
如果使用tcp / udp,则应自己发送心跳包,然后调用session.go:(Session)UpdateActive来更新其活动时间。 请通过session.go:(Session)GetActive在codec.go:(Codec)OnCron中检查tcp会话是否已超时。 无论...
20_通过Apache Thrift实现Java与Python的RPC调用 21_gRPC深入详解 22_gRPC实践 23_Gradle Wrapper在Gradle项目构建中的最佳实践 24_gRPC整合Gradle与代码生成 25_gRPC通信示例与JVM回调钩子 26_gRPC服务器流式调用...
第31讲:gRPC在Nodejs领域中的静态代码生成及与Java之间的RPC调用 第32讲:IO体系架构系统回顾与装饰模式的具体应用 第33讲:Java NIO深入详解与体系分析 第34讲:Buffer中各重要状态属性的含义与关系图解 第35...
MyRPC是一个可用于生产环境的轻量级,高可用,高性能,高易用分布式远程调用框架,参考dubbo的设计,是一个五脏俱全的简易版dubbo,支持同步调用,异步调用,服务自动注册,定时调度系统等。 1.架构 2.特性: 连通性 ...
surging 是一个分布式微服务框架,提供高性能RPC远程服务调用,采用Zookeeper作为surging服务的注册中心,集成了哈希,随机,轮询作为负载均衡的算法,RPC集成采用的是netty框架,采用异步传输
理论上并发数量接近服务器带宽,客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,泛化调用,页面流量统计,泛化...
一个建立在 Netty 之上的高效 RPC 框架。 支持连接池和请求重试。 使用进行完全在您控制下的高效序列化 使用进行富有表现力的异步操作,这增加了从客户端到服务器的透明度。 #设置 将 microrpc 作为依赖项添加...
服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,页面流量统计等,QPS统计,TP90,TP99,TP95等丰富可视化数据,持续为个人以及中小型公司...
Dubbo是一款基于Netty的高性能、轻量级的RPC框架,其主要功能包括:面向接口的远程方法调用、智能容错和负载均衡、以及服务自动注册和发现、依赖分析与降级。支持dubbo、rmi、hessian、http、webservice、thrift、...
Netty引用计数的实现机制与自旋锁的使用技巧 82_Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 83_AtomicIntegerFieldUpdater实例演练与volatile关键字分析 84_Netty引用计数注意事项与内存泄露...
基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 无阻塞异步调用和同步调用支持 持久的持久连接 高可用性,负载平衡和故障转移 ZooKeeper对服务发现的支持 设计: 如何使用 定义一个...
本人学习Netty后决定自己写1个基于Netty、Zookeeper、Spring的轻量级RPC框架,收获颇丰,不过本人才疏学浅,难免有所疏漏,若有批评和建议请发到邮箱 Features 支持长连接 支持异步调用 支持心跳检测 支持JSON序列化...
HRPC HRPC是一款基于Netty和Zookeeper设计的轻量级高性能RPC框架。特性采用Protostuff序列化;高性能,负载均衡;支持服务的注册和订阅;支持同步及异步2种调用方式;长连接,自动重连;采用cglib动态代理;代码...