目录
1. 简述什么是Spark?
2. 简述Spark的四大特点
3. 简述Spark比Mapreduce执行效率高的原因
4. 简述Spark on Yarn的两种部署模式的区别和特点
5. Spark底层工作原理是怎样的
6. RDD算子分成了哪几类,各自的特点是什么?
7. RDD的五大特性和五大特点
8. RDD中的重分区算子,以及各自特点?
9. mapPartitions和foreachPartitions分区算子,相对map和foreach有什么优点?
10. 简述Spark持久化中缓存和checkpotin检查点的区别
11. 简述DAG和Stage形成过程
12. 简述Job调度流程
13. 简述SparkSQL和Hive的对比
14. 创建得到DataFrame的方式有哪些,适用于什么场景?
15. SparkSQL中数据清洗的API有哪些,各自作用是什么?
16. 设置SparkSQL的shuffle分区数的方式有哪几种?
17. 简述基于Pandas实现UDF和UDAF函数的步骤?
18. 简述SParkSQL函数的分类
19. 简述SparkSQL底层工作流程
20. 简述消息队列的应用场景
21. 简述Kafka的架构
22. 简述Kafka 之所以具有高速的读写性能,主要有哪几个原因
23. 简述Kafka的分区和副本机制
24. 简述kafka中生产者数据分发策略
生产者产生的消息,是如何保存到具体分区上的
JAVA中的轮询分发策略 和 粘性分发策略介绍
25. 简述消息存储机制和查询机制
消息存储机制
查询机制:消费者在消费的时候,是如何找到对应offset偏移量的消息的
26. Kafka消费者的负载均衡机制
27. Kafka如何保证数据不丢失
生产者保证数据不丢失:
Broker端如何保证数据不丢失:
消费端如何保证数据不丢失:
28.Kafka中消费者如何对数据仅且只消费一次?
29. 结构化流中Sink输出模式有哪几类,各自特点是什么?
30. 结构化流中Sink输出终端常见的有哪几类,各自特点是什么?
31. 结构化流如何处理延迟到来的数据?
32. 处理小文件的操作
1. 简述什么是Spark?
spark是一款大数据统一分析引擎,底层数据结构是RDD
2. 简述Spark的四大特点
速度快(线程,基于内存的rdd,高效api)
易用性(多种语言python,java,scala等)
通用性(有sparksql,mlib等安装包)
兼容性(适配不同的资源调度,存储工具,可运行在多个系统中)
3. 简述Spark比Mapreduce执行效率高的原因
mapreduce是基于进程执行的,消耗资源多运算慢;
使用磁盘进行计算,反复IO读写效率低下;
API较为原始低级,实现复杂的API需要写很复杂的代码;
Spark基于线程做数据处理,创建时所需要的资源更少,运行速度更快;
引入了新的数据结构-RDD弹性分布式数据集,使得Spark可以基于内存进行数据处理,读写速度相较于磁盘更快
Spark提供了更丰富的编程API,能够轻松的实现功能开发;
4. 简述Spark on Yarn的两种部署模式的区别和特点
两种方式分别是client客户端模式和cluster集群模式
两种方法的本质区别是driver进程运行的地方不一样
Client部署方式:Driver进程运行在你提交程序的那台机器上
优点是日志和运行结果都输出到了提交的那台机器上,方便查看结果
缺点是Driver进程和Yarn集群可能不在同一个集群中,会导致Driver进程和Excutor进程间进行数据交换的时候,效率较低
场景一般在开发和测试环境中使用
Cluster部署方式:Driver进程运行在集群中某台节点上
优点是Driver进程和Yarn集群在同一个集群中,Driver进程和Excutor进程间进行数据交换的时候效率比较高
缺点是查看日志与运行结果需要在18080或者8088的页面中进行查看
一般在生产环境中使用
5. Spark底层工作原理是怎样的
DAGScheduler:DAG调度器,将job任务形成DAG有向无环图和划分Stage阶段;
TaskScheduler:Task调度器,将Task线程分配给具体的Executor执行;
以client on spark 为例
1. 提交spark程序,在哪里提交程序,就在哪里启动Driver进程;
2. 由于Driver进程是java与scala语言编写的,无法直接执行python代码,所以需要将创建SparkContext对象的代码基于PY4J转为java后再创建对象;
2.1 Driver进程启动后,底层PY4J创建SparkContext顶级对象,同时还会创建DAGscheduler和TaskSchduler;
3. Driver连接Master,根据资源的配置,向master申请资源来创建Executor
4. Master接收到资源申请,进行资源分配,分配的原则采用FIFO先进先出规则,制定资源分配方案并返回给Driver;
5. Driver连接到对应的worker从节点上,占用相应的资源,通知worker启动Excutor,启动后将信息反向注册回Driver;
6. Driver开始处理代码
6.1 Driver加载RDD相关的算子,根据算子间的依赖绘制DAG有向无环图和划分Stage阶段(一个Spark应用程序遇到Action算子后,就会触发一个Job任务的产生,Job任务会将它所依赖的所有算子都加载进来,形成一个Stage; 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中; 如果遇到宽依赖,就经历shuffle阶段,划分形成新的Stage,最后一直回溯完成)
6.2 之后Driver需要确定任务分配给哪些Excutor进行执行,首先确定每个Stage阶段有多少个Task线程,将众多的Task线程放到Taskset集合中,DAG调度器将TaskSet集合给到Task调度器,Task调度器拿到Taskset集合以后,将Task分配给到具体Executor执行,底层是基于SchedulerBackend调度队列来实现的,
6.3 Driver通知对应的Executor进程来执行相应的任务
6.4 Executor开始执行具体的任务,因执行的是python函数,因此会调用服务器上的python解释器,将py函数和输入数据传输到python解释器,执行完后会将数据返回给Executor进程;
6.5 Executor在运行过程中,会判断是否需要将结果返回给Driver进程,如果需要就返回,如不需要就直接输出;
6.6 Driver会定时检查多个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束;
7. Driver调用sparkContext.stop()代码,通知Master回收资源,整个程序运行结束;
6. RDD算子分成了哪几类,各自的特点是什么?
分成两类: Transformation算子和Action算子
Transformation算子:返回值是一个新的RDD;该算子运行后会不会立即执行,需要配合Action算子触发。
Action算子:返回值是None或者非RDD数据类型;算子运行后会立即执行,并且把之前的Transformation算子一并运行。
7. RDD的五大特性和五大特点
五大特性:
1. RDD由一系列分区组成
2. RDD计算相当于对RDD每个分区做计算
3. RDD之间有宽窄依赖;
4. KeyValue型RDD可以自定义分区;
5. 尽量让计算程序靠近数据源,移动数据不如移动计算程序;
五大特点:
1. 分区:RDD的分区是逻辑上的分区,并不是直接对数据进行分区操作,因为RDD本身不存储数据;
2. 只读:RDD只读的,对其进行增加改变本质都是创建新的RDD
3. 依赖: 存在宽窄依赖
4. 缓存:如果在程序中多次使用同一个RDD可以将其缓存起来,该RDD只有第一次计算时会根据血缘关系得到分区的数据.
5. checkpoint检查点: 与缓存类似,但可以持久化保存.
8. RDD中的重分区算子,以及各自特点?
重分区算子有:reparation , coalesce , Partitions by
repartition:调整RDD的分区数,得到一个新RDD,既可以增大也可以减小分区数,但都会触发shuffle.
coalesce: 默认只能减小分区,减小的过程中不会触发shuffle,如果将参数2的shuffle改为True也可以增大分区,但会触发shuffle
partitions by: 主要是针对kv类型的RDD进行重分区操作,可以增大也可以减少,但都会shuffle,用户自定义函数fn来指定分区方案.
9. mapPartitions和foreachPartitions分区算子,相对map和foreach有什么优点?
优点:每次都对一整个分区进行操作,减少分区调用操作的次数,减少资源消耗,而且可以对分区内数据批量操作,提高效率.适用于文件的打开和关闭、数据库的连接和关闭等有反复消耗资源的操作.
10. 简述Spark持久化中缓存和checkpotin检查点的区别
区别有4点:
1. 主要作用
缓存是为了提升Spark程序运算效率.
检查点是为了提升Spark程序容错性.
2. 存储位置
缓存存储在内存或磁盘中,或Executor的堆外内存中.
检查点存储在磁盘或Hdfs中
3. 生命周期
缓存在程序结束或手动调用unpersist后被删除
检查点可永久保存在HDFS上,除非手动删除
4. 血缘关系
缓存不会切断RDD的血缘关系,因为缓存是不稳定的,如果发生故障,可以从头运行RDD
检查点会切断RDD的血缘关系,因为保存在安全的HDFS上,认为不会丢失
一同使用时先设置缓存再设置检查点,可以减少一次IO过程
11. 简述DAG和Stage形成过程
在spark的运行过程中,遇到了一个action算子会生成一个job
一个job将action算子和其依赖的其他算子聚合起来形成一个stage
然后会向前回溯rdd算子,如果没有shuffle阶段(产生宽依赖)就把他们
放在一个stage里面,如果有shuffle发生(产生宽依赖)就会划分一个新的
stage放置这些算子
12. 简述Job调度流程
1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器
3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。
4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束
13. 简述SparkSQL和Hive的对比
1. hive只能写SQL,spark既可以写SQL又可以写代码
2.hive有元数据存储metastore,spark需要手动维护元数据
3. hive的运行基于磁盘,spark的运行基于内存
4. hive的计算引擎是MapReduce,spark基于spark RD
相同点:
1都是大数据分布式处理架构
2都是处理结构化数据
3都可以用yarn做集群资源调度
14. 创建得到DataFrame的方式有哪些,适用于什么场景?
1.通过RDD得到一个DataFrame
场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。
2.内部初始化数据得到DataFrame
场景:一般用在开发和测试中。因为只能处理少量的数据
3. 读取外部文件得到DataFrame
text , json , csv
csv常设置的参数有pathheadersepinferschemaencoding等
15. SparkSQL中数据清洗的API有哪些,各自作用是什么?
Drop duplicate 去重: 没有指定subset就默认一整行完全一样才会删除,指定了就将范围限定到指定字段.
dropna去空 : 没有指定参数,那只要有一个是null的那整行就被删除, 可以指定参数thresh,只有当null数量大于了thresh数才会删除整行.
fillna填充替换 :DF.fillna(value={“name”:”未知姓名”,”age”:100}).show() ,value必须传递参数,用于填充缺失值,subset限定缺失值替换范围。如果不是字典,那么只会替换字段类型匹配的空值,最常用的是value传递字典的形式。
16. 设置SparkSQL的shuffle分区数的方式有哪几种?
shuffle分区数量默认200个,手动调整的方式如下:
1. 全局设置: spark.sql.shuffle.partitions 数量
2. 动态分区:在客户端通过submit命令提交的时候,动态设置shuffle分区数量,部署上线时,基于spark-submit提交运行的时候:
.服务器托管网/spark-submit –conf”spark.sql.shuffle.partition=数量”
3. 写死 : SparkSession.conf.set(‘spark.sql.shuffle.partitions’,数量)
17. 简述基于Pandas实现UDF和UDAF函数的步骤?
sparksql原生只能udf,借助第三方工具可以实现udaf
UDF步骤:
1.创建python自定义函数
要求输入类型和返回值类型都必须是pandas中的series类型
2.注册进SparkSQL
方式一:udf对象 = spark.udf.register(参数1,参数2)
方式二:udf对象 = F.pandas_udf(参数1,参数2)
方式三:语法糖装饰器写法
@F.pandas_udf(returnTyep = 返回值SparkSQL的数据类型)放置到对应的python函数上
3.在代码中使用
UDAF步骤:
1.创建python自定义函数
要求输入参数类型是pandas中的Series对象,返回python中的标量数据类型
2.注册进SparkSQL
方式一: udf对象= spark.udf.resiter(参数1,参数2)
方式二: udf对象 = F.pandas_udf(参数1,参数2)
方式三: 语法糖装饰器写法
@ F.pandas_udf(returnType= 返回值sparksql的数据类型) 放置到对应的python函数上
3.在代码中使用
18. 简述SParkSQL函数的分类
1. UDF 一进一出,split等
2. UDAF 多进一出, 聚合函数 count,sum等
3. UDTF 一进多出, 表生成函数, explode炸裂函数等
19. 简述SparkSQL底层工作流程
如何将sparksql翻译成rdd的,基于catalys优化器来实施
1. 当catalys接收到客户端的代码,会先校验语法,通过后会根据执行顺序,生成未解析的逻辑计划(ats抽象语法树)
2. 对于ats抽象语法树加入元数据信息,确定一共涉及到哪些字段,字段的类型是什么,以及表其他相关元数据信息,加入元数据信息后,得到了未优化的逻辑计划.
3. 对未优化的逻辑计划执行优化操作,优化是通过优化器来执行的,在优化器匹配相对应的优化规则,Sparksql底层提供了一两百个优化规则,如:
谓词下推:也叫作断言下推,将数据过滤操作提前到数据扫描的时候进行,减少后续操作的数据量,提升效率;
列式裁剪:不加载与数据分析无关的字段,减少后续处理的数据量,提升效率;
4. 由于优化规则很多,导致会得到多个优化的逻辑计划,在转换为物理执行计划的过程中,会根据成本模型(运行耗时,资源消耗等)得到一个最优的物理执行计划;
5. 将物理执行计划通过code generation(代码生成器),转换成Spark RDD的代码;
6. 最后就是将SparkRDD代码部署到集群上运行
20. 简述消息队列的应用场景
1. 应用解耦合
2. 异步处理
3. 限流削峰
4. 消息驱动系统
21. 简述Kafka的架构
架构中的角色:
1. Producer生产者:负责将信息/数据发送到kafka中
2. consumer消费者:负责将信息从kafka中取出
3. broker : kafka集群中的节点,节点与节点之间没有主从之分
4. topic : 主题,是业务层面对消息进行分类的,一个topic可以有多个分区,分区数量没有限制
5. partiton分区: 一个分区可以有多个副本,副本的数量不超过broker集群的数量
6. leader 主副本: leader主副本会主动将信息副本发送到follwer从副本上
7. Follower从副本: Follower从副本被动接收Leader主副本发送过来的信息副本
8. Zookeeper:用来管理Kafka集群,管理信息的元数据
9. ISR同步列表: 存储和Leader主副本信息差距最小的一个副本,当leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follwer从副本变成Leader主副本,对外提供服务
22. 简述Kafka 之所以具有高速的读写性能,主要有哪几个原因
1. 顺序写入磁盘 相比于随机写入,顺序写入磁盘的性能更高,减少了磁盘寻址时间,减少了IO操作的次数,提高了写入的速度
2. 零拷贝技术 直接从数据源进行读取,不用经过中间的数据拷贝过程,减少了网络IO和磁盘IO等过程,提升了效率
3. 分布式架构 分布式的架构实现了扩展与负载均衡,提高了整体的吞吐量和并发处理能力
23. 简述Kafka的分区和副本机制
分区机制:
1- 避免单台服务器容量的限制,分多个区可以避免单个分区的数据过大导致服务器无法存储
2- 提升topic的吞吐量,数据读写速度,利用多台服务器的数据读写能力,网络等资源
3- 分区的数量没有限制,但尽量不要超过kafka集群中broker节点个数的3倍
副本机制:
1-通过多副本机制,提升数据的安全性,但副本过多会导致冗余过多
2- 副本数量有限制,不可超过kafka集群中broker节点个数,推荐分区的副本数量为1-3个
24. 简述kafka中生产者数据分发策略
生产者产生的消息,是如何保存到具体分区上的
1- 随机分发策略
将消息发到到随机的某个分区上,还是发送到Leader主副本上。Python支持,Java不支持
当在发送数据的时候, 如果只传递了topic 和 value,没有指定key的时候, 那么此时就采用随机策略
2- 指定分区策略
将消息发到指定的分区上面。Python支持,Java支持
当在发送数据的时候, 如果指定了partition参数, 表示的采用指定分区的方案, 分区的编号从0开始
当指定了partition的参数后, 与DefaultPartitioner没有任何的关系
3- Hash取模策略
对消息的key先取Hash值,再和分区数取模。Python支持,Java支持
当在发送数据的时候, 如果传递了topic 和 value 以及key的时候, 那么此时就是采用hash取模策略
注意: 相同key的返回的hash值是一致的, 同样对应分区也是同一个。也就是要注意数据倾斜的问题
4- 轮询策略
在kafka的2.4及以上版本,已经更名为粘性分发策略,python不支持,java支持
5- 自定义分发策略
Python支持,Java支持
参考源代码DefaultPartitioner模仿写即可
JAVA中的轮询分发策略 和 粘性分发策略介绍
1- 轮询分发策略:kafka老版本的策略,当生产数据的时候,只有value但是没有key的时候,采用轮询
优点: 可以保证每个分区拿到的数据基本是一样,因为是一个一个的轮询的分发
缺点: 如果采用异步发送方式,意味着一批数据发送到broker端,由于是轮询策略,会将这一批数据拆分为多个小的批次,分别再写入到不同的分区里面去,写入进去以后,每个分区都会给予响应,会影响写入效率。
2- 粘性分发策略: kafka2.4版本及以上的策略,当生产数据的时候,只有value但是没有key的时候,采用粘性分发策略
优点: 在发送数据的时候,首先会随机的选取一个分区,然后尽可能将数据分发到这个分区上面去,也就是尽可能粘着这个分区。该分发方式,
在异步发送的操作中,效率比较高。
缺点: 在数据发送特别快的时候,可能会导致某个分区的数据比其他分区数据多很多,造成大量的数据集中在一个分区上面
25. 简述消息存储机制和查询机制
消息存储机制
Kafka集群中的消息存储在一组称为“分区”的逻辑日志上。每个主题可以分成多个分区,每个分区都有一个唯一的标识符和一组不断增加的有序消息。这些分区可以分布在不同的Kafka节点上,以实现负载均衡和可伸缩性。
在每个Kafka节点上,每个分区都被存储为一个或多个文件(称为日志段),这些文件包含了该分区的所有消息。当消息被写入时,它们会被追加到最后一个日志段。当日志段达到一定大小(通过broker端参数log.segment.bytes进行配置)或时间(通过broker端参数log.segment.ms进行配置)时,将会创建一个新的日志段,原来的日志段将会被关闭。这种设计使得Kafka能够高效地追加消息,并且可以轻松地删除旧数据,同时保证消息的持久性和可靠性。此外,Kafka还提供了复制机制来确保数据的容错性和高可用性。
1-xx.log和xx.index它们的作用是什么? 答: xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。 xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度
2-xx.log和xx.index文件名称的意义? 答: 这个数字是xx.log文件中第一条消息的offset(偏移量)。offset偏移量从0开始编号。
3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储? 答: 1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源 2- 在一个大的文件中,检索内容也会非常消耗资源 3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过serve服务器托管网r.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)
查询机制:消费者在消费的时候,是如何找到对应offset偏移量的消息的
查询步骤:
1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中
2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围
3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)
4- 最终就获取到了具体的消息内容
26. Kafka消费者的负载均衡机制
Kafka集群中每分钟新产生400条数据,下游的一个消费者每分钟能够处理400条数据。
随着业务发展,Kafka集群中每分钟新产生1200条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。可以增加消费者数量,并且将这些消费者放到同一个消费组当中
随着业务发展,Kafka集群中每分钟新产生1600条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。再增加消费组中消费者的个数已经无法解决问题。
如何解决: 1- 增加消费组中消费者的个数 2- 提高下游消费者对消息的处理效率
Kafka消费者的负载均衡机制
1- 在同一个消费组中,消费者的个数最多不能超过Topic的分区数。如果超过了,就会有一些消费者处于闲置状态,消费不到任何数据。
2- 在同一个消费组中,一个Topic中一个分区的数据,只能被同个消费组中的一个消费者所消费,不能被同个消费组中多个消费者所消费。但是一个消费组内的一个消费者可以消费多个分区的数据。也就是分区和消费者的对应关系,多对一
3- 不同的消费组中的消费者,可以对一个Topic的数据同时消费,也就是不同消费组间没有任何关系。也就是Topic的数据能够被多个消费组中的消费者重复消费。
补充:
查看消费组中有多少个消费者,用来避免消费者个数超过分区个数。
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_1 --members --describe
27. Kafka如何保证数据不丢失
生产者保证数据不丢失:
生产者端将消息发给Kafka集群后,broker要给生产者响应信息,响应原理就是ack机制
ack机制有三个参数,分别是 0,1,-1
0:生产给到集群,生产者不等待不接收broker返回的响应信息
1: 生产给到集群,集群中的分区对应leader主副本所在的broker给生产者返回响应信息
-1: 生产给到集群,集群中的分区对应的所有副本给生产者返回响应信息
效率级别 0>1>-1
安全级别 -1>1>0
根据安全和效率的要求选择ack参数配置
Broker端如何保证数据不丢失:
broker通过多副本机制保证数据不丢失,同时需要生产者将ack设置为-1,安全级别最高
消费端如何保证数据不丢失:
消费者消费消息的步骤:
1- 消费者首先连接到kafka集群中,进行消息消费
2- Kafka集群接收到消费者的请求后,会根据消费组id,查找上次消费消息对应的offset偏移量
3- 如果没有查找到offset,消费者默认从topic最新的地方开始消费
4- 如果有查找到offset,会从上次消费到的offset地方继续进行消费
4.1- 首先先确定要读取的这个偏移量在哪个segment文件当中
4.2- 查询这个segment文件对应的index文件,根据offset确定这个消息在log文件的什么位置,也就是确定消息的物理偏移量
4.3- 读取log文件,查询对应范围内的数据即可
4.4- 获取最终的消息数据
5- 消费者在消费的过程中,底层有个线程会定时的将消费的offset提交给到kafka集群,kafka集群会更新对应的offset的值;
28.Kafka中消费者如何对数据仅且只消费一次?
1- 将消费者的 enable.auto.commit 属性设置为 false,并手动管理消费者的偏移量。这样可以确保消费者在处理完所有消息后才更新偏移量,避免重复消费数据。也就是将消息的消费、消息业务处理代码、offset提交代码放在同一个事务当中。
2- 使用幂等生产者或事务性生产者来确保消息只被发送一次。这样可以避免重复发送消息,从而避免消费者重复消费数据。
3- 在消息中加入唯一的ID
29. 结构化流中Sink输出模式有哪几类,各自特点是什么?
输出模式有三种:append , complete, update
append模式:
只支持追加,不支持聚合和排序,每次只打印追加的内容
适用于对数据进行累加计算的场景
complete模式:
每一次都是全量处理,因为数据量大,所以必须聚合,也可以支持排序
适用于需要获取完整数据集的场景,不适用无限数据流的场景
update模式:
就是支持聚合的append,有聚合操作,只会输出有变化和新增的内容,不支持排序
适用场景需要跟踪数据的变化,实时监控指标的更新等
30. 结构化流中Sink输出终端常见的有哪几类,各自特点是什么?
1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式
31. 结构化流如何处理延迟到来的数据?
Spark结构化流可以通过设置水印(watermark)来处理延迟到来的数据。
水印是一种基于时间的度量,表示数据流中已经处理的最新时间。可以将水印理解为一个延迟阈值,表示在当前时间点之前的所有数据都已经到达,而在此之后的数据可能还未到达。Spark结构化流会根据水印来判断哪些数据已经过期,从而进行数据清理和聚合操作。
具体来说,可以通过以下步骤来设置水印:
定义事件时间字段:在创建数据流时,需要指定事件时间字段,即数据中表示事件时间的列。
设置水印生成规则:使用
withWatermark()
方法来设置水印生成规则,该方法需要指定一个时间间隔作为水印的生成周期。处理延迟数据:在数据处理过程中,可以使用
window()
方法来对数据进行窗口操作,同时使用trigger()
方法来设置触发器,以便在水印到达窗口结束时间后触发数据处理。
32. 处理小文件的操作
常规处理小文件的办法:
1- 大数据框架提供的现有的工具或者命令
1.1- hadoop fs -getmerge /input/small_files/*.txt /output/merged_file.txt
1.2- hadoop archive -archiveName myhar.har -p /small_files /big_files2- 可以通过编写自定义的代码,将小文件读取进来,在代码中输出的时候,输出形成大的文件
wholeTextFiles: 读取小文件。
1-支持本地文件系统和HDFS文件系统。参数minPartitions指定最小的分区数。
2-通过该方式读取文件,会尽可能使用少的分区数,可能会将多个小文件的数据放到同一个分区中进行处理。
3-一个文件要完整的存放在一个元组中,也就是不能将一个文件分成多个进行读取。文件是不可分割的。
4-RDD分区数量既受到minPartitions参数的影响,同时受到小文件个数的影响
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net