全面整理大数据分析之技术框架
storm是一个实时的计算框架,只负责计算,不负责存储。今天,我们就来整理这些大数据分析之技术框架,想要了解的小伙伴,可以参考一下。文章里面有细节的内容,还望各位认真阅读哦!
大数据离线部分
HDFS
1:HDFS的架构部分及工作原理
NameNode:负责管理元素据,将信息保存在内存中
DataNode:保存数据,以块的形式保存。启动后需要定时的向NameNode发送心跳,报告自身存储的块信息
2:HDFS的上传过程
3:HDFS的下载
4:NameNode的元数据安全机制
以记日志的形式将每一个操作写在磁盘的日志文件中,然后借助SecondaryNameNode的checkpoint功能将fsImage和日志进行合并。
重点:记住checkpoint工作过程
5:如果服务器的磁盘坏了,如何挽救数据?
配置多个dfs.namenode.name.dir路径为本地磁盘路径和nfs网络磁盘路径。
6:hdfs集群中,受到拓展瓶颈的是NameNode还是Datanode?
是NameNode,因为DataNode不够可以很方便的水平拓展,而工作的NameNode只有一个,他的存储能力完全取决于他的内存,所以。。。。,
但是其实NameNode一般不会成为瓶颈,因为一个块记录的元数据信息大小约为150B,如果每一个块大小为128M的话,那么15G的NameNode内存可以存储12PB的数据。
7:datanode明明已启动,但是集群中的可用datanode列表中就是没有,怎么办?
已经不是处女,在她的Data目录下,已经有其他NameNode的标记,这个NameNode不认。
8:文件下载到window中,为什么会报错?
默认使用操作系统的内核进行磁盘数据的写入,也就是需要一个winutil的工具,而默认的安装包中不提供,所以需要编译源码或者设置为使用java的进行磁盘写入。
9:Hadoop的HA(高可用)
MapReduce
1:MapReduce中,fileinputformat->map->shuffle->reduce的过程
2:MapReduce中,job提交的过程
3:自定义Javabean作为数据,需要extendswritableandCompareble接口。
4:自定义outputformat,进行不同方向的处理。
5:MapReduce的一些应用场景
1、排序并且求TOPOne和TOPN
2、求某个用户前几个月的总流量,并且选择出流量前几名的用户。
3、reduce端的join
4、map端join
5、求共同好友问题
hive
1:什么是hive?
一个将sql转化为MapReduce程序的、单机版的、数据仓库工具。通过关系型数据库(mysql等)来记录表元数据信息。真正的数据在HDFS中。
Hive利用HDFS存储数据,利用MapReduce查询分析数据
hive2.0版本之后,都是基于Spark处理了。
安装的时候,需要注意jline的版本冲突。
2:如何启动?
3:执行的sql的形式
hiveshell、hive-e“sql命令”、hive-f“一个包含着很多SQL语句的文件”
4:hive的创建表操作
内部表、外部表就差连个关键字(external和location)
分区表、分桶表
5:hive查询表
join
动态分区
分组查询
复杂的那个累计报表操作。
6:hive自定义函数(UDF)
sqoop
利用hadoop的map端进行数据的并行导入导出。
安装在HDFS上,配置HDFS的路径和Hive路径即可。
flume
1:agent:sources、channel、sinks
2:sources:exec、spooldir、arvo(加一个拦截器)
3:channel:men、disk
4:sinks:arvo、HDFS、kafka
5:flume安装在数据源这一边。
6:如何自定义拦截器?
classmyiterceptorimplementsIterceptor
//里面有一个静态的公共内部类。
publicstaticclassmybuilderimplementsIterceptor.Builder
7:如何实现flume的多级连接,以及如何实现高可用?
大数据实时storm部分
storm
#p#分页标题#e#
1:storm是一个实时的计算框架,只负责计算,不负责存储。它通过spout的open和nextTuple方法去外部存储系统(kafka)获取数据,然后传送给后续的bolt处理,
bolt利用prepare和execute方法处理完成后,继续往后续的bolt发送,或者根据输出目录,把信息写到指定的外部存储系统中。
2:storm的数据不丢失原理
交叉收到的数据做异或元算中间结果不为0的原理。
3:设置spout_max_pending(可以限流)
4:jstorm的通信机制,每一个:worker都有一个接受线程和输出线程
5:storm的架构分析
nimbus、zookeeper、supervisor、worker
nimbus:接受任务请求,并且进行任务的分发,最后写入到zookeeper中。
supervisor:接受nimbus的任务调度,然后启动和管理属于自己的worker进程,supervisor是可以快速失败的,不影响任务的执行。
我们可以写一个脚本来监控supervisor的进程,如果不存在了,立马启动,就可以了。
worker:启动spoutTask、boltTask等等任务,去执行业务逻辑。
6:storm的编程模型
topology:由spout和bolt组成的一个流程图。他描述着本次任务的信息
spout:
open
nexttuple
declareOutputFields
bolt:
prepare
execute
declareOutputFields
6:storm的tuple结构,它里面有两个数据结构,一个list、一个是map
list:记录着信息
map:记录着每个字段对应的下表,通过找到下边再去上面的list中找数据。
7:storm任务提交的过程
批处理系统
批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。
批处理模式中使用的数据集通常符合下列特征…
·有界:批处理数据集代表数据的有限集合
·持久:数据通常始终存储在某种类型的持久存储位置中
·大量:批处理操作通常是处理极为海量数据集的唯一方法
批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。
需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。
大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。
ApacheHadoop
ApacheHadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。
新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:
·HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。
·YARN:YARN是YetAnotherResourceNegotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。
·MapReduce:MapReduce是Hadoop的原生批处理引擎。
批处理模式
Hadoop的处理功能来自MapReduce引擎。MapReduce的处理技术符合使用键值对的map、shuffle、reduce算法要求。基本处理过程包括:
·从HDFS文件系统读取数据集
·将数据集拆分成小块并分配给所有可用节点
·针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)
·重新分配中间态结果并按照键进行分组
·通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Reducing”
·将计算而来的最终结果重新写入HDFS
kafka
1、kafka和jms的区别
2、kafka的topic理解
#p#分页标题#e#
topic是逻辑存在的,真正在物理磁盘中的体现是partitioner,一个topic可以对应多个partition,不同的paritition存放在不同的broker中,以提高并发存储能力。
3、partitioner
partition是topic信息在屋里存储中的具体体现,在磁盘中它是一个文件夹,名字是topic名字_partition编号。4、segment
每个partition对对应多个segment文件,默认大小是1G,为了快速定位到指定的offset位置。
5、kafka为什么这么快
1/使用了操作系统使用的pagecache缓存,缓存大,缓存到一定量的数据时,以顺序写入的方式写入到磁盘中。
因为:磁盘顺序写入的方式非常的快=>600MB/s,而随机存储只有100kb/s左右。
2/使用操作系统的sendfile技术。在读取信息发送的时候,不需要经过用户区,而是在os端直接发送,可以减少很多步骤。
6、为什么要多个partitioner7、为什么每个partitioner需要切分为多个segment文件
8、kafka的HA
对partitioner分区进行备份,利用zookeeper的选举机制选择leader。数据的生产存储和消费读取都是有leader负责,其他的replicatition只是负责备份而已。
9、kafka如何用shell脚本来讲一个文件读写进去?10、kafka如何用JavaAPI实现生产者和消费者?
大数据一站式解决方案:Scala和Spark部分
scala回顾
1、如何定义变量
2、如何定义函数、方法,如何在将函数作为方法的参数传入进去?
3、条件判断语句,循环控制语句
4、集合操作:Array、list、set、tuple、map(注意:可变和不可变的区别)5、样例类的使用6、trit、抽象类的使用7、主构造器和辅助构造器的使用
8、scala的高级特性
高阶函数:作为值得函数、匿名函数、闭包、柯里化
隐式转换:一个类对象中,如果他没有摸一个功能,但是我们有想要它实现,可以使用英式转换的方式。
objectMyPredef{
//定义隐式转换方法
implicitdeffileReadToRichFile(file:File)=newRichFile(file)
}
使用:
importMyPredef._9、Actor
写起来像多线程,用起来像socket10、akka
ActorSystem.actorOf()创建一个Actor,
创建的同时,就是执行Actor中的prestart方法,去初始化一些信息。
SparkRDD
1、SparkRDD叫做:弹性分布式数据集,其实就是一个类,用来描述:任务的数据从哪里读取、用那个算进行计算、得到的结果有存放在哪里、RDD之间的依赖关系是款以来还是窄依赖
2、RDD有五个特点
一系列分区
每个算子作用在每个分区上
一系列依赖关系
最有位置(如果从HDFS上读取数据)
3、RDD的两种算子Transformation和Action
Transformation是懒加载,只是定义了这个算子的任务,该如何做,但是还没有做。
Action是立即执行,当执行到Action时,会触发DAGSchudle切分stage,切分完成后,有TaskScheduler将任务通过DriverActor发送到executor中执行。
4、RDD的几个复杂的Transformation
->combineByKey(x=>x,(a:List[String],b:String)=>a:+b,
(m:List[String],n:List[String])=>m++n)
第一个参数表示分组后的第一个值如何处理,
第二个参数表示后续的值和前一个值如何处理,
第三个参数表示,map端处理完成后,在reduce端如何对这些list进行处理。
->aggregate(“初始量,可以是String也可以是int”)(第一个func,第二个func)
初始量作用于没一个分区,第一个func作用于map端,第二个func作用于reduce端。
->reduceByKey(_+_)作用于map端和reduce端,可以进行局部聚合。
其实reduceByKey和aggregateByKey在底层都调用了combineByKey方法来实现响应的功能。
->mapPartitions
对每一个分区进行操作,直接在里面使用匿名函数即可
当然如果逻辑非常复杂也是可以考虑在外面先定义好这个函数之后在传输进去。
rdd1.mapPartitions((it:Iterator[String])=>{
it.toList.map(x=>(x,1)).iterator
})
>mapPartitionsWithIndex
首先定义一个函数,当然也可以写在里面作为匿名函数
valfunc=(index:Int,it:Iterator[Int])=>{
it.toList.map(x=>(“index:”+index,x)).iterator
}
rdd1.mapPartitionsWithIndex(func).collect
5、RDD自定义Partitioner
//自定义分区器,重写里面的getPartition方法和numPartitions方法。
//构造这个对象的时候,就把所有情况的信息传输过来,然后在里面进行分类处理。
classHostPartition(hostArr:Array[String])extendsPartitioner{
//对所有的数据进行分类,每一种类型对应一个int编号。所以使用map比较合适。
valmap=newmutable.HashMap[String,Int]()
for(index<-0until(hostArr.length)){
map.put(hostArr(index),index)
}
//重写getPartition的方法。
overridedefgetPartition(key:Any):Int={
map.getOrElse(key.toString,0)
}
overridedefnumPartitions:Int=hostArr.length
}
应用:
valhostPartition:HostPartition=newHostPartition(hostList)
valallPartitionRDD:RDD[(String,(String,Int))]=host_url_count.partitionBy(hostPartition)
6、自定义排序规则==>定义一个
caseclassGril(yanzhi:Int,nianling:Int)extendsOrdered[Gril]withSerializable{
overridedefcompare(that:Gril):Int={
valyanzhiResult:Int=this.yanzhi.compareTo(that.yanzhi)
if(yanzhiResult==0){
returnthis.nianling.compareTo(that.nianling)
}
returnyanzhiResult
}
}
应用:
valrdd2:RDD[(String,Int,Int)]=rdd1.sortBy(msg=>Gril(msg._2,msg._3))
Spark的SQLContext
#p#分页标题#e#
1、Spark整合Hive和HDFS只需要将Hive的hive-site.xml;hadoop的core-site.xml和hdfs-site.xml拷贝到Spark的conf目录下即可。Spark就知道如何使用hive的表,同时也知道去哪个NameNode哪里都数据了。
2、DataFrame是什么?
是一个分布式数据集,对RDD的封装。RDD有的方法他基本上都有
3、DataFrame如何创建?
三种方式:->RDD+caseclass
->RDD+structType
->sqlContext.read.format.options(Map())
4、DataFrame首先需要注册成表结构之后才可以使用sqlContext来操作。
dF.registerTempTable(“person”)
5、使用sqlContext==>返回一个DataFrame
sqlContext.sql(“select*fromperson”)
6、DataFrame将数据写入到HDFS或者mysql中
valprop=newProperties()
prop.put(“user”,”root”)
prop.put(“password”,”815325″)
//如果数据库中没有这个表,那么他也会创建一张表(很强大)
resultDF.write.mode(“append”).jdbc(“jdbc:mysql://localhost:3306/bigdata”,”result”,prop)
结束语:看完文章的小伙伴,或多或少都学到了一些知识吧!如果各位小伙伴还想了解更多这方面的知识内容,随时可以登陆课课家哦!这里有全面的知识还有视频教学哟~ 我们在这里随时等着你的到来。