响应式扩展:什么是异步性?

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡 / 赠书活动

目前, 星球 内第2个项目《仿小红书(微服务架构)》正在更新中。第1个项目:全栈前后端分离博客项目已经完结,演示地址:http://116.62.199.48/。采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 276 小节,累计 43w+ 字,讲解图:1917 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 1500+ 小伙伴加入,欢迎点击围观

响应式扩展:什么是异步性?

通过塔米尔德雷舍

在这篇摘自 Reactive Extensions in Action 的文章中,我定义了异步性并讨论了为什么它对 Reactive 应用程序如此重要。

异步消息传递是 Reactive 系统的一个关键特征,但是异步性到底是什么意思,为什么它对 Reactive 应用程序如此重要?

我们的生活由许多异步任务组成。您可能没有意识到这一点,但如果它们不是异步的,您的日常活动就会很烦人。

要了解异步是什么,我们首先需要了解非异步执行,或同步执行。

同步的:(韦氏词典)恰好同时发生、存在或出现。

同步执行意味着你必须等待一个任务完成才能继续下一个任务。同步执行的一个真实示例可能是您在柜台接近工作人员的方式,在店员等待时决定点什么,等到饭菜准备好,然后店员等到您付款。只有这样你才能去你的餐桌吃饭的下一个任务。该序列如图 1 所示。

图 1 同步食物订单,其中每个步骤都必须在进入下一个步骤之前完成。

这种类型的序列感觉像是在浪费时间(或者更确切地说,是在浪费资源),所以想象一下当您对应用程序执行相同操作时它们的感受。下一节将对此进行演示。

一切都与资源利用有关

想象一下,如果您必须等待每一个操作完成才能做其他事情,您的生活将会怎样。想一想当时等待和利用的资源。同样的问题也与计算机科学相关:


 resultA=LongOperationA(); 
resultB=LongOperationB(); 
resultC=LongOpertionsC();

在此同步代码片段中,LongOperationC() 在 LongOprationB() 和 LongOperationA() 完成之前不会开始执行。在执行这些方法中的每一个期间,调用线程被阻塞,它所拥有的资源实际上被浪费了,不能用于服务其他请求或处理其他事件。如果这发生在 UI 线程上,那么在执行完成之前,应用程序看起来会冻结。

如果这发生在服务器应用程序上,那么在某个时候我们可能会用完空闲线程并且请求将开始被拒绝。在这两种情况下,应用程序都会停止响应。

运行上面的代码片段所花费的总时间是

total_time = LongOperationA 时间 + LongOperationB 时间 + LongOperationC 时间

总完成时间是其组件完成时间的总和。

如果我们可以在不等待前一个操作完成的情况下开始一个操作,我们可以更好地利用我们的资源,这就是 异步执行 的目的。

异步执行意味着操作已启动,但它的执行在后台发生并且调用者未被阻止。相反,调用者会在操作完成时收到通知。在那段时间里,调用者可以继续做有用的工作。

在食品订购示例中,异步方法类似于坐在餐桌旁并由服务员服务。

首先,你坐在桌旁,服务员过来递上菜单,然后离开。当您决定要点什么时,服务员仍然可以为其他顾客服务。当您决定要吃什么菜时,服务员会回来为您点餐。在准备食物的同时,您可以自由聊天、使用手机或欣赏美景。你没有被阻止(服务员也没有)。食物准备好后,服务员将它带到您的餐桌上,然后回去为其他顾客服务,直到您要求结账并付款。

这种模型是异步的,任务是并发执行的,执行的时间和请求的时间不同,这样资源(比如waiter)就可以腾出时间来处理更多的请求。

异步执行发生的地方

在计算机程序中,我们可以区分两种类型的异步操作,基于 IO 和基于 CPU。

基于 CPU 的操作意味着异步代码将在另一个线程上运行,并在另一个线程上执行完成时返回结果。

IO-based operation是指在硬盘或网络等IO设备上进行操作。如果网络是这种情况,则向另一台机器发出请求(通过使用 TCP 或 UDP 或其他网络协议),并且当您机器上的操作系统通过中断从网络硬件获取信号时,结果返回,然后操作将完成。

两种情况下的调用线程都可以自由执行其他任务并处理其他请求和事件。

