Spark

概述

Spark最初由美国加州伯克利大学(UCBerkeley)的AMP(Algorithms,Machinesand People)实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。Spark在诞生之初属于研究性项目,其诸多核心理念均源自学术研究论文。2013年,Spark加入Apache孵化器项目后,开始获得迅猛的发展,如今已成为Apache软件基金会最重要的三大分布式计算系统开源项目之一(即Hadoop、Spark、Storm) 

特点

  • 运行速度快:Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍;
  • 容易使用:Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,并且可以通过SparkShell进行交互式编程
  • 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
  • 运行模式多样:Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。

Spark与Hadoop的对比

Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题。相比于MapReduce,Spark主要具有如下优点:

  • Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活;
  • Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率;
  • Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制

Spark最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了IO开销,因而,Spark更适合于迭代运算比较多的数据挖掘与机器学习运算。使用Hadoop进行迭代计算非常耗资源,因为每次迭代都需要从磁盘中写入、读取中间数据IO开销大。而Spark将数据载入内存后,之后的迭代计算都可以直接使用内存中的中间结果作运算,避免了从磁盘中频繁读取数据。

Spark生态系统

Spark的生态系统主要包含了SparkCore、Spark SQL、Spark Streaming、MLLib和Graphx 等组件

  • Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等。Spark建立在统一的抽象RDD之上,使其可以以基本一致的方式应对不同的大数据处理场景;通常所说的Apache Spark,就是指SparkCore
  • Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析
  • Spark Streaming支持高吞吐量可容错处理实时流数据处理,其核心思路是将流式计算分解成一系列短小的批处理作业。SparkStreaming支持多种数据输入源,如Kafka、Flume和TCP套接字等
  • Mlib(机器学习) 提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作
  • GraphX(图计算)是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,Graphx性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

基本概念

  • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
  • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
  • Executor(执行者):是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程序存储数据
  • 应用:用户编写的Spark应用程序
  • 任务:运行在Executor上的工作单元
  • 作业:一个作业包含多个RDD及作用于相应RDD上的各种操作
  • 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者也被称为“任务集”

Spark架构

运行架构组成

  1. 集群资源管理器(Cluster Manager)
  2. 运行作业任务的工作节点(Worker Node)
  3. 每个应用的任务控制节点(Driver)
  4. 每个工作节点上负责具体任务的执行进程(Executor)

集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。

Executor对比MapReduce

相较于Hadoop的MapReduce计算架构,Spark的Executor有两个优点

优点一:利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型)减少任务的启动开销

特征 Spark多线程模型 MapReduce多进程模型
任务执行单元 线程(Thread) 进程(Process)
资源管理 共享JVM进程资源(内存、连接池) 每个任务独占独立JVM进程资源
启动开销 毫秒级(线程池复用) 秒级(每次启动新进程)
资源隔离性 弱(线程间可能资源争用) 强(进程间资源隔离)
适用场景 低延迟、内存密集型任务 高资源隔离需求的批处理任务
  1. 进程模型(MapReduce)

    • 每个Map/Reduce Task启动独立JVM进程
    • 开销来源
      • 进程创建需要申请内存、加载JVM(约1秒)
      • 任务结束后进程销毁,资源无法复用
    • 示例:处理1000个任务需启动1000次JVM进程
  2. 线程模型(Spark)

    • Executor进程内预分配线程池(如100线程)
    • 优化机制
      • 线程复用:任务结束后线程返回池中待用
      • 共享内存:同一Executor内任务共享堆内存(如广播变量)
    • 示例:1000个任务在10个Executor(各100线程)中仅需10次JVM启动

优点二:Executor中有一个Block Manager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时,就可以直接读该存储模块里的数据,而不需要读写到HDFS等文件系统里,因而有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能

维度

Block Manager存储 HDFS存储
存储层级 内存优先,磁盘兜底(LRU淘汰策略) 纯磁盘存储(3副本冗余)
数据生命周期 任务/会话级(随Executor进程释放) 持久化存储(需手动删除)
访问速度 内存:纳秒级;本地磁盘:微秒级 网络磁盘:毫秒级(跨节点传输)
适用场景 迭代计算中间结果/热点数据缓存 原始数据持久化/冷数据存储

应用组成

在Spark中

  • 一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成
  • 一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。

执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中

