当前位置:首页 » 民生新闻 » 正文

分类页和文章页“当前位置”下方广告(PC版)
分类页和文章页“当前位置”下方广告(移动版)

手机维修,谈谈怎么运用Netty开发完成高性能的RPC服务器,十字架

137 人参与  2019年05月06日 17:05  分类:民生新闻  评论:0  
  移步手机端

1、打开你手机的二维码扫描APP
2、扫描左则的二维码
3、点击扫描获得的网址
4、可以在手机端阅读此文章

RPC(Remote Procedure Call Protocol)长途进程调用协议,它是一种经过网络,从长途核算机程序上恳求服务,而不必了解底层网络技能的协议。说的再直白一点,便是客户端在不必知道调用细节的条件之下,调用长途核算机上运转的某个目标,运用起来就像调用本地的目标相同。现在典型的RPC完结结构有:Thrift(facebook开源)、Dubbo(alibaba开源)等等。RPC结构针对网络协议、网络I/O模型的封装是通明的,关于调用的客户端而言,它就以为自己在调用本地的一个目标。至于传输层上,运用的是TCP协议、UDP协议、亦或是HTTP协议,一概不关怀。从网络I/O模型上来看,是依据select、poll、epoll办法、仍是IOCP(I/O Completion Port)办法承载完结的,关于调用者而言也不必关怀。

现在,干流的RPC结构都支撑跨言语调用,即有所谓的IDL(接口界说言语),其实,这个并不是RPC全部必要要求的。假如你的RPC结构没有跨言语的要求,IDL就能够不必包括了。

最终,值得一提的是,衡量一个RPC结构功能的好坏与否,RPC的网络I/O模型的挑选,至关重要。在此基础上,规划出来的RPC服务器,能够考虑支撑堵塞式同步IO、非堵塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支撑不同的网络IO模型,在高并发的状态下,处理功能上会有很大的不同。还有一个衡量的规范,便是挑选的传输协议。是依据TCP协议、仍是HTTP协议、仍是UDP协议?对功能也有必定的影响。可是从我现在了解的状况来看,大多数RPC开源完结结构都是依据TCP、或许HTTP的,目测没有选用UDP协议做为首要的传输协议的。

理解了RPC的运用原理和功能要求。现在,咱们能不能放下那些RPC开源结构,自己着手开发一个高功能的RPC服务器呢?我想,仍是能够的。现在自己就运用Java,依据Netty,开发完结一个高功能的RPC服务器。

怎样完结、依据什么原理?并发处理功能怎样?请持续接着看下文。

咱们有的时分,为了进步单个节点的通讯吞吐量,进步通讯功能。假如是依据Java后端的,一般首选的是NIO结构(No-block IO)。可是问题也来了,Java的NIO把握起来要适当的技能功底,和满足的技能堆集,运用起来才干称心如意。一般的开发人员,假如要运用NIO开发一个后端的TCP/HTTP服务器,顺便考虑TCP粘包、网络通讯反常、音讯链接处理等等网络通讯细节,开发门槛太高,所以比较正确的挑选是,选用业界干流的NIO结构进行服务器后端开发。干流的NIO结构首要有Netty、Mina。它们首要都是依据TCP通讯,非堵塞的IO、灵敏的IO线程池而规划的,应对高并发恳求也是捉襟见肘。跟着Netty、Mina这样优异的NIO结构,规划上日趋完善,Java后端高功能服务器开发,在技能上供给了有力的支撑保证,然后打破了C++在服务器后端,一致天下的局势。因为在此之前,Java的NIO一向受人诟病,让人敬而远之!

已然,这个RPC服务器是依据Netty的,那就在说说Netty吧。实践上Netty是对JAVA NIO结构的再次封装,它的开源网址是http://netty.io/,本文中运用的Netty版别是:4.0版别,能够经过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,进行下载运用。那或许你会问,怎样运用Netty进行RPC服务器的开发呢?实践不难,下面我手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架就简略的阐明一下技能原理:

1、界说RPC恳求音讯、应对音讯结构,里边要包括RPC的接口界说模块、包括长途调用的类名、办法称号、参数结构、参数值等信息。

2、服务端初始化的时分经过容器加载RPC接口界说和RPC接口完结类目标的映射联系,然后等候客户端建议调用恳求。

