Storm

1.介绍

1.1 什么是Apache storm

  Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。

1.2 storm vs hadoop

  基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。

Storm Hadoop
实时流处理 批量处理
无状态 有状态
主/从架构与基于**ZooKeeper**的协调。主节点称为nimbus,从属节点是**主管**。 具有/不具有基于ZooKeeper的协调的主 - 从结构。主节点是**作业跟踪器**,从节点是**任务跟踪器**。
Storm流过程在集群上每秒可以访问数万条消息。 Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 MapReduce作业按顺序执行并最终完成。
两者都是分布式和容错的
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 如果JobTracker死机,所有正在运行的作业都会丢失。

1.3 storm优势

  • Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
  • Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
  • 允许实时流处理。
  • Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
  • Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
  • Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
  • Storm有操作智能。
  • Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。

2. storm 核心

  Apache Storm从一端读取实时数据的原始流,并将其传递通过一系列小处理单元,并在另一端输出处理/有用的信息。下图描述了Apache Storm的核心概念。

storm

2.1 storm 组件

组件 描述
Tuple Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。
Stream 流是元组的无序序列。
Spouts 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。
Nimbus 即Storm的Master,负责资源分配和任务调度。一个Storm集群只有一个Nimbus。
Supervisor 即Storm的Slave,负责接收Nimbus分配的任务,管理所有Worker,一个Supervisor节点中包含多个Worker进程。
Worker 工作进程,每个工作进程中都有多个Task。
Task 任务,在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。每个任务都与一个执行线程相对应。
Topology 计算拓扑,Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。
Stream 数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。
Spout 数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。一个 Spout可以发送多个数据流。
Bolt 拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。
Stream grouping 为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式。
Reliability 可靠性。Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理。通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会将该元组标记为处理失败,并会在稍后重新发送该元组。

storm

storm

2.2 拓扑

  Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。

  简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。

  Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。

2.3 任务

  现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。

2.4 进程

  拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。

2.5 流分组

  数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts。流分组控制元组在拓扑中的路由方式,并帮助我们了解拓扑中的元组流。有四个内置分组,如下所述。

2.6 随机分组

  在随机分组中,相等数量的元组随机分布在执行Bolts的所有工人中。下图描述了结构。

storm

2.7 字段分组

  元组中具有相同值的字段组合在一起,其余的元组保存在外部。然后,具有相同字段值的元组被向前发送到执行Bolts的同一进程。例如,如果流由字段“字”分组,则具有相同字符串“Hello”的元组将移动到相同的工作者。下图显示了字段分组的工作原理。

storm

2.8 全局分组

  所有流可以分组并向前到一个Bolts。此分组将源的所有实例生成的元组发送到单个目标实例(具体来说,选择具有最低ID的工作程序)。

storm

2.9 所有分组

  所有分组将每个元组的单个副本发送到接收Bolts的所有实例。这种分组用于向Bolts发送信号。所有分组对于连接操作都很有用。

storm

3. 部署

3.1 下载地址

http://mirror.bit.edu.cn/apache/storm/

3.2 集群环境

序列号 IP地址 主机名 角色 安装软件
1 192.168.186.10 master hadoop master zookeeper
2 192.168.186.11 slave1 hadoop slave1 zookeeper
3 192.168.186.12 slave2 hadoop slave2 zookeeper

仍然是这三台机器

3.3 安装

安装之前线检查jdk

[root@master ~]# java -version
java version "1.8.0_172"
Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)

其他两台机器省略,保证三台机器能均安装JDK,,同时检查zookeeper是否安装[storm 依赖zookeeper]

[root@master ~]# cd /usr/local/src/
[root@master src]# ls apache-storm-0.9.3.master.tgz 
apache-storm-0.9.3.master.tgz
[root@master src]# tar xf apache-storm-0.9.3.master.tgz 

3.4 配置

[root@master conf]# egrep -v '#|^$' storm.yaml 
storm.zookeeper.servers:
    - "master"
    - "slave1"
    - "slave2"
nimbus.host: "master"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704

配置文件解释