运行流程

  1. 当一个Spank应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(ClusterManager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源
  2. 资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上;
    • 存活确认:证明Executor进程正常运行(默认间隔3秒)
    • 资源状态上报:包含CPU/内存使用量、缓存数据量等指标
    • 任务状态同步:汇报正在执行的任务进度(如Task完成度百分比)
  3. SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理
  4. Executor向SparkContext申请任务任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor
  5. 任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后引入数据并释放所有资源。

运行架构特点

(1)每个应用都有自己专属的Executor进程,并且该进程在应用运行期间一直驻留Executor进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠

(2)Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可

(3)Executor上有一个BlockManager存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入到HDFS等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写IO性能

(4)任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要,那么,调度就会等待,直到当前节点可用。

RDD

RDD:弹性分布式数据集(Resilient Distributed Dataset)

RDD提供一个抽象的数据架构,我们不必担心底层数据的分布特性,只需要将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免中间结果的存储,大大降低数据复制、磁盘IO和序列化开销。

概念

  • 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。
  • RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD。
  • RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系

行动与转换操作 

转换操作:接受RDD并返回RDD。例如map、filter、groupBy、join等

行动操作:接受RDD返回非RDD(输出一个值或结果)。例如count、collect

RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。因此,RDD比较适合对于数据集中元素执行相同操作的批处理式应用,而不适合用于需要异步细粒度状态的应用,比如Web应用系统、增量式的网页爬虫等。

RDD可以很好地应用于许多并行计算应用中,可以具备很多现有计算框架(比如MapReduce、SQL、Pregel等)的表达能力,并且可以应用于这些框架处理不了的交互式数据挖掘应用。

执行过程

  1. RDD读入外部数据源(或者内存中的集合)进行创建
  2. RDD经过一系列的“转换”操作,每一次都会产生不同的RDD供给下一个“转换”使用
  3. 最后一个RDD经“行动”操作进行处理,并输出到外部数据源

这一系列处理称为一个Lineage(血缘关系),即DAG拓扑排序的结果

优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单

运行原理

RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

逻辑上生成RDD,只有在进行“行动”操作时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算

采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程在单个MapReduce中会写入过多复杂的逻辑。

RDD之间的依赖关系

RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。

RDD中的依赖关系分为窄依赖(NarrowDependency)与宽依赖(Wide Dependency)。

  • 窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区
  • 宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。

  • Spark的这种依赖关系设计,使其具有了天生的容错性,大大加快了Spark的执行速度
  • 因为,RDD数据集通过“血缘关系记住了它是如何从其它RDD中演变过来的;
  • 血缘关系记录的是粗颗粒度的转换操作行为,当这个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。
  • 相对而言,在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父RDD分区重新计算丢失的分区即可(不需要重新计算所有分区),而且可以并行地在不同节点进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父RDD分区,开销较大
  • 此外,Spark还提供了数据检查点和记录日志,用于持久化中间RDD,从而使得在进行失败恢复时不需要追溯到最开始的阶段。在进行故障恢复时Spark会对数据检查点开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。

RDD编程基础

RDD创建

1.  通过parallelize(并行化)方法创建RDD,可以调用SparkContext的parallelize方法,在Driver中一个已经存在的数组上创建。

>>> words=("Hadoop is good", "Spark is fast", "Spark is better")
>>> rdd = sc.parallelize(words)
>>> rdd.foreach(print)
Spark is better
Hadoop is good
Spark is fast

2. RDD本地写入

rdd.saveAsTextFile("file:///root/writeback")

3. 本地加载数据创建RDD

>>> lines = sc.textFile("file:///root/data.txt")
>>> lines.foreach(print)
Spark is better
Hadoop is good
Spark is fast

4. 从Hdfs加载数据创建RDD

>>> lines = sc.textFile("/data.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is better

RDD转换

1.  filter(func):筛选出满足函数func的元素,并返回一个新的数据集

>>> lines = sc.textFile("/zzh/data.txt")
>>> lines.filter(lambda line:"Spark" in line).foreach(print).count()
Spark is fast (0 + 2) / 2]
Spark is better

2. map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集

>>> lines.map(lambda line : line.split(" ")).foreach(print)['Hadoop', 'is', 'good']['Spark', 'is', 'fast']['Spark', 'is', 'better']

3. flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果

>>> lines.flatMap(lambda line : line.split(" ")).foreach(print)
Hadoop
is
good
Spark
is
fast
Spark
is
better

RDD行动

1) count() 返回数据集中的元素个数

>>> lines.flatMap(lambda line : line.split(" ")).count()
9

2) collect() 以数组的形式返回数据集中的所有元素

>>> lines.flatMap(lambda line : line.split(" ")).collect()
['Hadoop', 'is', 'good', 'Spark', 'is', 'fast', 'Spark', 'is', 'better']

3) first() 返回数据集中的第一个元素

>>> lines.flatMap(lambda line : line.split(" ")).first()
'Hadoop'

4) take(n) 以数组的形式返回数据集中的前n个元素

>>> lines.flatMap(lambda line : line.split(" ")).take(5)
['Hadoop', 'is', 'good', 'Spark', 'is']

5) reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

lines.map(lambda line : len(line.split(" "))).foreach(print)
3
3
3
lines.map(lambda line : len(line.split(" "))).reduce(lambda a, b : a + b )
9