3、客户端建议的RPC音讯里边包括,长途调用的类名、办法称号、参数结构、参数值等信息,经过网络,以字节省的办法送给RPC服务端,RPC服务端接纳到字节省的恳求之后,去对应的容器里边,查找客户端接口映射的详细完结目标。

4、RPC服务端找到完结目标的参数信息,经过反射机制创立该目标的实例,并回来调用处理效果,最终封装成RPC应对音讯告诉到客户端。

5、客户端经过网络,收到字节省办法的RPC应对音讯,进行拆包、解析之后,显现长途调用效果。

上面说的是很简略,可是完结的时分,咱们还要考虑如下的问题:

1、RPC服务器的传输层是依据TCP协议的,呈现粘包咋办?这样客户端的恳求,服务端不是会解析失利?好在Netty里边现已供给了处理TCP粘包问题的解码器:LengthFieldBasedFrameDecoder,能够靠它轻松搞定TCP粘包问题。

2、Netty服小冰冰传奇务端的线程模型是单线程、多线程(一个线程担任客户端衔接,衔接成功之后,丢给后端IO的线程池处理)、仍是主从形式(客户端衔接、后端IO处理都是依据线程池的完结)。当然在这儿,我出于功能考虑,运用了Netty主从线程池模型。

3、Netty的IO处理线程池,假如遇到十分耗时的事务,呈现堵塞了咋办?这样不是很简略把后端的NIO线程给挂死、堵塞?本文的处理办法是,关于杂乱的后端事务,分派到专门的事务线程池里边,进行异步回调处理。

4、RPC音讯的传输是经过字节省在NIO的通道(Channel)之间传输,那详细怎样完结呢?本文,是经过依据Java原生目标序列化机制的编码、解码器(ObjectEncoder、ObjectDecoder)进行完结的。当然出于功能考虑,这个或许不是最优的计划。更优的计划是把音讯的编码、解码器,搞成能够装备完结的。详细比方能够经过:protobuf、JBoss Marshalling办法进行解码和编码,以进步网络音讯的传输功率。

5、RPC服务器要考虑多线程、高并发的运用场景,所以线程安满是有必要的。此外尽量不要运用synchronized进行加锁,改用轻量级的ReentrantLock办法进行代码块的条件加锁。比方本文中的RPC音讯处理回调,就有这方面的运用。

6、RPC服务端的服务接口目标和服务接口完结目标要能容易的装备,轻松进行加载、卸载。在这儿,本文是经过Spring容器进行一致的目标办理。

综上所述,本文规划的RPC服务器调用的流程图如下所示:

客户端并发建议RPC调用恳求,然后RPC服务端运用Netty衔接器,分派出N个NIO衔接线程,这个时分Netty衔接器的使命结束。然后NIO衔接线程是一致放到Netty NIO处理线程池进行办理,这个线程池里边会对详细的RPC恳求衔接进行音讯编码、音讯解码、音讯处理等等一系列操作。最终进行音讯处理(Handler)的时分,处于功能考虑,这儿的规划是,直接把杂乱的音讯处理进程,丢给专门的RPC事务处理线程池会集处理,然后Handler对应的NIO线程就当即回来、不会堵塞。这个时分RPC调用结束,客户端会异步等候服务端音讯的处理效果,本文是经过音讯回调机制完结(MessageCallBack)。

再来说一说Netty关于RPC音讯的解码、编码、处理对应的模块和流程,详细如下图所示:

从上图能够看出客户端、服务端对RPC音讯编码、解码、处理调用的模块以及调用次序了。Netty便是把这样一个一个的处理器串在一起,构成一个职责链,一致进行调用。

说了这么多,现在先简略看下,我规划完结的NettyRPC的代码目录层级结构:

其间newlandframework.netty.rpc.core包是NettyRPC的中心完结。newlandframework.netty.rpc.model包里边,则封装了RPC音讯恳求、应对报文结构,以及RPC服务接口与完结绑定联系的容器界说。newlandframework.netty.rpc.config里边界说了NettyRPC的服务端文件装备特点。

下面先来看下newlandframework.netty.rpc.model包中界说的内容。详细是RPC音讯恳求、应对音讯的结构界说:

RPC恳求音讯结构

