cloudera manager介绍提供的API中有获取CPU,磁盘等的监控数据吗

2424人阅读
英文原文链接:https://www.cloudera.com/documentation/enterprise/5-8-x/topics/cm_metrics.html
译文如下:
本节提供有关Cloudera Manager支持的度量标准的信息。
度量是可以被测量以量化实体或活动的状态的属性。它们包括属性,例如打开的文件描述符的数量或群集中的CPU利用率百分比。
Cloudera Manager监视在集群上运行的服务和角色实例的多个性能指标。这些度量针对可配置的阈值进行监控,并且可以用于指示主机是否按预期工作。您可以在Cloudera Manager管理控制台中查看这些指标,该控制台显示有关作业的指标(例如当前正在运行的作业数量及其CPU /内存使用量),Hadoop服务(例如平均HDFS I / O延迟和并发数作业),集群(例如所有主机上的平均CPU负载)等。Cloudera Manager将来自生成实体的度量标准预聚合到它们所属的实体。例如,由磁盘,网络接口和文件系统生成的度量标准将聚合到其各自的主机和集群。
有关详细信息,请参阅度量聚合。
在Cloudera Manager管理控制台中,您可以通过以下方式发现Cloudera Manager收集哪些度量标准:
指标列表:
选择图表&图表构建器,
单击构建图表按钮右侧的问号图标。
单击“指标列表”链接。
使用tsquery语言来检索您感兴趣的实体类型的所有指标。tsquery语言是用于指定用于检索时间系列数据的语句的语言,也就是说,每个点包含时间戳的指标数据点流,该时间戳处的指标值。
您可以绘制时间范围内的指标。有关更多详细信息,请参阅查看集群,服务,角色和主机实例的图表。本指南中列出的指标包括简短描述及其单位及其适用的CDH版本。大多数单位是不言自明的。单位“CPU秒/秒”定义为每秒使用的CPU秒数。例如,如果您有一个具有16个核心的1个主机群集,并且每个任务使用一个核心的16个任务,则该度量标准的值为16。
所有度量的采样率为一分钟。
继续阅读:
Accumulo 1.6 Metrics
活动监视器指标
警报发布商指标
Beeswax服务器指标
Cloudera管理服务指标
Cloudera Manager服务器指标
DataNode度量
事件服务器指标
故障转移控制器指标
文件系统指标
Flume频道指标
Flume Channel Tier指标
水槽水位指标
水槽来源指标
Flume源层度量
垃圾收集器指标
HBase命名空间指标
HBase REST服务器指标
HBase区域指标
HBase表度量
HBase Thrift服务器指标
HDFS缓存指令指标
HDFS缓存池度量
历史记录服务器指标
Hive Metastore服务器指标
HiveServer2指标
主机监视器指标
HttpFS度量
Hue服务器指标
Impala指标
Impala目录服务器指标
Impala Daemon指标
Impala后台进程池指标
Impala Llama ApplicationMaster指标
Impala池度量
Impala查询指标
Impala StateStore指标
Isilon指标
JobHistory服务器指标
JobTracker指标
日志节点度量
Kerberos票证续订指标
密钥管理服务器指标
密钥管理服务器代理度量
主要信任度量
键值存储索引器度量标准
Lily HBase索引器指标
记录器度量
MapReduce指标
NFS网关指标
NameNode度量
导航器审核服务器指标
导航器元数据服务器指标
网络接口指标
NodeManager指标
Oozie服务器指标
RegionServer度量标准
报告管理器指标
ResourceManager指标
SecondaryNameNode指标
Sentry指标
Sentry服务器指标
服务器指标
服务监控指标
Solr收集度量
Solr副本度量
Solr服务器指标
Solr Shard指标
Spark(独立)指标
Sqoop 1客户端指标
Sqoop 2指标
Sqoop 2服务器指标
平板电脑服务器指标
TaskTracker指标
时间序列表指标
WebHCat服务器指标
YARN(MR2包括)指标
YARN池度量ZooKeeper度量
文章:39篇
阅读:52212
文章:48篇
阅读:65674
文章:58篇
阅读:108656
文章:15篇
阅读:113397361人阅读
大数据(11)
大数据面试题及答案
当前版本:
制作单位:
编写人员:
审 核 人:
签 收 人:
签署日期:
&&& 2017 年 05 月 22 日
版权所有 翻印必究
第1部分&&&&&&&&&&&选择题
1.1&&&&&Hadoop选择题
1.1.1&&Hdfs
1.&&&&&&下面哪个程序负责 HDFS 数据存储?
a)NameNode &
b)Jobtracker &
c)Datanode &
d)secondaryNameNode &
e)tasktracker
2.&&&&&&HDfS 中的 block 默认保存几份?
3.&&&&&&下列哪个程序通常与NameNode 在一个节点启动?
a)SecondaryNameNode
b)DataNode
c)TaskTracker
d)Jobtracker
注:haoop1.X
hadoop 的集群是基于 master/slave 模式,namenode 和 jobtracker 属于 master,datanode 和 tasktracker属于 slave,master 只有一个,而 slave 有多个。SecondaryNameNode 内存需求和 NameNode 在一个数量级上,所以通常 secondary NameNode(运行在单独的物理机器上)和 NameNode 运行在不同的机器上。
JobTracker 和 TaskTracker
JobTracker 对应于 NameNode
TaskTracker 对应于 DataNode
DataNode 和 NameNode 是针对数据存放来而言的
JobTracker 和 TaskTracker 是对于 MapReduce 执行而言的
mapreduce 中几个主要概念,mapreduce 整体上可以分为这么几条执行线索:
jobclient,JobTracker 与 TaskTracker。
1、JobClient 会在用户端通过 JobClient 类将应用已经配置参数打包成 jar 文件存储到 hdfs,并把路径提交到 Jobtracker,然后由 JobTracker 创建每一个 Task(即 MapTask 和 ReduceTask)并将它们分发到各个 TaskTracker 服务中去执行
2、JobTracker 是一个 master 服务,软件启动之后 JobTracker 接收 Job,负责调度 Job 的每一个子任务 task运行于 TaskTracker 上,并监控它们,如果发现有失败的 task 就重新运行它。一般情况应该把 JobTracker 部署在单独的机器上。
3、TaskTracker 是运行在多个节点上的 slaver 服务。TaskTracker 主动与 JobTracker 通信,接收作业,并负责直接执行每一个任务。TaskTracker 都需要运行在 HDFS 的 DataNode 上
4.&&&&&&HDFS 默认 Block Size
注:旧版本是64MB
5.&&&&&&Client 端上传文件的时候下列哪项正确
a)数据经过 NameNode 传递给 DataNode
b)Client 端将文件切分为 Block,依次上传
c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作
Client 向 NameNode 发起文件写入的请求。
NameNode 根据文件大小和文件块配置情况,返回给 Client 它所管理部分 DataNode 的信息。
Client 将文件划分为多个 Block,根据 DataNode 的地址信息,按顺序写入到每一个 DataNode 块中。
6.&&&&&&下面与 HDFS 类似的框架是?C
7.&&&&&&的
8.&&&&&&的
1.1.2&&集群管理
1.&&&&&&下列哪项通常是集群的最主要瓶颈
c)磁盘 IO &
由于大数据面临海量数据,读写数据都需要 io,然后还要冗余数据,hadoop 一般备 3 份数据,所以 IO就会打折扣。
2.&&&&&&关于SecondaryNameNode 哪项是正确的?
a)它是 NameNode 的热备
b)它对内存没有要求
c)它的目的是帮助 NameNode
合并编辑日志,减少 NameNode
d)SecondaryNameNode 应与 NameNode 部署到一个节点
3.&&&&&&下列哪项可以作为集群的管理?
a)Puppet &b)Pdsh &c)ClouderaManager &d)Zookeeper
A:puppetpuppet 是一种 Linux、Unix、windows 平台的集中配置管理系统
B:pdsh 可以实现在在多台机器上执行相同的命令
详细参考:集群管理小工具介绍-pdsh
C:可以参考 Cloudera Manager 四大功能【翻译】
首先这里给管理下一个定义:部署、配置、调试、监控,属于管理
4.&&&&&&配置机架感知的下面哪项正确
a)如果一个机架出问题,不会影响数据读写
b)写入数据的时候会写到不同机架的 DataNode
c)MapReduce 会根据机架获取离自己比较近的网络数据
5.&&&&&&下列哪个是 Hadoop 运行的模式
a)单机版 b)伪分布式 c)分布式
6.&&&&&&Cloudera 提供哪几种安装 CDH 的方法
a)Cloudera manager &b)Tarball &c)Yum &d)Rpm
7.&&&&&&的
8.&&&&&&&D d
9.&&&&&&的
1.2&&&&&Hbase选择题
1.2.1&&Hbase基础
1.&&&&&&HBase 来源于哪篇博文? C
A TheGoogle File System
BMapReduce
2.&&&&&&下面对 HBase 的描述哪些是正确的? B、C、D
A 不是开源的
B 是面向列的
C 是分布式的
D 是一种 NoSQL 数据库
3.&&&&&&HBase 依靠()存储底层数据 A
DMapReduce
4.&&&&&&HBase 依赖()提供消息通信机制 A
AZookeeper
5.&&&&&&HBase 依赖()提供强大的计算能力 D
AZookeeper
DMapReduce
6.&&&&&&MapReduce 与 HBase 的关系,哪些描述是正确的? B、C
A 两者不可或缺,MapReduce 是 HBase 可以正常运行的保证
B 两者不是强关联关系,没有 MapReduce,HBase 可以正常运行
CMapReduce 可以直接访问 HBase
D 它们之间没有任何关系
7.&&&&&&下面哪些选项正确描述了HBase 的特性? A、B、C、D
A 高可靠性
8.&&&&&&下面哪些概念是 HBase 框架中使用的?A、C
CZookeeper
9.&&&&&&&D
1.2.2&&Hbase核心
1.&&&&&&LSM 含义是?A
A 日志结构合并树
C 平衡二叉树
D 长平衡二叉树
2.&&&&&&下面对 LSM 结构描述正确的是? A、C
A 顺序存储
B 直接写硬盘
C 需要将数据 Flush 到磁盘
D 是一种搜索平衡树
3.&&&&&&LSM 更能保证哪种操作的性能?B
4.&&&&&&LSM 的读操作和写操作是独立的?A
C LSM 并不区分读和写
D LSM 中读写是同一种操作
5.&&&&&&LSM 结构的数据首先存储在()。 B
C 磁盘阵列中
6.&&&&&&HFile 数据格式中的 Data 字段用于()。A
A 存储实际的 KeyValue 数据
B 存储数据的起点
C 指定字段的长度
D 存储数据块的起点
7.&&&&&&HFile 数据格式中的 MetaIndex 字段用于()。D
A Meta 块的长度
B Meta 块的结束点
C Meta 块数据内容
D Meta 块的起始点
8.&&&&&&HFile 数据格式中的 Magic 字段用于()。A
A 存储随机数,防止数据损坏
B 存储数据的起点
C 存储数据块的起点
D 指定字段的长度
9.&&&&&&HFile 数据格式中的 KeyValue 数据格式,下列选项描述正确的是()。A、D
A 是 byte[]数组
B 没有固定的结构
C 数据的大小是定长的
D 有固定的结构
10.&&HFile 数据格式中的 KeyValue 数据格式中 Value 部分是()。C
A 拥有复杂结构的字符串
C 二进制数据
D 压缩数据
1.2.3&&HBase 高级应用介绍
1.&&&&&&HBase 中的批量加载底层使用()实现。A
AMapReduce
CCoprocessor
D BloomFilter
2.&&&&&&HBase 性能优化包含下面的哪些选项?A、B、C、D
C 配置优化
D JVM 优化
3.&&&&&&Rowkey 设计的原则,下列哪些选项的描述是正确的?A、B、C
A 尽量保证越短越好
B 可以使用汉字
C 可以使用字符串
D 本身是无序的
4.&&&&&&HBase 构建二级索引的实现方式有哪些? A、B
AMapReduce
BCoprocessor
C BloomFilter
5.&&&&&&关于 HBase 二级索引的描述,哪些是正确的?A、B
A 核心是倒排表
B 二级索引概念是对应 Rowkey 这个“一级”索引
C 二级索引使用平衡二叉树
D 二级索引使用 LSM 结构
6.&&&&&&下列关于 Bloom Filter 的描述正确的是?A、C
A 是一个很长的二进制向量和一系列随机映射函数
B 没有误算率
C 有一定的误算率
D 可以在 Bloom Filter 中删除元素
1.2.4&&HBase 安装、部署、启动
1.&&&&&&HBase 官方版本可以安装在什么操作系统上?A、B、C
2.&&&&&&HBase 虚拟分布式模式需要()个节点?A
D 最少 3 个
3.&&&&&&HBase 分布式模式最好需要()个节点?C
4.&&&&&&下列哪些选项是安装 HBase 前所必须安装的?A、B
A 操作系统
C ShellScript
D JavaCode
5.&&&&&&解压.tar.gz 结尾的 HBase 压缩包使用的 Linux 命令是?A
A tar-zxvf
1.3&&&&&Zookeeper选择题
1.3.1&&Zookeeper基础
1.&&&&&&下面与 Zookeeper 类似的框架是?D
2.&&&&&&的
第2部分&&&&&&&&&&&判断题
第2部分&&&&&&
2.1&&&&&Hadoop判断题
2.1.1&&集群管理
1.&&&&&&Ganglia 不仅可以进行监控,也可以进行告警。(正确)
ganglia 作为一款最常用的 Linux 环境中的监控软件,它擅长的的是从节点中按照用户的需求以较低的代价
采集数据。但是 ganglia 在预警以及发生事件后通知用户上并不擅长。最新的 ganglia 已经有了部分这方面
的功能。但是更擅长做警告的还有 Nagios。Nagios,就是一款精于预警、通知的软件。通过将 Ganglia 和
Nagios 组合起来,把 Ganglia 采集的数据作为 Nagios 的数据源,然后利用 Nagios 来发送预警通知,可以
完美的实现一整套监控管理的系统。
2.&&&&&&Nagios 不可以监控 Hadoop 集群,因为它不提供 Hadoop支持。(错误 )
Nagios 是集群监控工具,而且是云计算三大利器之一
3.&&&&&&如果 NameNode 意外终止,SecondaryNameNode 会接替它使集群继续工作。(错误 )
SecondaryNameNode 是帮助恢复,而不是替代
4.&&&&&&Cloudera CDH 是需要付费使用的。(错误)
第一套付费产品是 Cloudera Enterpris,Cloudera Enterprise 在美国加州举行的 Hadoop 大会 (HadoopSummit) 上公开,以若干私有管理、监控、运作工具加强 Hadoop 的功能。收费采取合约订购方式,价格随用的 Hadoop 叢集大小变动。
5.&&&&&&NameNode 负责管理 metadata,client 端每次读写请求,它都会从磁盘中读取或则会写入 metadata信息并反馈 client 端。(错误)
NameNode 不需要从磁盘读取 metadata,所有数据都在内存中,硬盘上的只是序列化的结果,只有每次namenode 启动的时候才会读取。
1)文件写入
Client 向 NameNode 发起文件写入的请求。
NameNode 根据文件大小和文件块配置情况,返回给 Client 它所管理部分 DataNode 的信息。
Client 将文件划分为多个 Block,根据 DataNode 的地址信息,按顺序写入到每一个 DataNode 块中。
2)文件读取
Client 向 NameNode 发起文件读取的请求。
NameNode 返回文件存储的 DataNode 的信息。
Client 读取文件信息。
6.&&&&&&NameNode 本地磁盘保存了 Block 的位置信息。( 个人认为正确,欢迎提出其它意见)
DataNode 是文件存储的基本单元,它将 Block 存储在本地文件系统中,保存了 Block 的 Meta-data,同时周期性地将所有存在的 Block 信息发送给 NameNode。
7.&&&&&&DataNode 通过长连接与 NameNode 保持通信。错误
通过心跳机制。
(1).长连接
Client 方与 Server 方先建立通讯连接,连接建立后不断开,然后再进行报文发送和接收。这种方式下由于通讯连接一直存在,此种方式常用于点对点通讯。
(2).短连接
Client 方与 Server 每进行一次报文收发交易时才进行通讯连接,交易完毕后立即断开连接。此种方式常用于一点对多点通讯,比如多个 Client 连接一个 Server.
8.&&&&&&Hadoop 自身具有严格的权限管理和安全措施保障集群正常运行。(错误)
9.&&&&&&Slave 节点要存储数据,所以它的磁盘越大越好。(错误)
一旦 Slave 节点宕机,数据恢复是一个难题
10.&&hadoop dfsadmin –report 命令用于检测 HDFS 损坏块。(错误)
hadoop dfsadmin -report
用这个命令可以快速定位出哪些节点 down 掉了,HDFS 的容量以及使用了多少,以及每个节点的硬盘使用情况。
当然 NameNode 有个 http 页面也可以查询,但是这个命令的输出更适合我们的脚本监控 dfs 的使用状况
11.&&Hadoop 默认调度器策略为 FIFO(正确 )
12.&&集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。(错误)
首先明白什么是 RAID:磁盘阵列(Redundant Arrays of Independent Disks,RAID),有“独立磁盘构成的具有冗余能力的阵列”之意。
这句话错误的地方在于太绝对,具体情况具体分析。题目不是重点,知识才是最重要的。
因为 hadoop 本身就具有冗余能力,所以如果不是很严格不需要都配备 RAID。
13.&&Hadoop 环境变量中的 HADOOP_HEAPSIZE 用于设置所有 Hadoop 守护线程的内存。它默认是 200 GB。( 错误)
hadoop 为各个守护进程(namenode,secondarynamenode,jobtracker,datanode,tasktracker)统一分配的内存在 hadoop-env.sh 中设置,参数为 HADOOP_HEAPSIZE,默认为 1000M。
14.&&DataNode 首次加入 cluster 的时候,如果 log 中报告不兼容文件版本,那需要 NameNode执行―Hadoopnamenode -format‖操作格式化磁盘。(错误 )
这个报错是说明 DataNode 所装的 Hadoop 版本和其它节点不一致,应该检查 DataNode 的 Hadoop 版本
2.1.2&&Hdfs
1.&&&&&&Block Size 是不可以修改的。(错误)
Hadoop 的基础配置文件是 hadoop-default.xml,默认建立一个 Job 的时候会建立 Job 的 Config,Config
首先读入hadoop-default.xml的配置,然后再读入hadoop-site.xml的配置(这个文件初始的时候配置为空),
hadoop-site.xml 中主要配置需要覆盖的 hadoop-default.xml 的系统级配置。具体配置可以参考下:
&property&
& &name&dfs.block.size&/name&//block 的大小,单位字节,后面会提到用处,必须是 512 的倍数,因
为采用 crc 作文件完整性校验,默认配置 512 是 checksum 的最小单元。
& &value&5120000&/value&
& &description&The default block size for new files.&/description&
&/property&
2.&&&&&&Hadoop 支持数据的随机读写。(错)
lucene 是支持随机读写的,而 hdfs 只支持随机读。但是 HBase 可以来补救。
HBase 提供随机读写,来解决 Hadoop 不能处理的问题。HBase 自底层设计开始即聚焦于各种可伸缩性问题:表可以很―高‖,有数十亿个数据行;也可以很―宽‖,有数百万个列;水平分区并在上千个普通商用机节点上自动复制。表的模式是物理存储的直接反映,使系统有可能提高高效的数据结构的序列化、存储和检索。
3.&&&&&&因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。(错误 )
副本针对DataName而讲的
4.&&&&&&的
5.&&&&&&的
6.&&&&&&的
2.1.3&&MapReduce
1.&&&&&&Hadoop 是 Java 开发的,所以 MapReduce 只支持 Java 语言编写。(错误 )
支持c++等语言,需要通过接口。
2.&&&&&&每个 map 槽就是一个线程。(错误)
一个task对应一个线程
分析:首先我们知道什么是 map 槽,map 槽-&map slot,map slot 只是一个逻辑值 ( org.apache.hadoop.mapred.TaskTracker.TaskLauncher.numFreeSlots ),而不是对应着一个线程或者进程
3.&&&&&&Mapreduce 的 input split 就是一个 block。(错误)
& &应该是一个block数组
1、运行mapred程序;
2、本次运行将生成一个Job,于是JobClient向JobTracker申请一个JobID以标识这个Job;
3、JobClient将Job所需要的资源提交到HDFS中一个以JobID命名的目录中。这些资源包括JAR包、配置文件、InputSplit、等;
4、JobClient向JobTracker提交这个Job;
5、JobTracker初始化这个Job;
6、JobTracker从HDFS获取这个Job的Split等信息;
7、JobTracker向TaskTracker分配任务;
8、TaskTracker从HDFS获取这个Job的相关资源;
9、TaskTracker开启一个新的JVM;
10、TaskTracker用新的JVM来执行Map或Reduce;
InputSplit也是一个interface,具体返回什么样的implement,这是由具体的InputFormat来决定的。InputSplit也只有两个接口函数:
long getLength() throws IOE
String[] getLocations() throws IOE
这个interface仅仅描述了Split有多长,以及存放这个Split的Location信息(也就是这个Split在HDFS上存放的机器。它可能有多个replication,存在于多台机器上)。除此之外,就再没有任何直接描述Split的信息了。比如:Split对应于哪个文件?在文件中的起始和结束位置是什么?等等重要的特征都没有描述到。
为什么会这样呢?因为关于Split的那些描述信息,对于MapReduce框架来说是不需要关心的。框架只关心Split的长度(主要用于一些统计信息)和Split的Location(主要用于Split的调度,后面会细说)。
而Split中真正重要的描述信息还是只有InputFormat会关心。在需要读取一个Split的时候,其对应的InputSplit会被传递到InputFormat的第二个接口函数getRecordReader,然后被用于初始化一个RecordReader,以解析输入数据。也就是说,描述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道。它只需要保证getSplits返回的InputSplit和getRecordReader所关心的InputSplit是同样的implement就行了。这就给InputFormat的实现提供了巨大的灵活性。
4.&&&&&&的
5.&&&&&&的
6.&&&&&&的
8.&&&&&&的
第3部分&&&&&&&&&&&叙述题
第3部分&&&&&&
3.1&&&&&Hadoop叙述题
3.1.1&&Hadoop部署
1.&&&&&&hdfs的体系结构
hdfs有namenode、secondraynamenode、datanode组成。
为n+1模式
namenode负责管理datanode和记录元数据
secondraynamenode负责合并日志
datanode负责存储数据
2.&&&&&&简要描述如何安装配置一个apache开原本hadoop,只描述即可,无需列出完整步骤,能列出步骤更好。
1.创建hadoop用户
3.安装JDK,并配置环境变量
4.修改host文件映射
5.安装SSH,配置无秘钥通信
6.上传解压hadoop安装包
7.配置conf文件夹下的hadoop-env.sh、core-site.xlmapre-site.xml、hdfs-site.xml
8.配置hadoop的环境变量
9.Hadoop namenode -format
10.start-all
3.&&&&&&启动hadoop集群时报下图错误,分析什么原因:
1、权限问题,可能曾经用root启动过集群。(例如hadoop搭建的集群,是tmp/hadoop-hadoop/.....)
2、可能是文件夹不存在
3、解决: 删掉tmp下的那个文件,或改成当前用户
4.&&&&&&请列出hadoop的进程名称
1.namenode:管理集群,并记录datanode文件信息。
2.Secondname:可以做冷备,对一定范围内的数据做快照性备份。
3.Datanode:存储数据。
4.Jobtracker:管理任务,并将任务分配给tasktracker。
5.Tasktracker:任务执行者
5.&&&&&&Hadoop的核心配置是什么?
Hadoop的核心配置通过两个xml文件来完成:
1.hadoop-default.xml;
2.hadoop-site.xml。
这些文件都使用xml格式,因此每个xml中都有一些属性,包括名称和值,但是当下这些文件都已不复存在。
6.&&&&&&那当下又该如何配置?
Hadoop现在拥有3个配置文件:
1,core-site.xml;
2,hdfs-site.xml;
3,mapred-site.xml。
这些文件都保存在conf/子目录下。
7.&&&&&&“jps”命令的用处?
这个命令可以检查Namenode、Datanode、Task Tracker、 Job Tracker是否正常工作。
8.&&&&&&简要描述如何安装配置一个apache 开源版 hadoop,描述即可,列出步骤更好
9.&&&&&&请列出正常工作的 hadoop 集群中 hadoop 都需要启动哪些进程,他们的作用分别是什么?
10.&&启动 hadoop 报如下错误,该如何解决?
error org.apache.hadoop.hdfs.server.namenode.NameNode
org.apache.hadoop.hdfs.server.common.inconsistentFSStateExceptio
n Directory /tmp/hadoop-root/dfs/name is in an inconsistent
state storage direction does not exist or is not accessible?
11.&&请写出以下执行命令
1)杀死一个 job?
2)删除 hdfs 上的/tmp/aaa 目录
3)加入一个新的存储节点和删除一个计算节点需要刷新集群状态命令?
hadoop job -list 记录job-id、hadoop job -kill job-id
hadoop fs -rmr /tmp/aaa
添加新节点:
hadoop -daemon.sh start datanode
hadoop -daemon.sh start tasktracker
移除一个节点:
hadoop mradmin -refreshnodes
hadoop dfsadmin -refreshnodes
12.&&请列出你所知道的 hadoop 调度器,并简要说明其工作方法?
1.FIFO schedular:默认,先进先出的原则
2.Capacity schedular:计算能力调度器,选择占用最小,优先级高的先执行,以此类推。
3.Fair schedular:公平调度,所有的job具有相同的资源。
13.&&请列出在你以前工作中所使用过的开发 mapreduce 的语言?
14.&&你认为用 Java,Streaming,pipe 方式开发 mapreduce,各有哪些优缺点?
15.&&hadoop框架中怎么来优化
(1)& 从应用程序角度进行优化。由于mapreduce是迭代逐行解析数据文件的,怎样在迭代的情况下,编写高效率的应用程序,是一种优化思路。
(2)& 对Hadoop参数进行调优。当前hadoop系统有190多个配置参数,怎样调整这些参数,使hadoop作业运行尽可能的快,也是一种优化思路。
(3) 从系统实现角度进行优化。这种优化难度是最大的,它是从hadoop实现机制角度,发现当前Hadoop设计和实现上的缺点,然后进行源码级地修改。该方法虽难度大,但往往效果明显。
(4)linux内核参数调整
1. 使用自定义Writable
自带的Text很好用,但是字符串转换开销较大,故根据实际需要自定义Writable,注意作为Key时要实现WritableCompareable接口
避免output.collect(new Text( ),new Text())
提倡key.set( ) value.set( ) output.collect(key,value)
前者会产生大量的Text对象,使用完后Java垃圾回收器会花费大量的时间去收集这些对象
2. 使用StringBuilder
不要使用Formatter StringBuffer(&线程安全)
StringBuffer尽量少使用多个append方法,适当使用+
3. 使用DistributedCache加载文件
比如配置文件,词典,共享文件,避免使用static变量
4. 充分使用Combiner Parttitioner Comparator。
Combiner : 对map任务进行本地聚合
Parttitioner : 合适的Parttitioner避免reduce端负载不均
Comparator : 二次排序
比如求每天的最大气温,map结果为日期:气温,若气温是降序的,直接取列表首元素即可
5. 使用自定义InputFormat和OutputFormat
6. MR应避免
静态变量:不能用于计数,应使用Counter
大对象:Map List
递归:避免递归深度过大
超长正则表达式:消耗性能,要在map或reduce函数外编译正则表达式
不要创建本地文件:变向的把HDFS里面的数据转移到TaskTracker,占用网络带宽
不要大量创建目录和文件
不要大量使用System.out.println,而使用Logger
不要自定义过多的Counter,最好不要超过100个
不要配置过大内存,mapred.child.java.opts -Xmx2000m是用来设置mapreduce任务使用的最大heap量
7.关于map的数目
map数目过大[创建和初始化map的开销],一般是由大量小文件造成的,或者dfs.block.size设置的太小,对于小文件可以archive文件或者Hadoop fs -merge合并成一个大文件.
map数目过少,造成单个map任务执行时间过长,频繁推测执行,且容易内存溢出,并行性优势不能体现出来。dfs.block.size一般为256M-512M
压缩的Text 文件是不能被分割的,所以尽量使用SequenceFile,可以切分
8.关于reduce的数目
reduce数目过大,产生大量的小文件,消耗大量不必要的资源,reduce数目过低呢,造成数据倾斜问题,且通常不能通过修改参数改变。
可选方案:mapred.reduce.tasks设为-1变成AutoReduce。
Key的分布,也在某种程度上决定了Reduce数目,所以要根据Key的特点设计相对应的Parttitioner 避免数据倾斜
9.Map-side相关参数优化
io.sort.mb(100MB):通常k个map tasks会对应一个buffer,buffer主要用来缓存map部分计算结果,并做一些预排序提高map性能,若map输出结果较大,可以调高这个参数,减少map任务进行spill任务个数,降低 I/O的操作次数。若map任务的瓶颈在I/O的话,那么将会大大提高map性能。如何判断map任务的瓶颈?
io.sort.spill.percent(0.8):spill操作就是当内存buffer超过一定阈值(这里通常是百分比)的时候,会将buffer中得数据写到Disk中。而不是等buffer满后在spill,否则会造成map的计算任务等待buffer的释放。一般来说,调整 io.sort.mb而不是这个参数。
io.sort.factor(10):map任务会产生很多的spill文件,而map任务在正常退出之前会将这些spill文件合并成一个文件,即merger过程,缺省是一次合并10个参数,调大io.sort.factor,减少merge的次数,减少Disk I/O操作,提高map性能。
min.num.spill.for.combine:通常为了减少map和reduce数据传输量,我们会制定一个combiner,将map结果进行本地聚集。这里combiner可能在merger之前,也可能在其之后。那么什么时候在其之前呢?当spill个数至少为min.num.spill.for.combine指定的数目时同时程序指定了Combiner,Combiner会在其之前运行,减少写入到Disk的数据量,减少I/O次数。
10.压缩(时间换空间)
MR中的数据无论是中间数据还是输入输出结果都是巨大的,若不使用压缩不仅浪费磁盘空间且会消耗大量网络带宽。同样在spill,merge(reduce也对有一个merge)亦可以使用压缩。若想在cpu时间和压缩比之间寻找一个平衡,LzoCodec比较适合。通常MR任务的瓶颈不在CPU而在于I/O,所以大部分的MR任务都适合使用压缩。
11. reduce-side相关参数优化
reduce:copy-&sort-&reduce,也称shuffle
mapred.reduce.parellel.copies(5):任一个map任务可能包含一个或者多个reduce所需要数据,故一个map任务完成后,相应的reduce就会立即启动线程下载自己所需要的数据。调大这个参数比较适合map任务比较多且完成时间比较短的Job。
mapred.reduce.copy.backoff:reduce端从map端下载数据也有可能由于网络故障,map端机器故障而失败。那么reduce下载线程肯定不会无限等待,当等待时间超过mapred.reduce.copy.backoff时,便放弃,尝试从其他地方下载。需注意:在网络情况比较差的环境,我们需要调大这个参数,避免reduce下载线程被误判为失败。
io.sort.factor:recude将map结果下载到本地时,亦需要merge,如果reduce的瓶颈在于I/O,可尝试调高增加merge的并发吞吐,提高reduce性能、
mapred.job.shuffle.input.buffer.percent(0.7):reduce从map下载的数据不会立刻就写到Disk中,而是先缓存在内存中,mapred.job.shuffle.input.buffer.percent指定内存的多少比例用于缓存数据,内存大小可通过mapred.child.java.opts来设置。和map类似,buffer不是等到写满才往磁盘中写,也是到达阈值就写,阈值由mapred.job,shuffle.merge.percent来指定。若Reduce下载速度很快,容易内存溢出,适当增大这个参数对增加reduce性能有些帮助。
mapred.job.reduce.input.buffer.percent (0):当Reduce下载map数据完成之后,就会开始真正的reduce的计算,reduce的计算必然也是要消耗内存的,那么在读物reduce所需要的数据时,同样需要内存作为buffer,这个参数是决定多少的内存百分比作为buffer。默认为0,也就是说reduce全部从磁盘读数据。若redcue计算任务消耗内存很小,那么可以设置这个参数大于0,使一部分内存用来缓存数据。
16.&&从应用程序角度进行优化
(1) 避免不必要的reduce任务
如果mapreduce程序中reduce是不必要的,那么我们可以在map中处理数据, Reducer设置为0。这样避免了多余的reduce任务。
(2) 为job添加一个Combiner
为job添加一个combiner可以大大减少shuffle阶段从map task拷贝给远程reduce task的数据量。一般而言,combiner与reducer相同。
(3) 根据处理数据特征使用最适合和简洁的Writable类型
Text对象使用起来很方便,但它在由数值转换到文本或是由UTF8字符串转换到文本时都是低效的,且会消耗大量的CPU时间。当处理那些非文本的数据时,可以使用二进制的Writable类型,如IntWritable, FloatWritable等。二进制writable好处:避免文件转换的消耗;使map task中间结果占用更少的空间。
(4) 重用Writable类型
很多MapReduce用户常犯的一个错误是,在一个map/reduce方法中为每个输出都创建Writable对象。例如,你的Wordcout mapper方法可能这样写:
public void map(...) {
& for (String word : words) {
&&& output.collect(new Text(word), new IntWritable(1));
这样会导致程序分配出成千上万个短周期的对象。Java垃圾收集器就要为此做很多的工作。更有效的写法是:
class MyMapper … {
& Text wordText = new Text();
& IntWritable one = new IntWritable(1);
& public void map(...) {
&&& for (String word: words) {
&&&&& wordText.set(word);
&&&&& output.collect(wordText, one);
(5) 使用StringBuffer而不是String
当需要对字符串进行操作时,使用StringBuffer而不是String,String是read-only的,如果对它进行修改,会产生临时对象,而StringBuffer是可修改的,不会产生临时对象。
17.&&datanode在什么情况下不会备份
当分备份数为1时。
18.&&combiner出现在那个过程
出现在map阶段的map方法后。
19.&&3个datanode中有一个datanode出现错误会怎样?
这个datanode的数据会在其他的datanode上重新做备份。
20.&&描述一下hadoop中,有哪些地方使用了缓存机制,作用分别是什么?
21.&&如何确定hadoop集群的健康状态
22.&&hadoop 的 namenode 宕机,怎么解决
先分析宕机后的损失,宕机后直接导致client无法访问,内存中的元数据丢失,但是硬盘中的元数据应该还存在,如果只是节点挂了,重启即可,如果是机器挂了,重启机器后看节点是否能重启,不能重启就要找到原因修复了。但是最终的解决方案应该是在设计集群的初期就考虑到这个问题,做namenode的HA。
23.&&一个datanode 宕机,怎么一个流程恢复
Datanode宕机了后,如果是短暂的宕机,可以实现写好脚本监控,将它启动起来。如果是长时间宕机了,那么datanode上的数据应该已经被备份到其他机器了,那这台datanode就是一台新的datanode了,删除他的所有数据文件和状态文件,重新启动。
3.1.2&&Hadoop原理
1.&&&&&&请简述 hadoop 怎么样实现二级排序?
在Reduce阶段,先对Key排序,再对Value排序
最常用的方法是将Value放到Key中,实现一个组合Key,然后自定义Key排序规则(为Key实现一个WritableComparable)。
2.&&&&&&如何使用MapReduce实现两个表join,可以考虑一下几种情况:(1)一个表大,一个表小(可放到内存中);(2)两个表都是大表?
第一种情况比较简单,只需将小表放到DistributedCache中即可;
第二种情况常用的方法有:map-side join(要求输入数据有序,通常用户Hbase中的数据表连接),reduce-side join,semi join(半连接)
3.&&&&&&MapReduce中排序发生在哪几个阶段?这些排序是否可以避免?为什么?
一个MapReduce作业由Map阶段和Reduce阶段两部分组成,这两阶段会对数据排序,从这个意义上说,MapReduce框架本质就是一个Distributed Sort。在Map阶段,在Map阶段,Map Task会在本地磁盘输出一个按照key排序(采用的是快速排序)的文件(中间可能产生多个文件,但最终会合并成一个),在Reduce阶段,每个Reduce Task会对收到的数据排序,这样,数据便按照Key分成了若干组,之后以组为单位交给reduce()处理。很多人的误解在Map阶段,如果不使用Combiner便不会排序,这是错误的,不管你用不用Combiner,Map
Task均会对产生的数据排序(如果没有Reduce Task,则不会排序, 实际上Map阶段的排序就是为了减轻Reduce端排序负载)。由于这些排序是MapReduce自动完成的,用户无法控制,因此,在hadoop 1.x中无法避免,也不可以关闭,但hadoop2.x是可以关闭的。
4.&&&&&&请简述 mapreduce 中,combiner,partition 作用?
combiner是reduce的实现,在map端运行计算任务,减少map端的输出数据。
作用就是优化。
但是combiner的使用场景是mapreduce的map输出结果和reduce输入输出一样。
partition的默认实现是hashpartition,是map端将数据按照reduce个数取余,进行分区,不同的reduce来copy自己的数据。
partition的作用是将数据分到不同的reduce进行计算,加快计算效果。
1、combiner最基本是实现本地key的聚合,对map输出的key排序,value进行迭代。如下所示:
  map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K2, V2)
  reduce: (K2, list(V2)) → list(K3, V3)
  2、combiner还具有类似本地的reduce功能.
  例如hadoop自带的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致。如下所示:
  map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K3, V3)
  reduce: (K3, list(V3)) → list(K4, V4)
  3、如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
  4、对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。
  combiner使用的合适,可以在满足业务的情况下提升job的速度,如果不合适,则将导致输出的结果不正确。
5.&&&&&&解释―hadoop‖和―hadoop 生态系统‖两个概念
6.&&&&&&说明 Hadoop 2.0 的基本构成
分别说明hdfs,yarn,mapreduce
7.&&&&&&相比于 HDFS1.0,HDFS 2.0 最主要的改进在哪几方面?
8.&&&&&&试使用―步骤 1,步骤 2,步骤 3…..‖说明 YARN 中运行应用程序的基本流程
9.&&&&&&―MapReduce 2.0‖与―YARN‖是否等同,尝试解释说明
10.&&MapReduce 2.0 中,MRAppMaster 主要作用是什么,MRAppMaster如何实现任务容错的?
11.&&hdfs 原理,以及各个模块的职责
12.&&mr 的工作原理
Map—combiner—partition—sort—copy—sort—grouping—reduce
13.&&map 方法是如何调用 reduce 方法的
14.&&shell 如何判断文件是否存在,如果不存在该如何处理?
15.&&fsimage 和 edit 的区别?
16.&&hadoop1 和 hadoop2 的区别?
17.&&hdfs 中的 block 默认报错几份?
18.&&哪个程序通常与 nn 在一个节点启动?并做分析
19.&&列举几个配置文件优化?
20.&&datanode 首次加入 cluster 的时候,如果 log 报告不兼容文件版本,那需要 namenode 执行格式化操作,这样处理的原因是?
21.&&用mapreduce怎么处理数据倾斜问题?
数据倾斜:map /reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。
用hadoop程序进行数据关联时,常碰到数据倾斜的情况,这里提供一种解决方法。
自己实现partition类,用key和value相加取hash值:
public int getPartition(K
key, V value,
&&&&&&&&&&&&&&&&&&&&&&&&& int
numReduceTasks) {
&&& return (key.hashCode()
& Integer.MAX_VALUE) %
numReduceTasks;
public int getPartition(K
key, V value,
&&&&&&&&&&&&&&&&&&&&&&&&& int
numReduceTasks) {
&&& return (((key).hashCode()+value.hashCode())
& Integer.MAX_VALUE) %
numReduceTasks;
public class HashPartitioner&K, V&
extends Partitioner&K, V& {
private int
& /** Use {@link Object#hashCode()} to partition. */
int getPartition(K
key, V value,
&&&&&&&&&&&&&&&&&&&&&&&&& int
numReduceTasks) {
&&& return (key.hashCode()+(aa++) & Integer.MAX_VALUE)
% numReduceTasks;
22.&&谈谈数据倾斜,如何发生的,并给出优化方案
23.&&mapreduce 基本执行过程
24.&&谈谈 hadoop1 和 hadoop2 的区别
25.&&hadoop中Combiner的作用?
combiner是reduce的实现,在map端运行计算任务,减少map端的输出数据。
作用就是优化。
但是combiner的使用场景是mapreduce的map和reduce输入输出一样。
26.&&Mapreduce 的 map 数量 和 reduce 数量 怎么确定 ,怎么配置
map的数量有数据块决定,reduce数量随便配置。
27.&&在hadoop中文件的压缩带来了两大好处:
(1)它减少了存储文件所需的空间;
(2)加快了数据在网络上或者从磁盘上或到磁盘上的传输速度;
28.&&mapreduce的调度模式
一个MapReduce作业的生命周期大体分为5个阶段&【1】&:
1.&作业提交与初始化
2.&任务调度与监控
3. 任务运行环境准备
4. 任务执行
5. 作业完成
我们假设JobTracker已经启动,那么调度器是怎么启动的?JobTracker在启动时有以下代码:
JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
其中offerService方法负责启动JobTracker提供的各个服务,有这样一行代码:
taskScheduler.start();
taskScheduler即为任务调度器。start方法是抽象类TaskScheduler提供的接口,用于启动调度器。每个调度器类都要继承TaskScheduler类。回忆一下,调度器启动时会将各个监听器对象注册到JobTracker,以FIFO调度器JobQueueTaskScheduler为例:
& public synchronized void start() throws IOException {
&&& super.start();
&&& taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
&&& eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
&&& eagerTaskInitializationListener.start();
&&& taskTrackerManager.addJobInProgressListener(
&&&&&&& eagerTaskInitializationListener);
这里注册了两个监听器,其中eagerTaskInitializationListener负责作业初始化,而jobQueueJobInProgressListener则负责作业的执行和监控。当有作业提交到JobTracker时,JobTracker会执行所有订阅它消息的监听器的jobAdded方法。对于eagerTaskInitializationListener来说:&
& public void jobAdded(JobInProgress job) {
&&& synchronized (jobInitQueue) {
&&&&& jobInitQueue.add(job);
&&&&& resortInitQueue();
&&&&& jobInitQueue.notifyAll();
提交的作业的JobInProgress对象被添加到作业初始化队列jobInitQueue中,并唤醒初始化线程(若原来没有作业可以初始化):
class JobInitManager implements Runnable {
&&& public void run() {
&&&&& JobInProgress job =
&&&&& while (true) {
&&&&&&& try {
&&&&&&&&& synchronized (jobInitQueue) {
&&&&&&&&&&& while (jobInitQueue.isEmpty()) {
&&&&&&&&&&&&& jobInitQueue.wait();
&&&&&&&&&&& }
&&&&&&&&&&& job = jobInitQueue.remove(0);
&&&&&&&&& }
&&&&&&&&& threadPool.execute(new InitJob(job));
&&&&&&& } catch (InterruptedException t) {
&&&&&&&&& LOG.info(&JobInitManagerThread interrupted.&);
&&&&& threadPool.shutdownNow();
这种工作方式是一种“生产者-消费者”模式:作业初始化线程是消费者,而监听器eagerTaskInitializationListener是生产者。这里可以有多个消费者线程,放到一个固定资源的线程池中,线程个数通过mapred.jobinit.threads参数配置,默认为4个。
下面我们重点来看调度器中的另一个监听器。&jobQueueJobInProgressListener对象在调度器中初始化时连续执行了两个构造器完成初始化:
public JobQueueJobInProgressListener() {
&&& this(new TreeMap&JobSchedulingInfo,
&&&&&&&&&&&&&&&&&&&& JobInProgress&(FIFO_JOB_QUEUE_COMPARATOR));
&& * For clients that want to provide their own job priorities.
&& * @param jobQueue A collection whose iterator returns jobs in priority order.
& protected JobQueueJobInProgressListener(Map&JobSchedulingInfo,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& JobInProgress& jobQueue) {
&&& this.jobQueue = Collections.synchronizedMap(jobQueue);
其中,第一个构造器调用重载的第二个构造器。可以看到,调度器使用一个队列jobQueue来保存提交的作业。这个队列使用一个TreeMap对象实现,TreeMap的特点是底层使用红黑树实现,可以按照键来排序,并且由于是平衡树,效率较高。作为键的是一个JobSchedulingInfo对象,作为值就是提交的作业对应的JobInProgress对象。另外,由于TreeMap本身不是线程安全的,这里使用了集合类的同步方法构造了一个线程安全的Map。使用带有排序功能的数据结构的目的是使作业在队列中按照优先级的大小排列,这样每次调度器只需从队列头部获得作业即可。
作业的顺序由优先级决定,而优先级信息包含在JobSchedulingInfo对象中:
static class JobSchedulingInfo {
&&& private JobP
&&& private long startT
&&& private JobID
该对象包含了作业的优先级、ID和开始时间等信息。在Hadoop中,作业的优先级有以下五种:VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW。这些字段是通过作业的JobStatus对象初始化的。由于该对象作为TreeMap的键,因此要实现自己的equals方法和hashCode方法:
&&& public boolean equals(Object obj) {
&&&&& if (obj == null || obj.getClass() != JobSchedulingInfo.class) {
&&&&& } else if (obj == this) {
&&&&& else if (obj instanceof JobSchedulingInfo) {
&&&&&&& JobSchedulingInfo that = (JobSchedulingInfo)
&&&&&&& return (this.id.equals(that.id) &&
&&&&&&&&&&&&&&& this.startTime == that.startTime &&
&&&&&&&&&&&&&&& this.priority == that.priority);
我们看到,两个JobSchedulingInfo对象相等的条件是类型一致,并且作业ID、开始时间和优先级都相等。hashCode的计算比较简单:
&&& public int hashCode() {
&&&&& return (int)(id.hashCode() * priority.hashCode() + startTime);
注意,监听器的第一个构造器有一个比较器参数,用于定义&JobSchedulingInfo的比较方式:
static final Comparator&JobSchedulingInfo& FIFO_JOB_QUEUE_COMPARATOR
&&& = new Comparator&JobSchedulingInfo&() {
&&& public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
&&&&& int res = o1.getPriority().compareTo(o2.getPriority());
&&&& &if (res == 0) {
&&&&&&& if (o1.getStartTime() & o2.getStartTime()) {
&&&&&&&&& res = -1;
&&&&&&& } else {
&&&&&&&&& res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
&&&&& if (res == 0) {
&&&&&&& res = o1.getJobID().compareTo(o2.getJobID());
从上面看出,首先比较作业的优先级,若优先级相等则比较开始时间(FIFO),若再相等则比较作业ID。&我们在实现自己的调度器时可能要定义自己的作业队列,那么作业在队列中的顺序(即&JobSchedulingInfo的比较器&)就要仔细定义,这是调度器能够正常运行基础。
Hadoop中的作业调度采用pull方式,即TaskTracker定时向JobTracker发送心跳信息索取一个新的任务,这些信息包括数据结点上作业和任务的运行情况,以及该TaskTracker上的资源使用情况。JobTracker会依据以上信息更新作业队列的状态,并调用调度器选择一个或多个任务以心跳响应的形式返回给TaskTracker。从上面描述可以看出,JobTracker和taskScheduler之间的互相利用关系:前者利用后者为TaskTracker分配任务;后者利用前者更新队列和作业信息。接下来,我们一步步详述该过程。
首先,当一个心跳到达JobTracker时(实际上这是一个来自TaskTracker的远程过程调用&heartbeat方法&,协议接口是InterTrackerProtocol),会执行两种动作:更新状态和下达命令&【1】&。下达命令稍后关注。有关更新状态的一些代码片段如下:
if (!processHeartbeat(status, initialContact, now)) {
&&&&& if (prevHeartbeatResponse != null) {
&&&&&&& trackerToHeartbeatResponseMap.remove(trackerName);
&&&&& return new HeartbeatResponse(newResponseId,
&&&&&&&&&&&&&&&&&& new TaskTrackerAction[] {new ReinitTrackerAction()});
具体的心跳处理,由私有函数processHeartbeat完成。该函数中有以下两个方法调用:
updateTaskStatuses(trackerStatus);
&&& updateNodeHealthStatus(trackerStatus, timeStamp);
分别用来更新任务的状态和结点的健康状态。在第一个方法中有下面代码片段:
TaskInProgress tip = taskidToTIPMap.get(taskId);
&&&&& // Check if the tip is known to the jobtracker. In case of a restarted
&&&&& // jt, some tasks might join in later
&&&&& if (tip != null || hasRestarted()) {
&&&&&&& if (tip == null) {
&&&&&&&&& tip = job.getTaskInProgress(taskId.getTaskID());
&&&&&&&&& job.addRunningTaskToTIP(tip, taskId, status, false);
&&&&&&& // Update the job and inform the listeners if necessary
&&&&&&& JobStatus prevStatus = (JobStatus)job.getStatus().clone();
&&&&&&& // Clone TaskStatus object here, because JobInProgress
&&&&&&& // or TaskInProgress can modify this object and
&&&&&&& // the changes should not get reflected in TaskTrackerStatus.
&&&&&&& // An old TaskTrackerStatus is used later in countMapTasks, etc.
&&&&&&& job.updateTaskStatus(tip, (TaskStatus)report.clone());
&&&&&&& JobStatus newStatus = (JobStatus)job.getStatus().clone();
&&&&&&& // Update the listeners if an incomplete job completes
&&&&&&& if (prevStatus.getRunState() != newStatus.getRunState()) {
&&&&&&&&& JobStatusChangeEvent event =
&&&&&&&&&&& new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& prevStatus, newStatus);
&&&&&&&&& updateJobInProgressListeners(event);
&&&&& } else {
&&&&&&& LOG.info(&Serious problem.& While updating status, cannot find taskid &
&&&&&&&&&&&&&&&& + report.getTaskID());
这里的job对象通过从TaskTracker那里得到的task状态信息中抽取出来。注意,这里拷贝了原有作业状态的一个副本,然后修改这个副本的相关信息,调用的是updateJobStatus方法,更新任务的状态信息和JobInProgress的相关信息,如map和reduce任务的进度等,这里不展开了。这些信息的更新可以为调度器的工作提供依据。
作业状态的更新是通过updateJobInProgressListeners方法实现,该方法的参数是一个JobStatusChangeEvent对象,表示作业状态变化的事件。这种事件的类型可以是运行状态改变、开始时间改变、优先级改变等等。用户也可以根据需要自定义事件类型。事件对象维护了两个JobStatus对象,分别表示事件发生前后作业的状态。&
进入该方法后,我们又看到了熟悉的观察者模式:
// Update the listeners about the job
& // Assuming JobTracker is locked on entry.
& private void updateJobInProgressListeners(JobChangeEvent event) {
&&& for (JobInProgressListener listener : jobInProgressListeners) {
&&&&& listener.jobUpdated(event);
这次每个监听器要回调jobUpdated方法,表示作业有更新。对于jobQueueJobInProgressListener来说是这样做的:
& public synchronized void jobUpdated(JobChangeEvent event) {
&&& JobInProgress job = event.getJobInProgress();
&&& if (event instanceof JobStatusChangeEvent) {
&&&&& // Check if the ordering of the job has changed
&&&&& // For now priority and start-time can change the job ordering
&&&&& JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)
&&&&& JobSchedulingInfo oldInfo =&
&&&&&&& new JobSchedulingInfo(statusEvent.getOldStatus());
&&&&& if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED
&&&&&&&&& || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
&&&&&&& // Make a priority change
&&&&&&& reorderJobs(job, oldInfo);
&&&&& } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
&&&&&&& // Check if the job is complete
&&&&&&& int runState = statusEvent.getNewStatus().getRunState();
&&&&&&& if (runState == JobStatus.SUCCEEDED
&&&&&&&&&&& || runState == JobStatus.FAILED
&&&&&&&&&&& || runState == JobStatus.KILLED) {
&&& &&&&&&jobCompleted(oldInfo);
首先,获取作业更新&前&的状态。然后根据事件的类型,进行相应的处理。比如,如果优先级变化了,则要重新排列队列中作业的顺序。这里直接取出原有作业,重新插入队列。插入后,作业会自动重新排序,体现了TreeMap的优越性。再比如,如果作业状态变为完成,那么就从队列中删除该作业。
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) {
&&& synchronized (jobQueue) {
&&&&& jobQueue.remove(oldInfo);
&&&&& jobQueue.put(new JobSchedulingInfo(job), job);
下面就是调度器中最关键的一步了:任务选择。此时,作业队列中信息已经更新完毕,可以选择一些任务返回给TaskTracker执行了。heartbeat方法接下来会有这样的代码:
List&Task& tasks = getSetupAndCleanupTasks(taskTrackerStatus);
& if (tasks == null ) {
&&& tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
如果不需要setup和cleanup,就说明需要选择map或reduce任务。调用TaskScheduler的assignTasks方法完成任务选择。由于篇幅限制,我打算将这部分内容放到下一篇文章中,并关注heartbeat中JobTracker下达的命令过程以及JobInProgress和TaskInProgress对调度有影响的一些字段。
3.1.3&&Hadoop使用
1.&&&&&&hdfs写流程
1.client链接namenode存数据
2.namenode记录一条数据位置信息(元数据),告诉client存哪。
3.client用hdfs的api将数据块(默认是64M)存储到datanode上。
4.datanode将数据水平备份。并且备份完将反馈client。
5.client通知namenode存储块完毕。
6.namenode将元数据同步到内存中。
7.另一块循环上面的过程。
2.&&&&&&hdfs读流程
1.client链接namenode,查看元数据,找到数据的存储位置。
2.client通过hdfs的api并发读取数据。
3.关闭连接。
3.&&&&&&举一个简单的例子说明mapreduce是怎么来运行的 ?
&&& Word count例子接口
============================
一个MapReduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序,然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
  通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。
  MapReduce框架由一个单独的master JobTracker和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务
4.&&&&&&用mapreduce来实现下面需求?现在有10个文件夹,每个文件夹都有1000000个url.现在让你找出top1000000url。
5.&&&&&&yarn流程
1)&&&&& 用户向YARN 中提交应用程序, 其中包括ApplicationMaster 程序、启动ApplicationMaster 的命令、用户程序等。
2)&&&&& ResourceManager 为该应用程序分配第一个Container, 并与对应的NodeManager 通信,要求它在这个Container 中启动应用程序的ApplicationMaster。
3)&&&&& ApplicationMaster 首先向ResourceManager 注册, 这样用户可以直接通过ResourceManage 查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
4)&&&&& ApplicationMaster 采用轮询的方式通过RPC 协议向ResourceManager 申请和领取资源。
5)&&&&& 一旦ApplicationMaster 申请到资源后,便与对应的NodeManager 通信,要求它启动任务。
6)&&&&& NodeManager 为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
7)&&&&& 各个任务通过某个RPC 协议向ApplicationMaster 汇报自己的状态和进度,以让ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC 向ApplicationMaster 查询应用程序的当前运行状态。
8)&&&&& 应用程序运行完成后,ApplicationMaster 向ResourceManager 注销并关闭自己。
6.&&&&&&的
7.&&&&&&的
8.&&&&&&的
9.&&&&&&的
3.2&&&&&Hive叙述题
3.2.1&&Hive基础
1.&&&&&&hive 有哪些方式保存元数据,各有哪些特点?
1、内存数据库derby,安装小,但是数据存在内存,不稳定
2、mysql数据库,数据存储模式可以自己设置,持久化好,查看方便。
2.&&&&&&hive内部表和外部表的区别
内部表:加载数据到hive所在的hdfs目录,删除时,元数据和数据文件都删除
外部表:不加载数据到hive所在的hdfs目录,删除时,只删除表结构。
3.&&&&&&生产环境中为什么建议使用外部表?
1、因为外部表不会加载数据到hive,减少数据传输、数据还能共享。
2、hive不会修改数据,所以无需担心数据的损坏
3、删除表时,只删除表结构、不删除数据。
4.&&&&&&你们数据库怎么导入hive 的,有没有出现问题
在导入hive的时候,如果数据库中有blob或者text字段,会报错。有个参数limit
5.&&&&&&简述Hive中的虚拟列作用是什么,使用它的注意事项
Hive提供了三个虚拟列:
INPUT__FILE__NAME
BLOCK__OFFSET__INSIDE__FILE
ROW__OFFSET__INSIDE__BLOCK
但ROW__OFFSET__INSIDE__BLOCK默认是不可用的,需要设置hive.exec.rowoffset为true才可以。可以用来排查有问题的输入数据。
INPUT__FILE__NAME, mapper任务的输出文件名。
BLOCK__OFFSET__INSIDE__FILE, 当前全局文件的偏移量。对于块压缩文件,就是当前块的文件偏移量,即当前块的第一个字节在文件中的偏移量。
hive& SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, line
& FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt& 2243
har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt& 3646
6.&&&&&&hive partition分区
分区表,动态分区
7.&&&&&&insert into 和 override write区别?
insert&into:将某一张表中的数据写到另一张表中
override&write:覆盖之前的内容。
8.&&&&&&假如一个分区的数据主部错误怎么通过hivesql删除hdfs
alter table ptable drop partition (daytime='',city='bj');
元数据,数据文件都删除,但目录daytime= 还在
9.&&&&&&Hive里面用什么代替in查询
提示:Hive中的left semi join替换sql中的in操作
3.3&&&&&Hbase
3.3.1&&Hbase基础
1.&&&&&&介绍一下 hbase 过滤器
2.&&&&&&hbase 集群安装注意事项
3.&&&&&&hbase的rowkey怎么创建好?列族怎么创建比较好?
hbase存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)
一个列族在数据底层是一个文件,所以将经常一起查询的列放到一个列族中,列族尽量少,减少文件的寻址时间。
因为hbase是列式数据库,列非表schema的一部分,所以在设计初期只需要考虑rowkey 和 columnFamily即可,rowkey有位置相关性,所以如果数据是练习查询的,最好对同类数据加一个前缀,而每个columnFamily实际上在底层是一个文件,那么文件越小,查询越快,所以讲经常一起查询的列设计到一个列簇,但是列簇不宜过多。
&Rowkey长度原则
Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。
原因如下:
(1)数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
(2)MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
(3)目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。
Rowkey散列原则
如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。
Rowkey唯一原则
必须在设计上保证其唯一性。
4.&&&&&&简述Hbase性能优化的思路
1、在库表设计的时候,尽量考虑rowkey和columnfamily的特性
2、进行hbase集群的调优
5.&&&&&&简述Hbase filter的实现原理是什么?结合实际项目经验,写出几个使用filter的场景。
hbase的filter是通过scan设置的,所以是基于scan的查询结果进行过滤。
1.在进行订单开发的时候,我们使用rowkeyfilter过滤出某个用户的所有订单
2.在进行云笔记开发时,我们使用rowkey过滤器进行redis数据的恢复。
6.&&&&&&ROWKEY的后缀匹配怎么实现?列如ROWKEY是yyyyMMDD-UserID形式,如UserID为条件查询数据,怎么实现。
7.&&&&&&HBase的检索支持3种方式:
(1) 通过单个Rowkey访问,即按照某个Rowkey键值进行get操作,这样获取唯一一条记录;
(2) 通过Rowkey的range进行scan,即通过设置startRowKey和endRowKey,在这个范围内进行扫描。这样可以按指定的条件获取一批记录;
(3) 全表扫描,即直接扫描整张表中所有行记录。
8.&&&&&&简述HBase的瓶颈
HBase的瓶颈就是硬盘传输速度。HBase的操作,它可以往数据里面insert,也可以update一些数据,但update的实际上也是insert,只是插入一个新的时间戳的一行。Delete数据,也是insert,只是insert一行带有delete标记的一行。Hbase的所有操作都是追加插入操作。Hbase是一种日志集数据库。它的存储方式,像是日志文件一样。它是批量大量的往硬盘中写,通常都是以文件形式的读写。这个读写速度,就取决于硬盘与机器之间的传输有多快。而Oracle的瓶颈是硬盘寻道时间。它经常的操作时随机读写。要update一个数据,先要在硬盘中找到这个block,然后把它读入内存,在内存中的缓存中修改,过段时间再回写回去。由于你寻找的block不同,这就存在一个随机的读。硬盘的寻道时间主要由转速来决定的。而寻道时间,技术基本没有改变,这就形成了寻道时间瓶颈。
9.&&&&&&Hbase内部是什么机制?
在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。
HBase的RPC Protocol
&在HMaster、RegionServer内部,实现了rpc 多个protocol来完成管理和应用逻辑,具体如下protocol如下:
HMaster支持的Rpc协议:
MasterMonitorProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase集群监控的目的。
MasterAdminProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase表格的管理。例如TableSchema的更改,Table-Region的迁移、合并、下线(Offline)、上线(Online)以及负载平衡,以及Table的删除、快照等相关功能。
RegionServerStatusProtoco,RegionServer与Master之间的通信,Master是RpcServer端,负责提供RegionServer向HMaster状态汇报的服务。
RegionServer支持的Rpc协议:
ClientProtocol,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现用户的读写请求。例如get、multiGet、mutate、scan、bulkLoadHFile、执行Coprocessor等。
AdminProtocols,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现Region、服务、文件的管理。例如storefile信息、Region的操作、WAL操作、Server的开关等。
(备注:以上提到的Client可以是用户Api、也可以是RegionServer或者HMaster)
&HBase-RPC实现机制分析
RpcServer配置三个队列:
1)普通队列callQueue,绝大部分Call请求存在该队列中:callQueue上maxQueueLength为${ipc.server.max.callqueue.length},默认是${hbase.master.handler.count}*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER,目前0.95.1中,每个Handler上CallQueue的最大个数默认值(DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)为10。
2)优先级队列: PriorityQueue。如果设置priorityHandlerCount的个数,会创建与callQueue相当容量的queue存储Call,该优先级队列对应的Handler的个数由rpcServer实例化时传入。
3)拷贝队列:replicationQueue。由于RpcServer由HMaster和RegionServer共用,该功能仅为RegionServer提供,queue的大小为${ipc.server.max.callqueue.size}指定,默认为24,handler的个数为hbase.regionserver.replication.handler.count。
RpcServer由三个模块组成:
Listener ===Queue=== Responder
这里以HBaseAdmin.listTables为例,&&&& 分析一个Rpc请求的函数调用过程:
1) RpcClient创建一个BlockingRpcChannel。
2)以channel为参数创建执行RPC请求需要的stub,此时的stub已经被封装在具体Service下,stub下定义了可执行的rpc接口。
3)stub调用对应的接口,实际内部channel调用callBlockingMethod方法。
RpcClient内实现了protobuf提供的BlockingRpcChannel接口方法callBlockingMethod,& @OverridepublicMessage callBlockingMethod( md, RpcController controller,Message param, Message returnType)throwsServiceException {returnthis.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,this.isa, this.rpcTimeout);}
通过以上的实现细节,最终转换成rpcClient的调用,使用MethodDescriptor封装了不同rpc函数,使用Message基类可以接收基于Message的不同的Request和Response对象。
4)RpcClient创建Call对象,查找或者创建合适的Connection,并唤醒Connection。
5)Connection等待Call的Response,同时rpcClient调用函数中,会使用connection.writeRequest(Call call)将请求写入到RpcServer网络流中。
6)等待Call的Response,然后层层返回给更上层接口,从而完成此次RPC调用。
RPCServer收到的Rpc报文的内部组织如下:
AuthMethod
Connection
HeaderLength
ConnectionHeader
验证RpcServer的CURRENT_VERSION
与RPC报文一致
目前支持三类:
AuthMethod.SIMPLE
AuthMethod.KERBEROS
AuthMethod.DIGEST
RPC.proto定义
RPCProtos.ConnectionHeader
message ConnectionHeader {
optional UserInformation userInfo = 1;
optional string serviceName = 2;
// Cell block codec we will use sending over optional cell blocks.& Server throws exception
// if cannot deal.
optional string cellBlockCodecClass = 3 [default = &org.apache.hadoop.hbase.codec.KeyValueCodec&];
// Compressor we will use if cell block is compressed.& Server will throw exception if not supported.
// Class must implement hadoop’s CompressionCodec Interface
optional string cellBlockCompressorClass = 4;
序列化之后的数据
整个Request存储是经过编码之后的byte数组,包括如下几个部分:
RequestHeaderLength(RawVarint32)
RequestHeader
ParamSize(RawVarint32)
CellScanner
RPC.proto定义:
message RequestHeader {
// Monotonically increasing callId to keep track of RPC requests and their response
optional uint32 callId = 1;
optional RPCTInfo traceInfo = 2;
optional string methodName = 3;
// If true, then a pb Message param follows.
optional bool requestParam = 4;
// If present, then an encoded data block follows.
optional CellBlockMeta cellBlockMeta = 5;
// TODO: Have client specify priority
序列化之后的数据
并从Header中确认是否存在Param和CellScanner,如果确认存在的情况下,会继续访问。
Protobuf的基本类型Message,
Request的Param继承了Message,
这个需要获取的Method类型决定。
从功能上讲,RpcServer上包含了三个模块,
1)Listener。包含了多个Reader线程,通过Selector获取ServerSocketChannel接收来自RpcClient发送来的Connection,并从中重构Call实例,添加到CallQueue队列中。
&”IPC Server listener on 60021″ daemon prio=10 tid=0xa97800 nid=0x14c6 runnable [0xd0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked &0xcae68& (a sun.nio.ch.Util$2)
- locked &0xcae50& (a java.util.Collections$UnmodifiableSet)
- locked &0x2ca8& (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java:646)
2)Handler。负责执行Call,调用Service的方法,然后返回Pair&Message,CellScanner&
“IPC Server handler 0 on 60021″ daemon prio=10 tid=0xeab000 nid=0x14c7 waiting on condition [0xcf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for& &0xcad90& (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1804)
3) Responder。负责把Call的结果返回给RpcClient。
&”IPC Server Responder” daemon prio=10 tid=0xa97000 nid=0x14c5 runnable [0xd1000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked &0x7078& (a sun.nio.ch.Util$2)
- locked &0x7060& (a java.util.Collections$UnmodifiableSet)
- locked &0x5b68& (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcServer.java:833)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.java:816)
RpcClient为Rpc请求建立Connection,通过Connection将Call发送RpcServer,然后RpcClient等待结果的返回。
3.4&&&&&Zookeeper
1.&&&&&&写出你对 zookeeper 的理解
2.&&&&&&的
3.&&&&&&的
4.&&&&&&的
3.5&&&&&Storm
1.&&&&&&storm 如果碰上了复杂逻辑,需要算很长的时间,你怎么去优化
拆分复杂的业务到多个bolt中,这样可以利用bolt的tree将速度提升
提高并行度
2.&&&&&&开发流程,容错机制
开发流程:
写主类(设计spout和bolt的分发机制)
写spout收集数据
写bolt处理数据,根据数据量和业务的复杂程度,设计并行度。
容错机制:
采用ack和fail进行容错,失败的数据重新发送。
3.&&&&&&storm和spark-streaming:为什么用storm不同spark-streaming
4.&&&&&&的
5.&&&&&&的
6.&&&&&&的
7.&&&&&&的
3.6&&&&&Flume
1.&&&&&&flume管道内存,flume宕机了数据丢失怎么解决
1、Flume的channel分为很多种,可以将数据写入到文件
2、防止非首个agent宕机的方法数可以做集群或者主备
2.&&&&&&flume配置方式,flume集群(问的很详细)
Flume的配置围绕着source、channel、sink叙述,flume的集群是做在agent上的,而非机器上。
3.&&&&&&flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?
优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。
缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。
4.&&&&&&flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。
Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka试讲日志缓存在kafka集群,待后期可以采集到存储层。
Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。
5.&&&&&&的
6.&&&&&&的
3.7&&&&&Kafka
1.&&&&&&Kafka容错机制
分区备份,存在主备partition
2.&&&&&&kafka数据流向
Producer à leader partition à follower partition(半数以上) àconsumer
3.&&&&&&kafka+spark-streaming结合丢数据怎么解决?
spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件:
数据输入需要可靠的sources和可靠的receivers
应用metadata必须通过应用driver checkpoint
WAL(write ahead log)
可靠的sources和receivers
spark streaming可以通过多种方式作为数据sources(包括kafka),输入数据通过receivers接收,通过replication存储于spark中(为了faultolerance,默认复制到两个spark executors),如果数据复制完成,receivers可以知道(例如kafka中更新offsets到zookeeper中)。这样当receivers在接收数据过程中crash掉,不会有数据丢失,receivers没有复制的数据,当receiver恢复后重新接收。
metadata checkpoint
可靠的sources和receivers,可以使数据在receivers失败后恢复,然而在driver失败后恢复是比较复杂的,一种方法是通过checkpoint metadata到HDFS或者S3。metadata包括:
configuration
一些排队等待处理但没有完成的RDD(仅仅是metadata,而不是data)
这样当driver失败时,可以通过metadata checkpoint,重构应用程序并知道执行到那个地方。
数据可能丢失的场景
可靠的sources和receivers,以及metadata checkpoint也不可以保证数据的不丢失,例如:
两个executor得到计算数据,并保存在他们的内存中
receivers知道数据已经输入
executors开始计算数据
driver突然失败
driver失败,那么executors都会被kill掉
因为executor被kill掉,那么他们内存中得数据都会丢失,但是这些数据不再被处理
executor中的数据不可恢复
为了避免上面情景的出现,spark streaming 1.2引入了WAL。所有接收的数据通过receivers写入HDFS或者S3中checkpoint目录,这样当driver失败后,executor中数据丢失后,可以通过checkpoint恢复。
At-Least-Once
尽管WAL可以保证数据零丢失,但是不能保证exactly-once,例如下面场景:
Receivers接收完数据并保存到HDFS或S3
在更新offset前,receivers失败了
Spark Streaming以为数据接收成功,但是Kafka以为数据没有接收成功,因为offset没有更新到zookeeper
随后receiver恢复了
从WAL可以读取的数据重新消费一次,因为使用的kafka High-Level消费API,从zookeeper中保存的offsets开始消费
通过上面描述,WAL有两个缺点:
降低了receivers的性能,因为数据还要存储到HDFS等分布式文件系统
对于一些resources,可能存在重复的数据,比如Kafka,在Kafka中存在一份数据,在Spark Streaming也存在一份(以WAL的形式存储在hadoop API兼容的文件系统中)
Kafka direct API
为了WAL的性能损失和exactly-once,spark streaming1.3中使用Kafka direct API。非常巧妙,Spark driver计算下个batch的offsets,指导executor消费对应的topics和partitions。消费Kafka消息,就像消费文件系统文件一样。
不再需要kafka receivers,executor直接通过Kafka API消费数据
WAL不再需要,如果从失败恢复,可以重新消费
exactly-once得到了保证,不会再从WAL中重复读取数据
主要说的是spark streaming通过各种方式来保证数据不丢失,并保证exactly-once,每个版本都是spark streaming越来越稳定,越来越向生产环境使用发展。
4.&&&&&&kafka中存储目录data/dir.....topic1和topic2怎么存储的,存储结构,data.....目录下有多少个分区,每个分区的存储格式是什么样的?
1、topic是按照“主题名-分区”存储的
2、分区个数由配置文件决定
3、每个分区下最重要的两个文件是.log和000000.index,0000000.log以默认1G大小回滚。
5.&&&&&&的
6.&&&&&&D 的
3.8&&&&&Spark
1.&&&&&&mr和spark区别,怎么理解spark-rdd
Mr是文件方式的分布式计算框架,是将中间结果和最终结果记录在文件中,map和reduce的数据分发也是在文件中。
spark是内存迭代式的计算框架,计算的中间结果可以缓存内存,也可以缓存硬盘,但是不是每一步计算都需要缓存的。
Spark-rdd是一个数据的分区记录集合………………
2.&&&&&&Spark应用转换流程
1、spark应用提交后,经历了一系列的转换,最后成为task在每个节点上执行
2、RDD的Action算子触发Job的提交,生成RDD DAG
3、由DAGScheduler将RDD DAG转化为Stage DAG,每个Stage中产生相应的Task集合
4、TaskScheduler将任务分发到Executor执行
5、每个任务对应相应的一个数据块,只用用户定义的函数处理数据块
3.&&&&&&Driver运行在Worker上
通过org.apache.spark.deploy.Client类执行作业,作业运行命令如下:
作业执行流程描述:
1、客户端提交作业给Master
2、Master让一个Worker启动Driver,即SchedulerBackend。Worker创建一个DriverRunner线程,DriverRunner启动SchedulerBackend进程。
3、另外Master还会让其余Worker启动Exeuctor,即ExecutorBackend。Worker创建一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。
4、ExecutorBackend启动后会向Driver的SchedulerBackend注册。SchedulerBackend进程中包含DAGScheduler,它会根据用户程序,生成执行计划,并调度执行。对于每个stage的task,都会被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend汇报的时候把TaskScheduler中的task调度到ExecutorBackend执行。
5、所有stage都完成后作业结束。
4.&&&&&&Driver运行在客户端
作业执行流程描述:
1、客户端启动后直接运行用户程序,启动Driver相关的工作:DAGScheduler和BlockManagerMaster等。
2、客户端的Driver向Master注册。
3、Master还会让Worker启动Exeuctor。Worker创建一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。
4、ExecutorBackend启动后会向Driver的SchedulerBackend注册。Driver的DAGScheduler解析作业并生成相应的Stage,每个Stage包含的Task通过TaskScheduler分配给Executor执行。
5、所有stage都完成后作业结束。
5.&&&&&&的
6.&&&&&&的
7.&&&&&&的
8.&&&&&&的
3.9&&&&&Sqoop
1.&&&&&&命令:
sqoop import --connect jdbc:mysql://192.168.56.204:3306/sqoop --username hive --password hive --table jobinfo --target-dir /sqoop/test7 --inline-lob-limit
--fields-terminated-by '\t' -m 2
sqoop create-hive-table --connect jdbc:mysql://192.168.56.204:3306/sqoop --table jobinfo --username hive --password hive --hive-table sqtest --fields-terminated-by &\t& --lines-terminated-by &\n&;
2.&&&&&&sqoop在导入数据到mysql中,如何让数据不重复导入?如果存在数据问题sqoop如何处理?
Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
首先需以下要准备:
第一:hadoop的NameNode节点下lib文件夹中要有相应数据库驱动的jar包和sqoop的jar包。
第二:预先在相应的数据库创建Table,注:在HDFS的某个目录上的数据格式要和相应的表中的字段数量一致。
由于我这里使用的是Oracle数据库并且是使用Java来操作的。所以下面的代码以及截图都是以Java的例子:
首先标准化HDFS中文件格式,如下图:
Java代码如下:
Configuration conf = new Configuration();
conf.set(&fs.default.name&, &hdfs://192.168.115.5:9000&);
conf.set(&hadoop.job.ugi&, &hadooper,hadoopgroup&);
conf.set(&mapred.job.tracker&, &192.168.115.5:9001&);
ArrayList&String& list = new ArrayList&String&(); // 定义一个list
list.add(&--table&);
list.add(&A_BAAT_CLIENT&); // Oracle中的表。将来数据要导入到这个表中。
list.add(&--export-dir&);
list.add(&/home/hadoop/traffic/capuse/near7date/activeUser/capuse_near7_activeUser_.log&); // hdfs上的目录。这个目录下的数据要导入到a_baat_client这个表中。
list.add(&--connect&);
list.add(&jdbc:oracle:thin:@10.18.96.107:1521:life&); // Oracle的链接
list.add(&--username&);
list.add(&TRAFFIC&); // Oracle的用户名
list.add(&--password&);
list.add(&TRAFFIC&); // Oracle的密码
list.add(&--input-fields-terminated-by&);
list.add(&|&); // 数据分隔符号
list.add(&-m&);
list.add(&1&);// 定义mapreduce的数量。
String[] arg = new String[1];
ExportTool exporter = new ExportTool();
Sqoop sqoop = new Sqoop(exporter);
sqoop.setConf(conf);
arg = list.toArray(new String[0]);
int result = Sqoop.runSqoop(sqoop, arg);
System.out.println(&res:& + result); // 打印执行结果。
最后再在Main方法中运行即可,生成后表数据如下图所示:
通过上面的操作以及代码即可在Java中实现把HDFS数据生成对应的表数据;
不过除了可以用Java来实现,使用基本的命令也是可以的,命令如下:
在Hadoo

我要回帖

更多关于 cloudera manager官网 的文章

 

随机推荐