6) foreach(func) 将数据集中的每个元素传递到函数func中运行

持久化

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。这对于迭代计算而言,代价是很大的,可以使用persist()方法对一个RDD标记为持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用

  1. persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。
  2. persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。
  3. 一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。
  4. 使用unpersist()方法手动地把持久化的RDD从缓存中移除
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache()ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
>>> print(rdd.count())3
>>> print(','.join(rdd.collect()))Hadoop,Spark,Hive

广播变量

广播变量是一种在Spark中用来共享变量的工具。它允许程序开发者在每个机器上缓存一个只读的变量,而不需要为每个任务都复制一份。这节省了内存和计算资源。广播变量的主要用途是在多个阶段的任务之间共享数据。

广播变量的不可变性: 一旦你创建了广播变量,它的值就不能再被修改。这确保了在整个集群中,所有节点都获得相同的值。

通过SparkContext.broadcast(v)来将一个普通变量v转化为广播变量。

broadcastVar = sc.broadcast([1, 2, 3]) 
broadcastVar.value
[1,2,3]

累加器

什么是累加器? 累加器就像一个特殊的计数器,可以在不同的计算节点上进行累加操作。它用于跟踪和记录计数或求和的结果。

如何创建累加器? 你可以使用SparkContext.accumulator()来创建一个数值型累加器。这个累加器可以在集群中的任务中使用add方法来将数值累加到其中。但要注意,任务只能进行累加操作,不能直接读取累加器的值。只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。

为什么使用累加器? 累加器非常有用,因为它们允许在分布式环境中有效地进行累加操作,而不需要担心数据同步和并发问题。

>>> accum = sc.accumulator(0)
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))
>>> accum.value10

分区

RDD 分区的一个原则是使得分区的个数尽量等于集群中的 CPU 核心(Core)数目。对于不同的Spark 部署模式(Local 模式、Standalone 模式、YARN 模式、Mesos 模式)而言,都可以通过设置spark.default.parallelism 这个参数的值,来配置默认的分区数目。一般而言,各种模式下的默认分区数目如下。

  • Local 模式:默认为本地机器的 CPU 数目,若设置了 local[N],则默认为 N。
  • Standalone 或 YARN 模式:在“集群中所有 CPU 核心数目总和”和“2”这二者中取较大值作为默认值。
  • Mesos 模式:默认的分区数为 8。

1. 手动设置分区

创建 RDD 时手动指定分区个数,在调用 textFile()和 parallelize()方法的时候手动指定分区个数

>>> list = [1,2,3,4,5] 
>>> rdd = sc.parallelize(list,2) //设置两个分区

对于 parallelize()而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism 。

对于textFile()而言,如果没有在方法中指定分区数,则默认为 min(spark.default.parallelism,2),如果是从 HDFS 中读取文件,则分区数为文件分片数(比如,128MB/片)。

2. 使用 repartition 方法重新设置分区个数

>>> data = sc.parallelize([1,2,3,4,5],2) 
>>> len(data.glom().collect()) #显示 data 这个 RDD 的分区数量
2
>>> rdd = data.repartition(1) #对 data 这个 RDD 进行重新分区
>>> len(rdd.glom().collect()) #显示 rdd 这个 RDD 的分区数量
1

3. 自定义分区

# 生成键值对数据
data = [(i, f"value_{i}") for i in range(10)]
# 转换为键值对RDD
rdd = sc.parallelize(data)
# 使用partitionBy,这里选择10个分区,根据键将数据分到对应的分区
partitionedRDD = rdd.partitionBy(10)
# 查看每个分区的数据
result = partitionedRDD.glom().collect()
for i, partition in enumerate(result):
print(f"Partition {i}: {list(partition)}")
# 保存到文件系统
partitionedRDD.saveAsTextFile("file:///root/partitionBy")

键值对RDD 

键值对RDD的创建

我们可以采用多种方式创建键值对RDD,通过并行集合(列表)创建RDD

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
rdd.saveAsTextFile("file:///root/kvRDD")
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.foreach(print)

​使用map()函数来实现

lines = sc.textFile("file:///root/kvRDD")
pairRDD = lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1))
pairRDD.foreach(print)

常用的键值对转换操作 

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等。

  • reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。

>>>pairRDD.reduceByKey(lambda a,b : a+b).foreach(print)
(Spark,2)
(Hive,1)
(Hadoop,1)

  • groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组。

比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。

>>>pairRDD.groupByKey().foreach(print)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)

