>>分享流行的Java框架以及开源软件,对孙卫琴的《精通Spring:Java Web开发技术详解》提供技术支持 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 16019 个阅读者 刷新本主题
 * 贴子主题:  Rust学习笔记 | 异步代码的几种写法 回复文章 点赞(0)  收藏  
作者:sunshine    发表时间:2021-02-04 03:20:09     消息  查看  搜索  好友  邮件  复制  引用

  
  作者:谢敬伟,江湖人称“刀哥”,20年IT老兵,数据通信网络专家,电信网络架构师,目前任Netwarps开发总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。
       Rust历史不长,仍然处于快速发展的历程中。关于异步编程的模式,现在已经发展到 async/await协程的高级阶段。大概是因为 async/await出现的时间还不长,所以现有大多数的开源项目并不是或不是纯粹使用 async/await来书写的,而是前前后后有多种的写法。这样的状况给 Rust的学习带来了一些的难度。在这里,我们来捋一捋异步代码的几种写法。

mio

最原始的方式是使用 mio进行开发。 mio是一个底层异步 I/O库,提供非阻塞方式的 API,具有很高的性能。实际上 mio是对于操作系统 epoll/kqueue/IOCP的封装。在 C/C++中我们使用 libevent之类的库, mio可以理解为对应的 Rust版本。基于 mio的代码大致如下:

       loop {
     // Poll Mio for events, blocking until we get an event.
     poll.poll(&mut events, None)?;

     // Process each event.
     for event in events.iter() {
         if event.is_writable() {
             // socket可写,开始发送数据
         }

         if event.is_readable() {
             // socket可读,开始接收数据
         }
         // socket 关闭,退出循环
         return Ok(());
     }
}    

总的来说,这是完全基于异步事件通知的写法,和 C/C++区别不是很大,异步代码对于程序员是一个挑战,当代码逻辑越来越复杂,添加新功能或是解决已有问题的难度也越来越大。

     另外, mio实现的是一个单线程事件循环,虽然可以处理成千上万路的 I/O操作,但没有多线程的能力,需要自己扩充。

Future Poll

为了更好地规范异步的逻辑, Rust抽象出 Future表示尚未发生的事物。这些 Future可以用很多方式組合成一个更复杂的复合 Future来代表一系列的事件。 Future需要程序主动去 poll(轮询)才能获取到最终的结果,每一次轮询的结果可能是 Ready或者 Pending。

     运行库提供 Executor和 Reactor来执行 Future,也就是调用 Future的 poll方法循环执行一系列就绪的 Future,当 Future返回 Pending的时候,会将 Future转移到 Reactor上等待唤醒。 Reactor被用来负责唤醒之前无法完成的 Future。事实上, tokio的 Reactor是基于 mio实现的,而 async-std/smol则是封装了 epoll/kqueue/IOCP,提供类似的功能。

     手动实现 Future是一件相对繁琐的工作,主要的问题在于异步模式本身的特性。例如,接收网络数据,无法臆测每次轮询会收到多少字节的数据,往往需要开辟一段接收缓冲区容纳数据,协议解码也需要一个状态机拼包向上层提交;发送网络数据存在相似问题,发送数据时底层未就绪,则缓冲发送数据,待下次轮询时,需要首先检查并处理发送缓冲区。另外还有一些值得注意的地方,如果手动实现的 Future返回 Pending,则必须自己实现唤醒机制,也就是需要将 cx克隆一份记下来,然后在适当的时侯调用 cx.wake()。因为网络相关的功能往往是分层的,因此手动的 Poll循环也会是层层堆叠的,这时候,返回值 Poll::Ready(T)就有学问了。泛型T可能包裹各种不同的数据, Option<T>, Result<T,E>,或者两者的组合。因为最外层还有一个 Poll<T>,所有这时候的 match语句写起来会非常臃肿,粘贴复制写很多代码,完成的功能却非常有限,而且由于这些代码很相似,大大增加了出错的可能性。

     标准库中仅仅定义了 Future,更多的相关功能需要引用 futures-rs类库,里面定义了一系列有关异步的操作,包括 Stream、 Sink、 AsyncRead、 AsyncWrite等基础 Trait,以及对应实现了大量方便操作的组合子的 Ext Trait,特别用途的 fused、 Box, Try系列的扩展,诸如 join!、 select!、 pin_mut!等一系列的宏。理论上,不使用这些扩展也能写出代码,只不过那样的代码很可能篇幅会长的可怕。值得一提的是,除了一些可以简化代码的过程宏之外,扩展 Trait提供的组合子也会让代码精简不少。比如 Future::and_then可以让代码写成链式调用的方式; Sink::send包装了 Sink发送三步骤  poll_ready/start_send/poll_flush,使用 .await一行代码直接就可以完成发送。因此,很多 poll方式的代码实际上是准确地说是混合式的,其中也使用了不少 async代码块。

     总之,搞清楚 Future相关的这些内容是需要花费不少时间,更不用说用它们来写代码了。不过,即便是使用 async/await这种更高级原语,也是有必要了解底层的工作原理和实现机制,所谓知其然知其所以然。

async/await