/**

* @filename:MessageRequest.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc服务恳求结构

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.model;

import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;

import org.apache.commons.lang.builder.ToStringStyle;

public class MessageRequest implements Serializable {

private String messageId;

private String className;

private String methodName;

private Class

private Object[] parametersVal;

public String getMessageId() {

return messageId;

}

public void setMessageId(String messageId) {

this.messageId = messageId;

}

public String getClassName() {

return className;

}

public void setClassName(String className) {

this.className = className;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Class

return typeParameters;

}

public void setTypeParameters(Class

this.typeParameters = typeParameters;

}

public Object[] getParameters() {

return parametersVal;

}

public void setParameters(Object[] parametersVal) {

this.parametersVal = parametersVal;

}

public String toString() {

return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

.append("messageId", messageId).append("className", className)

.append("methodName", methodName).toString();

}

}

 RPC应对音讯结构

/**

* @filename:MessageResponse.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc服务应对结构

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.model;

import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;

import org.apache.commons.lang.builder.ToStringStyle;

public class MessageResponse implements Serializable {

private String messageId;

private String error;

private Object re朴丽萝sultDesc;

public String getMessageId() {

return messageId;

}

public void setMessageId(String messageId) {

this.messageId = messageId;

}

public String getError() {

return error;

}

public void setError(String error) {

this.error = error;

}

public Object getResult() {

return resultDesc;

}

public void setResult(Object resultDesc) {

this.resultDesc = resultDesc;

}

public String toString() {

return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

.append("messageId", messageId).append("error", error).toString();

}

}

RPC服务接口界说、服务接口完结绑定联系容器界说,供给给spring作为容器运用。

/**

* @filename:MessageKeyVal.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc服务映射容器

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.model;

import java.util.Map;

public class MessageKeyVal {

private Map messageKeyVal;

public void setMessageKeyVal(Map messageKeyVal) {

this.messageKeyVal = messageKeyVal;

}

public Map getMessageKeyVal() {

return messageKeyVal;

}

}

好了,界说好中心模型结构之后,现在再向咱们展现一下NettyRPC中心包:newlandframework.netty.rpc.core的要害部分完结代码,首先是事务线程池相关类的完结代码,详细如下:

线程工厂界说完结

/**

* @filename:NamedThreadFactory.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:线程工厂

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

private static final AtomicInteger threadNumber = new AtomicInteger(1);

priva手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架te final AtomicInteger mThreadNum = new AtomicInteger(1);

private final String prefix;

private final boolean daemoThread;

private final ThreadGroup threadGroup;

public NamedThreadFactory() {

this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);

}

public NamedThreadFactory(String prefix) {

this(prefix, false);

}

public NamedThreadFactory(String prefix, boolean daemo) {

this.prefix = prefix + "-threaboyfriendtvd-";

daemoThread = daemo;

SecurityManager s = System.getSecurityManager();

threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();

}

public Thread newThread(Runnable runnable) {

String name = prefix + mThreadNum.getAndIncrement();

Thread ret = new Thread(threadGroup, runnable, name, 0);

ret.setDaemon(daemoThread);

return ret;

}

public ThreadGroup getThreadGroup() {

return threadGroup;

}

}

 事务线程池界说完结

/**

* @filename:RpcThreadPool.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc线程池封装

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.SynchronousQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class RpcThreadPool {

//独立出线程池首要是为了应对杂乱耗I/O操作的事务,不堵塞netty的handler线程而引进

//当然假如事务满足简略,把处理逻辑写入netty的handler(ChannelInboundHandlerAdapter)也未尝不可

public static Executor getExecutor(int threads, int queues) {

String name = "RpcThreadPool";

return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,

queues == 0 ? new SynchronousQueue()

: (queues < 0 ? new LinkedBlockingQueue()

: new LinkedBlockingQueue(queues)),

new NamedThreadFactory(name, true), new AbortPolicyWithReport(毕玉玺抖音name));

}

}

/**

* @filename:AbortPolicyWithReport.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:线程池反常战略

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.util.concurrent.RejectedExecutionException;

import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

private final String threadName;

public AbortPolicyWithReport(String threadName) {

this.threadName = threadName;

}

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

String msg = String.format("RpcServer["

+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"

+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",

threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),

e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());

System.out.println(msg);

throw new RejectedExecutionException(msg);

}

}

RPC调用客户端界说完结

/**

* @filename:MessageSendExecutor.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc客户端履行模块

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.lang.reflect.Proxy;

public class MessageSendExecutor {

private RpcServerLoader loader = RpcServerLoader.getInstance();

public MessageSendExecutor(String serverAddress) {

loader.load(serverAddress);

}

public void stop() {

loader.unLoad();

}

public static T execute(Class rpcInterface) {

return (T) Proxy.newProxyInstance(

rpcInterface.getClassLoader(),

new Class

new MessageSendProxy(rpcInterface)

);

}

}

这儿的RPC客户端实践上,是动态署理了MessageSendProxy,当然这儿是使用了,JDK原生的动态署理完结,你还能够改成CGLIB(Code Generation Library)办法。不过自己测试了一下CGLIB办法,在高并发的状况下面会呈现空指针反常,可是相同的状况,JDK原生的动态署理却没有问题。并发程度不高的状况下面,两种署理办法都运转正常。后续再深化研究看看吧!废话不说了,现在给出MessageSendProxy的完结办法

/**

* @filename:MessageSendProxy.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc客户端音讯处理

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.util.UUID;

import newlandframework.netty.rpc.model.MessageRequest;

public class MessageSendProxy implements InvocationHandler {

private Class cls;

public MessageSendProxy(Class cls) {

this.cls = cls;

}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

MessageRequest request = new MessageRequest();

request.setMessageId(UUID.randomUUID().toString());

request.setClassName(method.getDeclaringClass().getName());

request.setMethodName(method.getName());

request.setTypeParameters(method.getParameterTypes());

request.setParameters(args);

MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();

MessageCallBack callBack = handler.sendRequest(request);

return callBack.start();

}

}

 进一步发现MessageSendProxy其实是把音讯发送给RpcServerLoader模块,它的代码如下:

/**

* @filename:RpcServerLoader.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc服务器装备加载

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import java.net.InetSocketAddress;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcServerLoader {

private volatile static RpcServerLoader rpcServerLoader;

private final static String DELIMITER = ":";

private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

//办法回来到Java虚拟机的可用的处理器数量

private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;

//netty nio线程池

private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);

private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);

private MessageSendHandler messageSendHandler = null;

//等候Netty服务端链路树立告诉信号

private Lock lock = new ReentrantLock();

private Condition signal = lock.newCondition();

private RpcServerLoader() {

}

//并发两层确定

public static RpcServerLoader getInstance() {

if (rpcServerLoader == null) {

synchronized (RpcServerLoader.class) {

if (rpcServerLoader == null) {

rpcServerLoader = new RpcServerLoader();

}

}

}

return rpcServerLoader;

}

public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {

String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);

if (ipAddr.length == 2) {

Stri手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架ng host = ipAddr[0];

int port = Integer.parseInt(ipAddr[1]);

final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this, serializeProtocol));

}

}

public void setMessageSendHandler(MessageSendHandler messageInHandler) {

try {

lock.lock();

this.messageSendHandler = messageInHandler;

//唤醒全部等候客户端RPC线程

signal.signalAll();

} finally {

lock.unlock();

}

}

public MessageSendHandler getMessageSendHandler() throws InterruptedException {

try {

lock.lock();

//Netty服务端链路没有树立结束之前,先挂起等候

if (messageSendHandler == null) {

signal.await();

}

return messageSendHandler;

} finally {

lock.unlock();

}

}

public void unLoad() {

messageSendHandler.close();

threadPoolExecutor.shutdown();

eventLoopGroup.shutdownGracefully();

}

public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {

this.serializeProtocol = serializeProtocol;

}

}

好了,现在一次性给出RPC客户端音讯编码、解码、处理的模块完结代码。

/**

* @filename:MessageSendInitializeTask.java

*

* Newland Co. Ltd. All rights reserved.

*

* @De手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架scription:Rpc客户端线程使命处理

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class MessageSendInitializeTask implements Runnable {

private EventLoopGroup eventLoopGroup = null;

private InetSocketAddress serverAddress = null;

private RpcServerLoader loader = null;

MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {

this.eventLoopGroup = eventLoopGroup;

this.serverAddress = serverAddress;

this.loader = loader;

}

public void run() {

Bootstrap b = new Bootstrap();

b.group(eventLoopGroup)

.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);

b.handler(new MessageSendChannelInitializer());

ChannelFuture channelFuture = b.connect(serverAddress);

channelFuture.addListener(new ChannelFutureListener() {

public void operationComplete(fi年光光阴nal ChannelFuture channelFuture) throws Exception {

if (channelFuture.isSuccess()) {

MessageSendHandler hand手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架ler = channelFuture.channel().pipeline().get(MessageSendHandler.class);

MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);

}

}

});

}

}

/**

* @filename:MessageSendChannelInitializer.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc客户端管道初始化

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

impor三点水加元t io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

public class MessageSendChannelInitializer extends ChannelInitializer {

//ObjectDecoder 底层默许承继半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时分,

//音讯头开端即为长度字段,占有4个字节。这儿出于坚持兼容的考虑

final public static int MESSAGE_LENGTH = 4;

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

//ObjectDecoder的基类半包解码器LengthFieldBasedFrame军区大院Decoder的报文格式坚持兼容。因为底层的父类LengthFieldBasedFrameDecoder

//的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));

//运用LengthFieldPrepender回填弥补ObjectDecoder音讯报文头

pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));

pipeline.addLast(new ObjectEncoder());

//考虑到并发功能,选用weakCachingConcurrentResolver缓存战略。一般状况运用:cacheDisabled即可

pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));

pipeline.addLast(new MessageSendHandler());

}

}

/**

* @filename:MessageSendHandler.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc客户端处理模块

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.buffer.Unpooled;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.chunity3dannel.ChannelInboun焦点访谈曝光徐鹤宁dHandlerAdapter;

import java.net.SocketAddress;

import java.util.concurrent.ConcurrentHashMap;

import newlandframework.netty.rpc.mode钊l.MessageRequest;

import newlandframework.netty.rpc.model.MessageResponse;

public class MessageSendHandler extends ChannelInboundHandlerAdapter {

private ConcurrentHashMap mapCallBack = new ConcurrentHashMap();

private volatile Channel channel;

private SocketAddress remoteAddr;

public Channel getChannel() {

return channel;

}

public SocketAddress getRemoteAddr() {

return remoteAddr;

}

public void channelActive(ChannelHandlerContext ctx) throws Exception {

super.channelActive(ctx);

this.remoteAddr = this.channel.remoteAddress();

}

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

super.channelRegistered(ctx);

this.channel = ctx.channel();

}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

MessageResponse response = (MessageResponse) msg;

String messageId = response.getMessageId();

MessageCallBack callBack = mapCallBack.get(messageId);

if (callBack != null) {

mapCallBack.remove(messageId);

callBack.over(response);

}

}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

public void close() {

channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

}

public MessageCallBack sendRequest(MessageRequest request) {

MessageCallBack callBack = new MessageCallBack(蛤蛤蛤request);

mapCallBack.put(request.getMessageId(), callBack);

channel.writeAndFlush(request);

return callBack;

}

}

最终给出RPC服务端的完结。首先是经过spring主动加载RPC服务接口、接口完结容器绑定加载,初始化Netty主/从线程池等操作,详细是经过MessageRecvExecutor模块完结的,现在给出完结代码:

/**

* @filename:MessageRecvExecutor.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc服务器履行模块

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.channels.spi.SelectorProvider;

import java.util.Iterator;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.什么样的山和海能够移动ThreadPoolExecutor;

import java.util.logging.Level;

import newlandframework.netty.rpc.model.MessageKeyVal;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implement大前门s ApplicationContextAware, InitializingBean {

private String serverAddress;

private final static String DELIMITER = ":";

private Map handlerMap = new ConcurrentHashMap();

private static ThreadPoolExecutor threadPoolExecutor;

public MessageRecvExecutor(String serverAddress) {

this.serverAddress = serverAddress;

}

public static void submit(Runnable task) {

if (threadPoolExecutor == null) {

synchronized (MessageRecvExecutor.class) {

if (threadPoolExecutor == null) {

threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);

}

}

}

threadPoolExecutor.submit(task);

}

public void setApplicationContext(ApplicationContext ctx) throws BeansException {

try {

MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));

Map rpcServiceObject = keyVal.getMessageKeyVal();

Set s = rpcServiceObject.entrySet();

Iterator> it = s.iterator();

Map.Entry entry;

while (it.hasNext()) {

entry = it.next();

handl武媚娘传奇erMap.put(entry.getKey(), entry.getValue());

}

} catch (ClassNotFoundException ex) {

java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);

}

}

public void afterPropertiesSet() throws Exception {

//netty的线程池模型设置成主从线程池形式,这样能够应对高并发恳求

//当然netty还支撑单线程、多线程网络IO模型,能够依据事务需求灵敏装备

ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");

//办法回来到Java虚拟机的可用的处理器数量

int parallel = Runtime.getRuntime().availableProcessors() * 2;

EventLoopGroup boss = new NioEventLoopGroup();

EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());

try {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)

.childHandler(new MessageRecvChannelInitializer(handlerMap))

.option(ChannelOption.SO_BACKLOG, 128)

.childOption(ChannelOption.SO_KEEPALIVE, true);

String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

if (ipAddr.length == 2) {

String host = ipAddr[0];

int port = Integer.parseInt(ipAddr[1]);

ChannelFuture future = bootstrap.bind(host, port).sync();

System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);

future.channel().closeFuture().sync();

} else {

System.out.printf("[author tangjie] Netty RPC Server start fail!\n");

}

} finally {

worker.shutdownGracefully();

boss.shutdownGracefully();

}

}

}

最终仍是老规矩,给出RPC服务端音讯编码、解码、处理的中心模块代码完结,详细如下:

/**

* @filename:MessageRecvChannelInitializer.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc服务端管道初始化

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.Map;

public class MessageRecvChannelInitializer extends ChannelInitializer {

//ObjectDecoder 底层默许承继半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时分,

//音讯头开端即为长度字段,占有4个字节。这儿出于坚持兼容的考虑

final public static int MESSAGE_LENGTH = 4;

private Map handlerMap = null;

MessageRecvChannelInitializer(Map handlerMap) {

this.handlerMap = handlerMap;

}

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式坚持兼容。因为底层的父类LengthFieldBasedFrameDecoder

//的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));

//运用LengthFieldPrepender回填弥补ObjectDecoder音讯报文头

pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));

pipeline.addLast(new ObjectEncoder());

//考虑到并发功能,选用weakCachingConcurrentResolver缓存战略。一般状况运用:cacheDisabled即可

pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));

pipeline.addLast(new MessageRecvHandler(handlerMap));

}

}

/**

* @filename:MessageRecvHandler.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc服务器音讯处理

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Map;

import newlandframework.netty.rpc.model.MessageRequest;

import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

private final Map handlerMap;

public MessageRecvHandler(Map handlerMap) {

this.handlerMap = handlerMap;

}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

MessageRequest request = (MessageRequest) msg;

MessageResponse response = new MessageResponse();

MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);

//不要堵塞nio线程,杂乱的事务逻辑丢给专门的线程池

MessageRecvExecutor.submit(recvTask);

}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

//网络有反常要封闭通道

ctx.close();

}

}

/**

* @filename:MessageRecvInitializeTask.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Descrip正月十三tion:Rpc服务器音讯线程使命处理

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import java.util.Map;

import newlandframework.netty.rpc.model.MessageRequest;

import newlandframework.netty.rpc.model.MessageResponse;

import org.apache.commons.beanutils.MethodUtils;

public class MessageRecvInitializeTask implements Runnable {

private MessageRequest request = null;

private MessageResponse response = null;

private Map handlerMap = null;

private ChannelHandlerContext ctx = null;

public MessageResponse getResponse() {

return response;

}

public MessageRequest getRequest() {

return request;

}

public void setRequest(MessageRequest request) {

this.request = request;

}

MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map哥伦布 handlerMap, ChannelHandlerContext ctx) {

this.request = request;

this.response = response;

this.handlerMap = handlerMap;

this.ctx = ctx;

}

public void run() {

response.setMessageId(request.getM古剑奇谭2essageId());

try {

Object result = reflect(request);

response.setResult(result);

} catch (Throwable t) {

response.setError(t.toString());

t.printStackTrace();

System.err.printf("RPC Server invoke error!\n");

}

ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture channelFuture) throws Exception {

System.out.println("RPC Server Send message-id respone:" + request.getMessageId());

}

});

}

private Object reflect(MessageRequest request) throws Throwable {

String className = request.getClassName();

Object serviceBean = handlerMap.get(className);

String methodName = request.getMethodName();

Object[] parameters = request.getParameters();

return MethodUtils.invokeMethod(serviceBean, methodName, parameters);

}

}

然后是RPC音讯处理的回调完结模块代码

/**

* @filename:MessageCallBack.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:Rpc音讯回调

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.core;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import newlandframework.netty.rpc.model.MessageRequest;

import newlandframework.netty.rpc.model.MessageResponse;

public class MessageCallBack {

private MessageRequest request;

private MessageResponse response;

private Lock lock = new ReentrantLock();

private Condition finish = lock.newCondition();

public MessageCallBack(MessageRequest request) {

this.request = request;

}

public Object start() throws InterruptedException {

try {

lock.lock();

//设定一下超时时间,rpc服务器太久没有相应的话,就默许回来空吧。

finish.await(10*1000, TimeUnit.MILLISECONDS);

if (this.response != null) {

return this.response.getResult();

} else {

return null;

}

} finally {

lock.unlock();

}

}

public void over(MessageResponse reponse) {

try {

lock.lock();

finish.signal();

this.response = reponse;

} finally {

lock.unlock();

}

}

}

到此为止,NettyRPC的要害部分:服务端、客户端的模块现现已过Netty悉数完结了。现在给出spring加载装备rpc-invoke-config.xml的内容:

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/conte手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架xt

http://www.springframework.org/schema/context/spring-context.xsd">

再贴出RPC服务绑定ip信息的装备文件:rpc-server.properties的内容。

#rpc server's ip address config
rpc.server.addr=127.0.0.1:18888

最终NettyRPC服务端发动办法参阅如下:

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");

假如全部顺利,没有呈现意外的话,控制台上面,会呈现如下截图所示的状况:

假如呈现了,阐明NettyRPC服务器,现已发动成功!

上面依据Netty的RPC服务器,并发处理功能怎样呢?实践是检验真理的唯一规范,下面咱们就来实战一下。

下面的测试事例,是依据RPC长途调用两数相加函数,并回来核算效果。客户端一起开1W个线程,同一时间,瞬时建议并发核算恳求,然后调查Netty的RPC服务器是否有正常应对回复呼应,以及客户端是否有正常回来调用核算效果。值得注意的是,测试事例是依据1W个线程瞬时并发恳求而规划的,并不是1W个线程循环建议恳求。这两者关于衡量RPC服务器的并发处理功能,仍是有很大不同的。当然,前者关于并发功能的处理要求,要高上许多许多。

现在,先给出RPC核算接口、RPC核算接口完结类的代码完结:

/**

* @filename:Calculate.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:核算器界说接口

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.servicebean;

public interface Calculate {

//两数相加

int add(int a, int b);

}

/**

* @filename:CalculateImpl.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:核算器界说接口完结

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.servicebean;

public class CalculateImpl implements Calculate {

//两数相加

public int add(int a, int b) {

return a + b;

}

}

下面是瞬时并发RPC恳求的测试样例:

/**

* @filename:CalcParallelRequestThread.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:并发线程模仿

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.servicebean;

import newlandframework.netty.rpc.core.MessageSendExecutor;

import java.util.concurrent.CountDownLatch;

import java.util.logging.Level;

import java.util.logging.Logger;

public class CalcParallelRequestThread implements Runnable {

private CountDownLatch signal;

private CountDownLatch finish;

private MessageSendExecutor executor;

private int taskNumber = 0;

p恒源不夜城ublic CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {

this.signal = signal;

this.finish = finish;

this.taskNumber = taskNumber;

this.executor = executor;

}

public void run() {

try {

signal.await();

Calculate calc = executor.execute(Calculate.class);

int add = calc.add(taskNumber, taskNumber);

System.out.println("calc add result:[" + add + "]");

finish.countDown();

} catch (InterruptedException ex) {

Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);

}

恒山}

}

/**

* @filename:RpcParallelTest.java

*

* Newland Co. Ltd. All rights reserved.

*

* @Description:rpc并发测试代码

* @author tangjie

* @version 1.0

*

*/