配置项 配置说明
storm.zookeeper.servers ZooKeeper服务器列表
storm.zookeeper.port ZooKeeper连接端口
storm.local.dir storm使用的本地文件系统目录(必须存在并且storm进程可读写)
storm.cluster.mode Storm集群运行模式([distributed|local])
storm.local.mode.zmq Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用Java消息系统。默认为false
storm.zookeeper.root ZooKeeper中Storm的根目录位置
storm.zookeeper.session.timeout 客户端连接ZooKeeper超时时间
storm.id 运行中拓扑的id,由storm name和一个唯一随机数组成。
nimbus.host nimbus服务器地址
nimbus.thrift.port nimbus的thrift监听端口
nimbus.childopts 通过storm-deploy项目部署时指定给nimbus进程的jvm选项
nimbus.task.timeout.secs 心跳超时时间,超时后nimbus会认为task死掉并重分配给另一个地址。
nimbus.monitor.freq.secs nimbus检查心跳和重分配任务的时间间隔.注意如果是机器宕掉nimbus会立即接管并处理。
nimbus.supervisor.timeout.secs supervisor的心跳超时时间,一旦超过nimbus会认为该supervisor已死并停止为它分发新任务.
nimbus.task.launch.secs task启动时的一个特殊超时设置.在启动后第一次心跳前会使用该值来临时替代nimbus.task.timeout.secs.
nimbus.reassign 当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改。
nimbus.file.copy.expiration.secs nimbus判断上传/下载链接的超时时间,当空闲时间超过该设定时nimbus会认为链接死掉并主动断开
ui.port Storm UI的服务端口
drpc.servers DRPC服务器列表,以便DRPCSpout知道和谁通讯
drpc.port Storm DRPC的服务端口
supervisor.slots.ports supervisor上能够运行workers的端口列表.每个worker占用一个端口,且每个端口只运行一个worker.通过这项配置可以调整每台机器上运行的worker数.(调整slot数/每机)
supervisor.childopts 在storm-deploy项目中使用,用来配置supervisor守护进程的jvm选项
supervisor.worker.timeout.secs supervisor中的worker心跳超时时间,一旦超时supervisor会尝试重启worker进程.
supervisor.worker.start.timeout.secs supervisor初始启动时,worker的心跳超时时间,当超过该时间supervisor会尝试重启worker。因为JVM初始启动和配置会带来的额外消耗,从而使得第一次心跳会超过supervisor.worker.timeout.secs的设定
supervisor.enable supervisor是否应当运行分配给他的workers.默认为true,该选项用来进行Storm的单元测试,一般不应修改.
supervisor.heartbeat.frequency.secs supervisor心跳发送频率(多久发送一次)
supervisor.monitor.frequency.secs supervisor检查worker心跳的频率
worker.childopts supervisor启动worker时使用的jvm选项.所有的”%ID%”字串会被替换为对应worker的标识符
worker.heartbeat.frequency.secs worker的心跳发送时间间隔
task.heartbeat.frequency.secs task汇报状态心跳时间间隔
task.refresh.poll.secs task与其他tasks之间链接同步的频率.(如果task被重分配,其他tasks向它发送消息需要刷新连接).一般来讲,重分配发生时其他tasks会理解得到通知。该配置仅仅为了防止未通知的情况。
topology.debug 如果设置成true,Storm将记录发射的每条信息。
topology.optimize master是否在合适时机通过在单个线程内运行多个task以达到优化topologies的目的.
topology.workers 执行该topology集群中应当启动的进程数量.每个进程内部将以线程方式执行一定数目的tasks.topology的组件结合该参数和并行度提示来优化性能
topology.ackers topology中启动的acker任务数.Acker保存由spout发送的tuples的记录,并探测tuple何时被完全处理.当Acker探测到tuple被处理完毕时会向spout发送确认信息.通常应当根据topology的吞吐量来确定acker的数目,但一般不需要太多.当设置为0时,相当于禁用了消息可靠性,storm会在spout发送tuples后立即进行确认.
topology.message.timeout.secs topology中spout发送消息的最大处理超时时间.如果一条消息在该时间窗口内未被成功ack,Storm会告知spout这条消息失败。而部分spout实现了失败消息重播功能。
topology.kryo.register 注册到Kryo(Storm底层的序列化框架)的序列化方案列表.序列化方案可以是一个类名,或者是com.esotericsoftware.kryo.Serializer的实现.
topology.skip.missing.kryo.registrations Storm是否应该跳过它不能识别的kryo序列化方案.如果设置为否task可能会装载失败或者在运行时抛出错误.
topology.max.task.parallelism 在一个topology中能够允许的最大组件并行度.该项配置主要用在本地模式中测试线程数限制.
topology.max.spout.pending 一个spout task中处于pending状态的最大的tuples数量.该配置应用于单个task,而不是整个spouts或topology.
topology.state.synchronization.timeout.secs 组件同步状态源的最大超时时间(保留选项,暂未使用)
topology.stats.sample.rate 用来产生task统计信息的tuples抽样百分比
topology.fall.back.on.java.serialization topology中是否使用java的序列化方案
zmq.threads 每个worker进程内zeromq通讯用到的线程数
zmq.linger.millis 当连接关闭时,链接尝试重新发送消息到目标主机的持续时长.这是一个不常用的高级选项,基本上可以忽略.
java.library.path JVM启动(如Nimbus,Supervisor和workers)时的java.library.path设置.该选项告诉JVM在哪些路径下定位本地库.