异步运行代码的方法不止一种,这取决于所使用的语言。现在,让我们看一个使用 Futures 的 .NET 实现执行异步工作的示例 - Task 类。

上面代码片段的异步版本将类似于以下代码:


 resultA=LongOperationA(); 
resultB=LongOperationB(); 
resultC=LongOpertionsC();

在此版本中,每个方法都返回一个 Task<T>。此类表示正在后台执行的操作。当调用每个方法时,调用线程不会被阻塞,方法会立即返回,然后在前一个方法仍在执行的同时调用下一个方法。当调用所有方法时,我们通过使用 Task.WaitAll(…) 方法等待它们完成,该方法获取任务和块的集合,直到它们全部完成。我们可以这样写的另一种方式是:


 resultA=LongOperationA(); 
resultB=LongOperationB(); 
resultC=LongOpertionsC();

这样我们得到相同的结果,我们等待每个任务完成(当它们仍在后台运行时)。如果在我们调用 Wait() 方法时任务已经完成,那么它将立即返回。

运行异步版本代码片段的总时间显示为: total_time = MAX(LongOperationA time , LongOperationB time , LongOperationC time )

因为所有方法都同时运行(甚至可能并行),运行代码所花费的时间将是最长操作的时间。

异步性和 Rx

异步执行并不仅限于使用 Task<T> 进行处理。

回顾时变变量的 Rx 表示——IObservable<T>——我们可以用它来表示任何异步模式,所以当异步执行完成(成功或有错误)时,执行链将运行并且依赖项将被评估。 Rx 提供了将不同类型的异步执行(如 Task<T>)“转换”为 IObservable 的方法。

例如,在 Shoppy 应用程序中,我们希望在我们的位置发生变化时获得新的折扣。对 Shoppy web 服务的调用以异步方式完成,当它完成时,我们希望更新我们的视图以显示新项目。


 resultA=LongOperationA(); 
resultB=LongOperationB(); 
resultC=LongOpertionsC();

在此示例中,我们对 myConnectivity 可观察对象上进行的连接更改做出反应。每次连接发生变化时,我们都会检查是否是因为我们在线,如果是,我们将调用异步 GetDiscounts 方法。方法执行完成后,我们选择返回的结果。这个结果将被推送给从我们的代码创建的 newDiscounts 可观察对象的观察者。

事件和流

在软件系统中,事件是一种消息,用于指示某事已发生。该事件可能代表技术事件——例如,在 GUI 应用程序中,我们可能会在按下的每个键或鼠标移动时看到事件。该事件还可以表示业务发生,例如在金融系统中完成的货币交易。

事件由 事件源 引发并由 事件处理 程序使用。

正如我们所见,事件是表示时变值的一种方式。而在 Rx 中,事件源可以由 observable 表示,事件处理程序可以由观察者表示。但是我们的应用程序正在使用的简单数据呢,例如位于数据库中或从网络服务器获取的数据。它在 Reactive 世界中有一席之地吗?

数据类型

您编写的应用程序最终将处理某种数据。数据可以有两种类型:动态数据和静态数据。静态数据是以数字格式存储的数据,您通常从某些持久存储(例如数据库或文件)中读取这些数据。动态数据是在网络(或其他介质)上移动并被推送到您的应用程序或由您的应用程序从任何外部源拉取的数据。

许多技术使用事件作为处理动态数据的方式。通常,应用程序注册到一个源,并在新数据到达或新数据可用于检索时引发事件。把它想象成一个有铃铛的邮箱,只要有人把东西放进盒子里,它就会响起。

在处理动态数据时,更容易将其视为数据流(或事件流),就像数据包从中流过的软管,就像您在图 2 中看到的那样。使用水管时,您可以用它做很多事情,例如在末端放置过滤器,添加提供不同功能的不同软管头。您可以在软管上添加压力监视器以帮助您调节流量。同样的事情是您希望对数据流执行的操作。您需要构建一个流经的管道,最终给出适合您的逻辑的最终结果,这包括过滤、转换、分组、合并等。

图2 数据流就像一根软管,每一滴水都是一个数据包,需要经过不同的站点,直到到达终点。你的数据也需要被过滤和转换,直到它到达真正的处理程序,用它做一些有用的事情。

数据和事件流非常适合 Rx observables;当用 IObservable 抽象它们时,我们有可能组合运算符并创建复杂的执行管道。