('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)

('hive', <pyspark.resultiterable.ResultIterable object at 0x7f1869f81f60>)

#查看迭代器中内容
>>>pairRDD.groupByKey().map(lambda x : (x[0],tuple(x[1]))).foreach(print)

#实现reduceByKey的效果
>>>pairRDD.groupByKey().map(lambda x : (x[0],sum(x[1]))).foreach(print)

  • keys()

keys()只会把键值对RDD中的key返回形成一个新的RDD。

>>>pairRDD.keys().foreach(print)

Hadoop
Spark
Hive
Spark

  • values()

values()只会把键值对RDD中的value返回形成一个新的RDD。

>>>pairRDD.values().foreach(print)

  • sortByKey()

sortByKey()的功能是返回一个根据Key,也就是键进行排序的RDD。默认升序。

>>>d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17), ("c",2),("f",29),("g",21),("b",9)])

>>>d1.reduceByKey(lambda a,b:a+b).sortByKey().collect()

>>>d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()

  • sortBy()

sortBy()则可以根据其他字段进行排序。

>>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect()

  • mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。

>>>pairRDD.mapValues( lambda x : x+1).foreach(print)

(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

  • join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。

对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

>>>pairRDD1 = sc.parallelize([('Spark','fast')])

>>>pairRDD1.join(pairRDD).foreach(print)

('Spark', ('fast', 1))
('Spark', ('fast', 1))

  • combineByKey

combineByKey可以根据Key进行数据聚合,combineByKey相比于之前的方法,提供了更精细的流程安排,同时更加适合复杂逻辑与大规模数据的处理场景。假设有一些销售数据,数据采用键值对的形式,即<公司,当月收入>,要求使用 combineByKey 操作求出每个公司的总收入和每月平均收入,并保存在本地文件中。

>>>data = sc.parallelize([("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)],3)

>>>combineValues = data.combineByKey(lambda income:(income,1),\
lambda acc,income:(acc[0]+income, acc[1]+1),\
lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))


>>>combineValues.foreach(print)
>>>res = combineValues.map(lambda x:(x[0],x[1][0],x[1][0]/float(x[1][1])))
>>>res.foreach(print)
>>>res.repartition(1).saveAsTextFile("file:///root/combineresult")

表1 combineByKey中的三个重要参数详解

参数名称

参数值

参数含义

createCombiner

lambda income:(income,1)

在第一次遇到 key 时创建组合器函数,将 RDD 键值对中的values转换成 Combiner类型值。

mergeValue

lambda acc,income:(acc[0]+income, acc[1]+1)

合并值函数,遇到相同的 key 时,将新的values添加到之前存储的Combiner类型值。

mergeCombiners

lambda x:(x[0],x[1][0],x[1][0]/float(x[1][1]))

合并组合器函数,将保存的内容合并为一个值。

SparkSQL基础

SparkSession 支持从不同的数据源加载数据,以及把数据转换成 DataFrame,并且支持把DataFrame 转换成 SQLContext 自身的表,然后使用 SQL 语句来操作数据。SparkSession 亦提供了HiveQL 以及其他依赖于 Hive 的功能的支持

from pyspark.sql import SparkSession

# 创建一个Spark会话
spark = SparkSession.builder.appName("example").getOrCreate()

DataFrame 的读写

在创建 DataFrame 时,可以使用 spark.read 操作,实现从不同类型的文件中加载数据创建DataFrame:

  • 读取文本文件 people.txt:spark.read.text(“people.txt”)
  • 读取JSON文件people.json:spark.read.json(“people.json”)
  • 读取csv文件people.csv:spark.read.csv (“people.csv”)

可以使用 spark.write 操作,把一个 DataFrame 保存成不同格式的文件:

  • 保存文本文件 people.txt:spark.write.text(“people.txt”)
  • 保存JSON文件people.json:spark.write.json(“people.json”)
  • 保存csv文件people.csv:spark.write.csv (“people.csv”)
# 读取
df=spark.read.csv("file:////opt/software/spark/examples/src/main/resources/people.csv", header=True, inferSchema=True, sep=";")
df.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

#写入
df.select('name','age').write.csv("file:////opt/software/jupyter_notebook/people")

DataFrame的常用操作

printSchema()

可以使用 printSchema()操作打印出 DataFrame 的模式(Schema)信息。

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

select()

从 DataFrame 中选取部分列的数据。

df.select(df['name'],df['age']+1).show()
+-----+---------+
| name|(age + 1)|
+-----+---------+
|Jorge|       31|
|  Bob|       33|
+-----+---------+

filter()

找到满足条件要求的记录。

df.filter(df['age']>30).show()
+----+---+---------+
|name|age|      job|
+----+---+---------+
| Bob| 32|Developer|
+----+---+---------+

groupBy()

对数据分组。

df.groupBy('job').count().show()
+---------+-----+
|      job|count|
+---------+-----+
|Developer|    2|
+---------+-----+

sort()

进行排序。

df.sort(df['age'].asc(),df['name'].desc()).show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

 

页面链接:https://www.datazzh.top/archives/1683/2025/02/28/
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