3.5 配置环境变量

[root@master conf]# cd /usr/local/
[root@master local]# ls
bin  etc  games  hadoop-2.6.5  include  jdk1.8.0_172  lib  lib64  libexec  sbin  share  src  zookeeper  zookeeper-3.4.5
[root@master local]# mv src/apache-storm-0.9.3 /usr/local/
[root@master local]# ln -sf /usr/local/apache-storm-0.9.3 /usr/local/storm
[root@master storm]# tail -2 /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
[root@master storm]# source /etc/profile

3.6 包分发和配置环境变量

[root@master ~]# rsync -az /usr/local/{storm,apache-storm-0.9.3} slave1:/usr/local/
[root@master ~]# rsync -az /usr/local/{storm,apache-storm-0.9.3} slave2:/usr/local/

[root@slave1 ~]# tail -2 /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
[root@slave1 ~]# source /etc/profile
[root@slave2 ~]# tail -2 /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
[root@slave2 ~]# source /etc/profile

3.7 启动集群

  • master 节点启动nimbus和storm ui
[root@master ~]# nohup storm ui >/dev/null 2>&1 &
[1] 12082
[root@master ~]# nohup storm nimbus >/dev/null 2>&1 &
[2] 12128
[root@master ~]# nohup storm logviewer >/dev/null 2>&1 &
[3] 12257
  • slave 节点启动Supervisor节点
[root@slave1 local]# nohup storm supervisor >/dev/null 2>&1 &
[1] 9210
[root@slave1 local]# nohup storm logviewer >/dev/null 2>&1 &
[2] 9302


[root@slave2 ~]# nohup storm supervisor >/dev/null 2>&1 &
[1] 11844
[root@slave2 ~]# nohup storm logviewer >/dev/null 2>&1 &
[2] 11960

3.8 检查

[root@master ~]# jps
12128 nimbus
7457 SecondaryNameNode
12257 logviewer
12082 core
7273 NameNode
10556 QuorumPeerMain
7613 ResourceManager
12351 Jps

[root@slave1 local]# jps
7218 NodeManager
8450 QuorumPeerMain
9302 logviewer
9367 Jps
7113 DataNode
9210 supervisor

[root@slave2 ~]# jps
11844 supervisor
7174 DataNode
11960 logviewer
12025 Jps
9402 QuorumPeerMain
7279 NodeManager

3.9 访问

  访问ui页面: http://master:8080/

storm

界面简单介绍:

  • Used slots:使用的worker数。
  • Free slots:空闲的worker数。
  • Executors:每个worker的物理线程数。