Java知识分享网 - 轻松学习从此开始!    

Java知识分享网

Java1234官方群25:java1234官方群17
Java1234官方群25:838462530
        
SpringBoot+SpringSecurity+Vue+ElementPlus权限系统实战课程 震撼发布        

最新Java全栈就业实战课程(免费)

springcloud分布式电商秒杀实战课程

IDEA永久激活

66套java实战课程无套路领取

锋哥开始收Java学员啦!

Python学习路线图

锋哥开始收Java学员啦!
当前位置: 主页 > Java文档 > Java基础相关 >

Java编程方法论 之 Spring Reactor Reactor-Netty Spring


分享到:
时间:2020-08-22 10:02来源:http://www.java1234.com 作者:小锋  侵权举报
Java编程方法论 之 Spring Reactor Reactor-Netty Spring Webflux 全面解读 PDF 下载
失效链接处理
Java编程方法论 之 Spring Reactor  Reactor-Netty  Spring Webflux 全面解读 PDF 下载


本站整理下载:
 
相关截图:
 
主要内容:

ChannelPool 设计实现解读
回到Connection的设定这节最初,我们主要是通过 ChannelPool 来解决与多个服务端交互以及与单
个服务端建立多个连接的问题。那么这里就来对 ChannelPool 其中的设计与实现进行探索一番。
ChannelPool ,顾名思义,就是一个管理 channel 的容器,里面包含了从容器里获取 channel ,
将使用的 channel 放回容器中,还有一个就是关闭容器,于是,就有下面这个接口设计:
作为一个容器资源,为了让 ChannelPool 具备自动关闭的能力,即可通过 try-with-resources 的
编码方式释放资源, ChannelPool 继承 Closeable 接口,在关闭释放容器这块儿它的实现类只需
实现 close() 即可。 同时,为了实现 ChannelPool 作为容器的属性,这里采用 Deque 来做元素
存储,为了应对并发操作,同时针对不同的jdk版本做了兼容实现,然后在 ChannelPool 实现类
SimpleChannelPool 中进行调用:
public interface ChannelPool extends Closeable {
    Future<Channel> acquire();
    Future<Channel> acquire(Promise<Channel> promise);
    Future<Void> release(Channel channel);
    Future<Void> release(Channel channel, Promise<Void> promise);
    @Override
    void close();
} 123456789
10
11
12
13
假如我们想要从容器中获取或者返还一个 channel 时候附加一些自定义的动作,假如是纯粹的
JDK8+ 的环境的话,我们完全可以通过我们之前接触的函数式接口进行外包策略的设计。但是,为
了保证兼容性, netty 就不得不去做妥协,使用传统的一些代码设计手段,不过,依然是外包策
略,通过一个动作接口来进行,在这个接口中我们分别设定初始化 channel 时进行的动作,从
deque 中获取 channel 时进行的动作,将 channel 返还给 deque 时进行的动作,此接口设计代
码如下:
但我们还漏了一点,通过 channelPool 的 acquire() 方法获取到一个 channel ,直到这个
channel 返还回去,这个期间要做的动作是不是也应该封装一个接口规范,方便使用者做对接,那
索性在调用 io.netty.channel.pool.SimpleChannelPool#acquire() 方法的时候返回一个
Future<Channel> 类型的对象,然后在这个 Future<Channel> 对象上添加一个监听器,里面存放
我们的对接逻辑,这样,在 SimpleChannelPool#acquire() 获取成功时就会执行我们所定义监听
器中的逻辑,也就自然做到了类函数式的封装设计。 channelPool 中获取一个 channel ,在刚开
始的时候就绝对会涉及到创建 channel ,那么上面 channelCreated(Channel ch) 这个动作就应
该放在一个 channelHandler 中进行,那在 SimpleChannelPool 中定义一个 Bootstrap 类型的
字段,在我们在外围将 bootstrap 配置完后传入这个类中即可,并在 SimpleChannelPool 的构造
器中给 bootstrap 进行添加 channelHandler 的设定:
//io.netty.util.internal.PlatformDependent#newConcurrentDeque
public static <C> Deque<C> newConcurrentDeque() {
    if (javaVersion() < 7) {
        return new LinkedBlockingDeque<C>();
   } else {
        return new ConcurrentLinkedDeque<C>();
   }
}
//io.netty.channel.pool.SimpleChannelPool
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
123456789
10
//io.netty.channel.pool.ChannelPoolHandler
public interface ChannelPoolHandler {
  void channelCreated(Channel ch) throws Exception;
  void channelAcquired(Channel ch) throws Exception;
  void channelReleased(Channel ch) throws Exception;    
} 123456789
这样,在初始化 channel 的时候就会执行我们所设定的 channelCreated(Channel ch) ,接下来
就是从 channelPool 中获取 channel ,刚开始从 deque 中获取,获取不到,那就调用
io.netty.bootstrap.Bootstrap#connect() 。新建的 channel 由于建完就拿来用了,并没有
放入 deque 中,无须调用在 ChannelPoolHandler 接口中定义的 channelAcquired ,如果是从
deque 中获取到的 channel ,那么就必须对该 channel 进行一些检查,这个检查主要还是针对我
们的自定义逻辑,也就是 channelAcquired :

 

------分隔线----------------------------

锋哥公众号


锋哥微信


关注公众号
【Java资料站】
回复 666
获取 
66套java
从菜鸡到大神
项目实战课程

锋哥推荐