package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;

import newlandframework.netty.rpc.core.MessageSendExecutor;

import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

public static void main(String[] args) throws Exception {

final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");

//并行度10000

int parallel = 10000;

//开端计时

StopWatch sw = new StopWatch();

sw.start();

CountDownLatch signal = new CountDownLatch(1);

CountDown手机修理,谈谈怎样运用Netty开发完结高功能的RPC服务器,十字架Latch finish = new CountDownLatch(parallel);

for (int index = 0; index < parallel; index++) {

CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);

new Thread(client).start();

}

//10000个并发线程瞬间建议恳求操作

signal.countDown();

finish.await();

sw.stop();

String tip = String.format("RPC调用一共耗时: [%s] 毫秒", sw.getTime());

System.out.println(tip);

executor.stop();

}

}

好了,现在先发动NettyRPC服务器,承认没有问题之后,运转并发RPC恳求客户端,看下客户端打印的核算效果,以及处理耗时。

从上面来看,10000个瞬时RPC核算恳求,一共耗时挨近11秒。咱们在来看下NettyRPC的服务端运转状况,如下所示:

能够很清楚地看到,RPC服务端都有收到客户端建议的RPC核算恳求,并回来音讯应对。

最终咱们仍是要别离验证一下,RPC服务端是否存在丢包、粘包、IO堵塞的状况?1W个并发核算恳求,是否成功接纳处理并应对了?实践状况阐明全部,看下图所示:

十分给力,RPC的服务端的确成功接纳到了客户端建议的1W笔瞬时并发核算恳求,并且成功应对处理了。并没有呈现:丢包、粘包、IO堵塞的状况。再看下RPC客户端,是否成功得到核算效果的应对回来了呢?

很好,RPC的客户端,的确收到了RPC服务端核算的1W笔加法恳求的核算效果,并且耗时挨近11秒。由此可见,依据Netty+事务线程池的NettyRPC服务器,应对并发多线程RPC恳求,处理起来是称心如意,挥洒自如!

最终,本文经过Netty这个NIO结构,完结了一个很简略的“高功能”的RPC服务器,代码尽管写出来了,可是仍是有一些值得改善的当地,比方:

1、目标序列化传输能够支撑现在干流的序列化结构:protobuf、JBoss Marshalling、Avro等等。

2、Netty的线程模型能够依据事务需求,进行定制。因为,并不是每笔事务都需求这么强壮的并发处理功能。

3、现在RPC核算只支撑一个RPC服务接口映射绑定一个对应的完结,后续要支撑一对多的状况。

4、事务线程池的发动参数、线程池并发堵塞容器模型等等,能够装备化办理。

5、Netty的Handler处理部分,关于杂乱的事务逻辑,现在是一致分派到特定的线程池进行后台异步处理。当然你还能够考虑JMS(音讯行列)办法进行解耦,一致分派给音讯行列的订阅者,一致处理。现在完结JMS的开源结构也有许多,ActiveMQ、RocketMQ等等,都能够考虑。