使用 async/await可以将异步的代码写得类似同步的过程,更加符合人体工程学。因为 async被翻译为一个 Future状态机,原先在 poll方式中需要处理的与 Pending相关的状态现在都由 async生成的状态机自动完成,因此大大减轻了程序员的心智负担。

     如前所述,底层的 Futures提供了很多方便的组合子扩展 Future,使用起来很简洁,可以极大地简化代码。例如,上文提到过的 Sink::send包装了发送缓冲区的实现和异步发送的三个步骤; AsyncRead::read_exact实现了读取指定字节数的功能,在处理网络协议解析时可以避免手写一个拼包状态机; AsyncWrite::write_all实现了发送全部数据以及发送缓冲,等等。正是在这些底层功能的支持下, async/await成为了更高级的书写异步代码的方式。也许会有少许担心,这样所谓“高级”会不会在性能上有很大损失?笔者个人不这么认为。自动实现的状态机也许未必比程序员手动完成的性能更差。状态机编程对于任何人,即便是一个有经验的程序员都是不小挑战。蹩脚的状态机实现不仅可能有性能问题,更大的风险来自于实现上的漏洞,以及维护上的困难。代码写出来更多是给别人看的,完成同样的功能,简洁的代码更有可能是更高质量的代码。

     以下例子是固定长度分割的报文接收过程,使用 async/await是很简单的。如果实现为一个 Stream/poll_next,代码会复杂很多。

  /// convenient method for reading a whole frame
    pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> {
        let mut len = [0; 4];
        let _ = self.inner.read_exact(&mut len).await?;  // inner socket, 支持 AsyncRead

        let n = u32::from_be_bytes(len) as usize;
        if n > self.max_frame_len {
            let msg = format!(
                "data length {} exceeds allowed maximum {}",
                n, self.max_frame_len
            );
            return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg));
        }

        let mut frame = vec![0; n];
        self.inner.read_exact(&mut frame).await?;

        Ok(frame)
    }

最后,完全使用 async/await写代码目前还有几个问题:
  • async trait

    当前 Trait 不支持  async fn,无法直接用 Trait来抽象异步方法。暂时解决办法是使用三方库  async-trait。如下:
  use async_trait::async_trait;


trait Advertisement {
    async fn run(&self);
}

宏  async_trait将代码转换为一个返回  Pin<Box<dyn Future + Send + 'async>> 的同步方法。因为装箱和动态派发的原因,性能上会有少许损失。
  • 异步析构

    当前 drop方法必须是同步调用,不能使用 await语法。当一个 I/O对象越过生命周期被析构,往往在关闭底层句柄之前,还需要完成某些 I/O操作。比如,通知网络对端连接已经关闭。在同步代码中,我们只需要在 drop()中置入这些操作,但是在异步代码中,无法在 drop()中做类似的事情。
解决办法,总是在异步 I/O对象越过生命周期之前显式地执行关闭动作,或是,实现一个类似 GC的功能,专门负责清理工作。

展望

笔者在学习 Rust过程中,主要关注网络相关的并发编程。因为之前有在 Go版本的 ipfs/libp2p上的开发经验,故而学习研究了 rust-libp2p以及 nervos tentacle。 rust-libp2p是 Parity实现的准官方版本,但是这个项目的代码及其难懂,过于强调使用泛型参数的抽象,导致代码可读性非常差。请教了代码作者,他承认代码可能有些复杂,但也强调都是有原因的...  nervos tentacle的实现在协议上不够完整,特别是与标准 libp2p并不兼容。两个项目共有的特点是主要用 poll的方式写代码,逻辑上都是状态机的嵌套。

     因此,笔者试图完全使用 async/await方式重构 libp2p,参考 rust-libp2p的实现,代码协程化,向上层提供纯粹的异步接口,争取在 API层面的体验接近 go-libp2p,这是推广 Rust协程机制的一个尝试,同时也是个人的一个学习的过程。目前刚刚起步,仅完成了 secio与 yamux部分,待合适时机开源,期望更多 Rust爱好者共同来开发完善。
  参考:
Asynchronous Destructors
  深圳星链网科科技有限公司(Netwarps),专注于互联网安全存储领域技术的研发与应用,是先进的安全存储基础设施提供商,主要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操作系统(BOS)。
微信公众号:Netwarps
----------------------------
原文链接:https://blog.51cto.com/14915984/2538928

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2021-02-14 18:59:29 重新编辑]
  Java面向对象编程-->Java语言的基本语法和规范
  JavaWeb开发-->访问数据库(Ⅱ)
  JSP与Hibernate开发-->持久化层的映射类型
  Java网络编程-->RMI框架
  精通Spring-->Vue指令
  Vue3开发-->Vue组件开发基础
  10分钟认识RocketMQ!想进阿里连这个都不会?
  Marshalling在Netty中的使用
  使用IntelliJ IDEA开发Maven HelloWorld
  Spring事务的声明和管理
  Spring 5 webflux响应式编程 - 但时间也偷换概念
  Spring MVC实现国际化的几种方式
  Spring的方法拦截器范例
  拦截器不拦截静态资源的三种处理方法
  SSM三大框架整合详细教程
  Spring Boot、SpringMVC进行i18n国际化支持:使用MessageSou...
  重新理解响应式编程
  RabbitMQ的用途、原理以及配置
  如何实现Git服务间同步
  3分钟让你明白JSON是什么
  微服务中的Kafka与Micronaut
  更多...
 IPIP: 已设置保密
树形列表:   
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。