1. RDD
RDD stands for Resilient Distributed Dataset.
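What does "resilient" mean?
1. Storage resilience: data can move automatically between memory and disk (for example, spilling to disk when memory runs short, depending on the storage level).
2. Fault-tolerance resilience: lost data is recovered automatically by recomputing it from its lineage.
3. Computation resilience: a failed computation is retried.
4. Partitioning resilience: data can be repartitioned as needed.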
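To make point 2 concrete, here is a toy model in plain Java. It is not Spark's API: LineageRecovery, getPartition and losePartition are made-up names. Each partition is defined by a compute function that plays the role of lineage, so when a cached partition is "lost", the next access simply rebuilds it instead of restoring a replica.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Toy model, NOT Spark's API: partitions are cached, but each one can always be
// rebuilt from its compute function, which stands in for the RDD's lineage.
class LineageRecovery {
    private final IntFunction<List<Integer>> compute; // how to build partition i
    private final Map<Integer, List<Integer>> cache = new HashMap<>();
    int recomputations = 0;

    LineageRecovery(IntFunction<List<Integer>> compute) {
        this.compute = compute;
    }

    // Return the cached partition, or recompute it if it was never built or was lost
    List<Integer> getPartition(int i) {
        return cache.computeIfAbsent(i, idx -> {
            recomputations++;
            return compute.apply(idx);
        });
    }

    // Simulate losing a cached block (e.g. its executor died)
    void losePartition(int i) {
        cache.remove(i);
    }

    public static void main(String[] args) {
        // Partition i holds the squares of 3i, 3i+1, 3i+2
        LineageRecovery rdd = new LineageRecovery(i -> {
            List<Integer> out = new ArrayList<>();
            for (int x = i * 3; x < i * 3 + 3; x++) out.add(x * x);
            return out;
        });
        List<Integer> before = rdd.getPartition(1);
        rdd.losePartition(1);
        List<Integer> after = rdd.getPartition(1);
        // prints: [9, 16, 25] [9, 16, 25] recomputations=2
        System.out.println(before + " " + after + " recomputations=" + rdd.recomputations);
    }
}
```

The point of the design is that nothing has to be replicated: as long as the recipe survives, any lost partition can be rebuilt on demand.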
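Distributed
An RDD's computation logic is split into Tasks, one per partition, and the Tasks are sent to Executors (on different nodes) to run.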
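A rough single-machine sketch of this idea, assuming a Java thread pool as a stand-in for Executors: the same per-partition logic (here, counting words) is submitted once per partition as a "task", and the results are gathered in partition order. This is not how Spark actually schedules tasks; PartitionTasks and runPerPartition are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, NOT Spark internals: one "task" per partition, with a
// thread pool standing in for Executors on different nodes.
class PartitionTasks {
    static List<Integer> runPerPartition(List<List<String>> partitions, int executors) {
        ExecutorService pool = Executors.newFixedThreadPool(executors);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> partition : partitions) {
                // The same logic is shipped as one task per partition
                futures.add(pool.submit(() -> {
                    int words = 0;
                    for (String line : partition) words += line.split(" ").length;
                    return words;
                }));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) results.add(f.get()); // gather in partition order
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("a task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("Hello Spark", "Hello World"),
                Arrays.asList("Hello Mayun"),
                Arrays.asList("Hello Spark"));
        System.out.println(runPerPartition(partitions, 2)); // [4, 2, 2]
    }
}
```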
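Dataset
An RDD is divided into partitions, but it holds references to the data (where it comes from and how to compute it), not the data itself.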
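One way to picture "a reference, not storage" is a dataset object that only stores a recipe for producing its elements. The sketch below is a toy in plain Java (LazyDataset, map and collect are illustrative names, not Spark's classes): defining and transforming it reads nothing, and the source is touched only when collect is called, much like a Spark action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model, NOT Spark's API: a LazyDataset stores only a recipe (Supplier) for
// producing its elements, so defining or transforming it reads no data.
class LazyDataset<T> {
    private final Supplier<List<T>> recipe;

    LazyDataset(Supplier<List<T>> recipe) {
        this.recipe = recipe;
    }

    // Transformation: wraps the parent's recipe in a new one; nothing is evaluated yet
    <R> LazyDataset<R> map(Function<? super T, ? extends R> f) {
        return new LazyDataset<>(() -> {
            List<R> out = new ArrayList<>();
            for (T t : recipe.get()) out.add(f.apply(t));
            return out;
        });
    }

    // Action: only now is the source read and the whole chain evaluated
    List<T> collect() {
        return recipe.get();
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger(); // how often the "source" is read
        LazyDataset<String> lines = new LazyDataset<>(() -> {
            reads.incrementAndGet();
            return List.of("hello", "spark");
        });
        LazyDataset<Integer> lengths = lines.map(String::length);
        System.out.println("reads after map: " + reads.get());     // 0
        System.out.println(lengths.collect());                      // [5, 5]
        System.out.println("reads after collect: " + reads.get()); // 1
    }
}
```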
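Main properties
- A list of partitions: the RDD consists of partitions
- A function for computing each split: each split (partition) has a function that computes it
- A list of dependencies on other RDDs: RDDs depend on one another
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key/value RDDs can have a partitioner
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): each split can have preferred locations where it should be computed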
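The five properties map naturally onto an interface. The following is a simplified, hypothetical mirror written in plain Java: MiniRDD, ParallelRDD and MappedRDD are illustrative names and do not match the signatures of Spark's real RDD class. The two optional properties get empty defaults, and MappedRDD shows a narrow dependency where child split i is computed from parent split i.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class FivePropertiesDemo {
    // Compute every split in order and concatenate the results
    static List<Integer> collect(MiniRDD<Integer> rdd) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < rdd.numPartitions(); i++) rdd.compute(i).forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        MiniRDD<Integer> src = new ParallelRDD<>(List.of(List.of(1, 2), List.of(3)));
        MiniRDD<Integer> doubled = new MappedRDD<Integer, Integer>(src, x -> x * 2);
        System.out.println(doubled.numPartitions() + " " + collect(doubled)); // 2 [2, 4, 6]
    }
}

// Hypothetical, simplified mirror of the five properties (NOT Spark's real API)
interface MiniRDD<T> {
    int numPartitions();                                 // 1. a list of partitions
    Iterator<T> compute(int split);                      // 2. a function to compute each split
    List<MiniRDD<?>> dependencies();                     // 3. dependencies on parent RDDs
    default Optional<String> partitioner() {             // 4. optional partitioner (key-value RDDs)
        return Optional.empty();
    }
    default List<String> preferredLocations(int split) { // 5. optional preferred locations
        return Collections.emptyList();
    }
}

// A source RDD: partitions come from pre-split in-memory data; no parents
class ParallelRDD<T> implements MiniRDD<T> {
    private final List<List<T>> slices;
    ParallelRDD(List<List<T>> slices) { this.slices = slices; }
    public int numPartitions() { return slices.size(); }
    public Iterator<T> compute(int split) { return slices.get(split).iterator(); }
    public List<MiniRDD<?>> dependencies() { return Collections.emptyList(); }
}

// Narrow dependency: split i of the child is computed lazily from split i of the parent
class MappedRDD<T, R> implements MiniRDD<R> {
    private final MiniRDD<T> parent;
    private final Function<T, R> f;
    MappedRDD(MiniRDD<T> parent, Function<T, R> f) { this.parent = parent; this.f = f; }
    public int numPartitions() { return parent.numPartitions(); }
    public Iterator<R> compute(int split) {
        Iterator<T> in = parent.compute(split);
        return new Iterator<R>() {
            public boolean hasNext() { return in.hasNext(); }
            public R next() { return f.apply(in.next()); }
        };
    }
    public List<MiniRDD<?>> dependencies() { return List.of(parent); }
}
```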
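2. RDD Dependencies
Lineage
Each RDD records the RDDs it was derived from. toDebugString() prints this lineage, and the WordCount program below prints it after each transformation.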
package com.journey.core.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Local path for testing; on a cluster this would be an HDFS path.
        // The second argument (4) is the minimum number of partitions.
        JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
        System.out.println(lineRDD.toDebugString());
        System.out.println("*************************************");

        // flatMap: split each line into words (narrow dependency)
        JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        System.out.println(wordsRDD.toDebugString());
        System.out.println("*************************************");

        // mapToPair: word -> (word, 1) (narrow dependency)
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        System.out.println(wordToPairRDD.toDebugString());
        System.out.println("*************************************");

        // reduceByKey: sum the counts for each word (shuffle, i.e. wide dependency)
        JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(word2CountRDD.toDebugString());
        System.out.println("*************************************");

        // collect is an action: it triggers the actual job and returns the results to the driver
        List<Tuple2<String, Integer>> result = word2CountRDD.collect();
        System.out.println(result);
        sc.stop();
    }
}
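For comparison, the same flatMap, mapToPair and reduceByKey pipeline can be reproduced on a single machine with plain java.util.stream, which makes it easy to check what the Spark job computes. The input lines are an assumption (the contents of datas/wc are not shown), chosen so the counts match the result printed below; LocalWordCount is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Single-machine analogue of the Spark WordCount job, using only java.util.stream.
// The input lines are hypothetical: the contents of datas/wc are not shown.
class LocalWordCount {
    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                // like flatMap: one line -> many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // like mapToPair + reduceByKey: key by word, value 1, merge by summing
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Spark", "Hello World", "Hello Mayun", "Hello Spark");
        System.out.println(wordCount(lines)); // {Hello=4, Mayun=1, Spark=2, World=1}
    }
}
```

Unlike the Spark version, this runs eagerly in one JVM with no partitions or shuffle; it only illustrates the data flow.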
Output:
(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
| MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
+-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
| MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]
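The leading (4) is the number of partitions of that RDD. The +- in front of MapPartitionsRDD[3] marks the shuffle dependency introduced by reduceByKey: Spark starts a new stage at that boundary, while the other RDDs are linked by narrow dependencies (shown with |).
To be continued...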
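To see how such a lineage string can be derived from parent links, here is a toy in plain Java that loosely imitates the format above. It is not Spark's implementation: LineageNode, debugString and stageCount are hypothetical names, and the output only approximates toDebugString. Each node stores its parent and whether that dependency is a shuffle; a shuffle prints +-(n) and adds a stage.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy, NOT Spark's implementation: each node records its parent and
// whether the dependency on that parent is a shuffle (wide) dependency.
class LineageNode {
    final String name;
    final int partitions;
    final LineageNode parent;        // null for a source RDD
    final boolean shuffleFromParent; // true = wide (shuffle) dependency on parent

    LineageNode(String name, int partitions, LineageNode parent, boolean shuffleFromParent) {
        this.name = name;
        this.partitions = partitions;
        this.parent = parent;
        this.shuffleFromParent = shuffleFromParent;
    }

    // Every shuffle dependency in the chain starts a new stage
    int stageCount() {
        int stages = 1;
        for (LineageNode n = this; n.parent != null; n = n.parent) {
            if (n.shuffleFromParent) stages++;
        }
        return stages;
    }

    // Loosely imitates toDebugString: "|" for narrow parents, "+-(n)" where a shuffle begins
    String debugString() {
        List<String> lines = new ArrayList<>();
        lines.add("(" + partitions + ") " + name);
        String indent = "";
        for (LineageNode child = this; child.parent != null; child = child.parent) {
            LineageNode p = child.parent;
            if (child.shuffleFromParent) {
                lines.add(indent + "+-(" + p.partitions + ") " + p.name);
                indent += "   ";
            } else {
                lines.add(indent + "|  " + p.name);
            }
        }
        return String.join("\n", lines);
    }

    public static void main(String[] args) {
        LineageNode hadoop = new LineageNode("HadoopRDD[0] at textFile", 4, null, false);
        LineageNode lines = new LineageNode("MapPartitionsRDD[1] at textFile", 4, hadoop, false);
        LineageNode words = new LineageNode("MapPartitionsRDD[2] at flatMap", 4, lines, false);
        LineageNode pairs = new LineageNode("MapPartitionsRDD[3] at mapToPair", 4, words, false);
        LineageNode counts = new LineageNode("ShuffledRDD[4] at reduceByKey", 4, pairs, true);
        System.out.println(counts.debugString());
        System.out.println("stages: " + counts.stageCount()); // stages: 2
    }
}
```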