使用 Spark 学习 Scala 隐含函数

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

不久前,我写了两篇关于避免在 Spark 中使用 groupBy 函数的文章。虽然我不会在这里重新散列这两篇文章,但底线是利用 combineByKey aggreagateByKey 函数。虽然这两个函数都有可能提高我们的 Spark 作业的性能和效率,但有时为基本用例反复创建所需的参数可能会变得乏味。这让我开始思考是否有一种方法可以为基本用例提供某种程度的抽象?例如,将值分组到列表或集合中。同时,我一直在努力扩展我对 Scala 更高级功能的了解,包括 隐式 TypeClasses 。我想到的是 GroupingRDDFunctions 类,它通过使用 Scala 的 隐式类 功能为 aggregateByKey 函数的基本用例提供一些语法糖。

Scala 隐式简述

虽然对 Scala 的隐含函数的完整解释超出了本文的范围,但这里有一个简短的描述。当 Scala 编译器发现错误类型的变量或表达式时,它会寻找 implicit 函数、表达式或类来提供正确的类型。 implicit 函数(或类)需要在当前范围内,编译器才能完成它的工作。这通常通过导入包含隐式定义的 Scala 对象来完成。在这种情况下, GroupingRDDFunctions 类包装在 GroupingRDDUtils 对象中。这是类声明:


 object GroupingRDDUtils {
  implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
   }
}

要使用 GroupingRDDFunctions,只需使用以下导入语句:


 object GroupingRDDUtils {
  implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
   }
}

提供的功能

GroupingRDDFunctions 上定义的方法是:

  1. groupByKeyToList
  2. groupByKey唯一
  3. 按键计数
  4. sumWithTotal – 提供一个元组,其中数值的总和为 Double 以及用于创建总和的项目总数
  5. 平均按键

下面是一些使用 GroupingRDDFunctions 的示例:


 object GroupingRDDUtils {
  implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
   }
}

这里真的没有什么特别的事情发生。我们只是简单地包装一个 RDD 实例,并提供在该 RDD 实例上使用上面列出的方法的能力。在 GroupingRDDFunctions 类中,我们仍然利用 aggregateByKey 函数。

隐式参数转换

作为隐式使用的另一个示例,让我们看一下 averageByKey 函数。在下面的代码中,我们通过将 avaragingFunction 应用于 sumWithTotal 方法返回的结果来按键计算平均值。但如果我们仔细观察,我们的键和值是“K”和“V”的泛型,但所有这些函数都适用于双打。


 object GroupingRDDUtils {
  implicit class GroupingRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Logging with Serializable {
//... details left out for clarity
   }
}

那么如果提供的值是整数而不是双精度值会怎样呢?另请查看 incrementCountSumValue 方法,如何将“V”类型的值添加到元组的双精度值中?这是使用隐式函数的好例子。编译器会寻找并找到 intToDouble 函数并应用到 incrementCountSumValue 方法的参数上。如果该值是整数,则将其隐式转换为双精度数,否则返回双精度数。

结论

虽然在 Scala 中使用隐式需要谨慎,但在我看来,此处提供的示例代表了一个很好的用例。我们正在为一个类添加一些有用的行为,只需添加一个 import 语句。此外,检查隐式类以查看幕后发生的事情也非常容易。

参考