本文完结的NettyRPC,关于面前的您而言,必定还有许多当地,能够加以完善和改善,优化改善的作业就交给您自由发挥了。

因为自己技能才能、认知水平有限。本文中有说不对的当地,恳请园友们批评指正!不吝赐教!最终,感谢面前的您,耐性的阅读完本文,信任现在的你,关于Java开发高功能的服务端使用,又有了一个更深化的了解!本文算是对我Netty学习效果的阶段性总结,后续有时间,我还会持续推出Netty工业级开发的相关文章,敬请期待!

转载请保留出处和链接!

本文链接:http://www.dgcknigc.com/articles/41.html

文章底部广告(PC版)
文章底部广告(移动版)
百度分享获取地址:http://share.baidu.com/

本文标签:

百度推荐获取地址:http://tuijian.baidu.com/,百度推荐可能会有一些未知的问题,使用中有任何问题请直接联系百度官方客服!
评论框上方广告(PC版)
评论框上方广告(移动版)
推荐阅读
07月16日

糯米,73岁、84岁为什么被称为坎儿年?老年人怎么迈过此坎?,点到为止

发布 : | 分类 : 民生新闻 | 评论 : 0人 | 浏览 : 195次

长寿是每个人的愿望,但是最近几年,三高问题以及心血管疾病的出现,越来越多的老年人与长寿擦肩而过。高血压是比较容易犯的病,平时不良生活饮食习惯以及多种环境因素,加上正常血压调节机制失常,让越来越多的人血压节节攀升。...

标签 :