Storm
1.介绍¶
1.1 什么是Apache storm¶
Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。
1.2 storm vs hadoop¶
基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。
Storm | Hadoop |
---|---|
实时流处理 | 批量处理 |
无状态 | 有状态 |
主/从架构与基于**ZooKeeper**的协调。主节点称为nimbus,从属节点是**主管**。 | 具有/不具有基于ZooKeeper的协调的主 - 从结构。主节点是**作业跟踪器**,从节点是**任务跟踪器**。 |
Storm流过程在集群上每秒可以访问数万条消息。 | Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。 |
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 | MapReduce作业按顺序执行并最终完成。 |
两者都是分布式和容错的 | |
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 | 如果JobTracker死机,所有正在运行的作业都会丢失。 |
1.3 storm优势¶
- Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
- Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
- 允许实时流处理。
- Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
- Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
- Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
- Storm有操作智能。
- Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。
2. storm 核心¶
Apache Storm从一端读取实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。下图描述了Apache Storm的核心概念。
2.1 storm 组件¶
组件 | 描述 |
---|---|
Tuple | Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。 |
Stream | 流是元组的无序序列。 |
Spouts | 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。 |
Bolts | Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。 |
Nimbus | 即Storm的Master,负责资源分配和任务调度。一个Storm集群只有一个Nimbus。 |
Supervisor | 即Storm的Slave,负责接收Nimbus分配的任务,管理所有Worker,一个Supervisor节点中包含多个Worker进程。 |
Worker | 工作进程,每个工作进程中都有多个Task。 |
Task | 任务,在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每个任务都与一个执行线程相对应。 |
Topology | 计算拓扑,Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。 |
Stream | 数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。 |
Spout | 数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。一个 Spout可以发送多个数据流。 |
Bolt | 拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。 |
Stream grouping | 为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式。 |
Reliability | 可靠性。Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。 |
2.2 拓扑¶
Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。
简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。
Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。
2.3 任务¶
现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。
2.4 进程¶
拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。
2.5 流分组¶
数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts。流分组控制元组在拓扑中的路由方式,并帮助我们了解拓扑中的元组流。有四个内置分组,如下所述。
2.6 随机分组¶
在随机分组中,相等数量的元组随机分布在执行Bolts的所有工人中。下图描述了结构。
2.7 字段分组¶
元组中具有相同值的字段组合在一起,其余的元组保存在外部。然后,具有相同字段值的元组被向前发送到执行Bolts的同一进程。例如,如果流由字段“字”分组,则具有相同字符串“Hello”的元组将移动到相同的工作者。下图显示了字段分组的工作原理。
2.8 全局分组¶
所有流可以分组并向前到一个Bolts。此分组将源的所有实例生成的元组发送到单个目标实例(具体来说,选择具有最低ID的工作程序)。
2.9 所有分组¶
所有分组将每个元组的单个副本发送到接收Bolts的所有实例。这种分组用于向Bolts发送信号。所有分组对于连接操作都很有用。
3. 部署¶
3.1 下载地址¶
http://mirror.bit.edu.cn/apache/storm/
3.2 集群环境¶
序列号 | IP地址 | 主机名 | 角色 | 安装软件 |
---|---|---|---|---|
1 | 192.168.186.10 | master | hadoop master |
zookeeper |
2 | 192.168.186.11 | slave1 | hadoop slave1 |
zookeeper |
3 | 192.168.186.12 | slave2 | hadoop slave2 |
zookeeper |
仍然是这三台机器
3.3 安装¶
安装之前线检查jdk
[root@master ~]# java -version java version "1.8.0_172" Java(TM) SE Runtime Environment (build 1.8.0_172-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)
其他两台机器省略,保证三台机器能均安装
JDK
,,同时检查zookeeper
是否安装[storm 依赖zookeeper
]
[root@master ~]# cd /usr/local/src/ [root@master src]# ls apache-storm-0.9.3.master.tgz apache-storm-0.9.3.master.tgz [root@master src]# tar xf apache-storm-0.9.3.master.tgz
3.4 配置¶
[root@master conf]# egrep -v '#|^$' storm.yaml storm.zookeeper.servers: - "master" - "slave1" - "slave2" nimbus.host: "master" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 - 6704
配置文件解释
配置项 | 配置说明 |
---|---|
storm.zookeeper.servers | ZooKeeper服务器列表 |
storm.zookeeper.port | ZooKeeper连接端口 |
storm.local.dir | storm使用的本地文件系统目录(必须存在并且storm进程可读写) |
storm.cluster.mode | Storm集群运行模式([distributed|local]) |
storm.local.mode.zmq | Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用Java消息系统。默认为false |
storm.zookeeper.root | ZooKeeper中Storm的根目录位置 |
storm.zookeeper.session.timeout | 客户端连接ZooKeeper超时时间 |
storm.id | 运行中拓扑的id,由storm name和一个唯一随机数组成。 |
nimbus.host | nimbus服务器地址 |
nimbus.thrift.port | nimbus的thrift监听端口 |
nimbus.childopts | 通过storm-deploy项目部署时指定给nimbus进程的jvm选项 |
nimbus.task.timeout.secs | 心跳超时时间,超时后nimbus会认为task死掉并重分配给另一个地址。 |
nimbus.monitor.freq.secs | nimbus检查心跳和重分配任务的时间间隔.注意如果是机器宕掉nimbus会立即接管并处理。 |
nimbus.supervisor.timeout.secs | supervisor的心跳超时时间,一旦超过nimbus会认为该supervisor已死并停止为它分发新任务. |
nimbus.task.launch.secs | task启动时的一个特殊超时设置.在启动后第一次心跳前会使用该值来临时替代nimbus.task.timeout.secs. |
nimbus.reassign | 当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改。 |
nimbus.file.copy.expiration.secs | nimbus判断上传/下载链接的超时时间,当空闲时间超过该设定时nimbus会认为链接死掉并主动断开 |
ui.port | Storm UI的服务端口 |
drpc.servers | DRPC服务器列表,以便DRPCSpout知道和谁通讯 |
drpc.port | Storm DRPC的服务端口 |
supervisor.slots.ports | supervisor上能够运行workers的端口列表.每个worker占用一个端口,且每个端口只运行一个worker.通过这项配置可以调整每台机器上运行的worker数.(调整slot数/每机) |
supervisor.childopts | 在storm-deploy项目中使用,用来配置supervisor守护进程的jvm选项 |
supervisor.worker.timeout.secs | supervisor中的worker心跳超时时间,一旦超时supervisor会尝试重启worker进程. |
supervisor.worker.start.timeout.secs | supervisor初始启动时,worker的心跳超时时间,当超过该时间supervisor会尝试重启worker。因为JVM初始启动和配置会带来的额外消耗,从而使得第一次心跳会超过supervisor.worker.timeout.secs的设定 |
supervisor.enable | supervisor是否应当运行分配给他的workers.默认为true,该选项用来进行Storm的单元测试,一般不应修改. |
supervisor.heartbeat.frequency.secs | supervisor心跳发送频率(多久发送一次) |
supervisor.monitor.frequency.secs | supervisor检查worker心跳的频率 |
worker.childopts | supervisor启动worker时使用的jvm选项.所有的”%ID%”字串会被替换为对应worker的标识符 |
worker.heartbeat.frequency.secs | worker的心跳发送时间间隔 |
task.heartbeat.frequency.secs | task汇报状态心跳时间间隔 |
task.refresh.poll.secs | task与其他tasks之间链接同步的频率.(如果task被重分配,其他tasks向它发送消息需要刷新连接).一般来讲,重分配发生时其他tasks会理解得到通知。该配置仅仅为了防止未通知的情况。 |
topology.debug | 如果设置成true,Storm将记录发射的每条信息。 |
topology.optimize | master是否在合适时机通过在单个线程内运行多个task以达到优化topologies的目的. |
topology.workers | 执行该topology集群中应当启动的进程数量.每个进程内部将以线程方式执行一定数目的tasks.topology的组件结合该参数和并行度提示来优化性能 |
topology.ackers | topology中启动的acker任务数.Acker保存由spout发送的tuples的记录,并探测tuple何时被完全处理.当Acker探测到tuple被处理完毕时会向spout发送确认信息.通常应当根据topology的吞吐量来确定acker的数目,但一般不需要太多.当设置为0时,相当于禁用了消息可靠性,storm会在spout发送tuples后立即进行确认. |
topology.message.timeout.secs | topology中spout发送消息的最大处理超时时间.如果一条消息在该时间窗口内未被成功ack,Storm会告知spout这条消息失败。而部分spout实现了失败消息重播功能。 |
topology.kryo.register | 注册到Kryo(Storm底层的序列化框架)的序列化方案列表.序列化方案可以是一个类名,或者是com.esotericsoftware.kryo.Serializer的实现. |
topology.skip.missing.kryo.registrations | Storm是否应该跳过它不能识别的kryo序列化方案.如果设置为否task可能会装载失败或者在运行时抛出错误. |
topology.max.task.parallelism | 在一个topology中能够允许的最大组件并行度.该项配置主要用在本地模式中测试线程数限制. |
topology.max.spout.pending | 一个spout task中处于pending状态的最大的tuples数量.该配置应用于单个task,而不是整个spouts或topology. |
topology.state.synchronization.timeout.secs | 组件同步状态源的最大超时时间(保留选项,暂未使用) |
topology.stats.sample.rate | 用来产生task统计信息的tuples抽样百分比 |
topology.fall.back.on.java.serialization | topology中是否使用java的序列化方案 |
zmq.threads | 每个worker进程内zeromq通讯用到的线程数 |
zmq.linger.millis | 当连接关闭时,链接尝试重新发送消息到目标主机的持续时长.这是一个不常用的高级选项,基本上可以忽略. |
java.library.path | JVM启动(如Nimbus,Supervisor和workers)时的java.library.path设置.该选项告诉JVM在哪些路径下定位本地库. |
3.5 配置环境变量¶
[root@master conf]# cd /usr/local/ [root@master local]# ls bin etc games hadoop-2.6.5 include jdk1.8.0_172 lib lib64 libexec sbin share src zookeeper zookeeper-3.4.5 [root@master local]# mv src/apache-storm-0.9.3 /usr/local/ [root@master local]# ln -sf /usr/local/apache-storm-0.9.3 /usr/local/storm [root@master storm]# tail -2 /etc/profile export STORM_HOME=/usr/local/storm export PATH=$PATH:$STORM_HOME/bin [root@master storm]# source /etc/profile
3.6 包分发和配置环境变量¶
[root@master ~]# rsync -az /usr/local/{storm,apache-storm-0.9.3} slave1:/usr/local/ [root@master ~]# rsync -az /usr/local/{storm,apache-storm-0.9.3} slave2:/usr/local/ [root@slave1 ~]# tail -2 /etc/profile export STORM_HOME=/usr/local/storm export PATH=$PATH:$STORM_HOME/bin [root@slave1 ~]# source /etc/profile [root@slave2 ~]# tail -2 /etc/profile export STORM_HOME=/usr/local/storm export PATH=$PATH:$STORM_HOME/bin [root@slave2 ~]# source /etc/profile
3.7 启动集群¶
- master 节点启动nimbus和storm ui
[root@master ~]# nohup storm ui >/dev/null 2>&1 & [1] 12082 [root@master ~]# nohup storm nimbus >/dev/null 2>&1 & [2] 12128 [root@master ~]# nohup storm logviewer >/dev/null 2>&1 & [3] 12257
- slave 节点启动Supervisor节点
[root@slave1 local]# nohup storm supervisor >/dev/null 2>&1 & [1] 9210 [root@slave1 local]# nohup storm logviewer >/dev/null 2>&1 & [2] 9302 [root@slave2 ~]# nohup storm supervisor >/dev/null 2>&1 & [1] 11844 [root@slave2 ~]# nohup storm logviewer >/dev/null 2>&1 & [2] 11960
3.8 检查¶
[root@master ~]# jps 12128 nimbus 7457 SecondaryNameNode 12257 logviewer 12082 core 7273 NameNode 10556 QuorumPeerMain 7613 ResourceManager 12351 Jps [root@slave1 local]# jps 7218 NodeManager 8450 QuorumPeerMain 9302 logviewer 9367 Jps 7113 DataNode 9210 supervisor [root@slave2 ~]# jps 11844 supervisor 7174 DataNode 11960 logviewer 12025 Jps 9402 QuorumPeerMain 7279 NodeManager
3.9 访问¶
访问ui页面: http://master:8080/
界面简单介绍:
- Used slots:使用的worker数。
- Free slots:空闲的worker数。
- Executors:每个worker的物理线程数。