概述
Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。被称作“实时的hadoop”。Storm有很多使用场景:如实时分析,在线机器学习,持续计算, 分布式RPC,ETL等等。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快。Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程语言来开发应用。
Storm的特点
简单的编程模型
在大数据处理方面相信大家对hadoop已经耳熟能详,基于Google Map/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常地简单和优美。
同样,Storm也为大数据 的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助你快速、高效的开发应用。
水平扩展
在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多个工作进程,每个 工作进程又可创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,我们开发的spout、bolt就是作为一个或者多个任务的方式执行的。
计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。
支持多种编程语言
你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
高可靠性
Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。
高容错性
Storm会管理工作进程和节点的故障。
如果在消息处理过程中出了一些异常,Storm会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。
当然,如果处理单元中存储了中间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。
本地模式
Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm架构
Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节 点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由ApacheZooKeeper来完成的。
Stream
Stream是一个数据流的抽象。这是一个没有边界的Tuple序列,而这些Tuple序列会以一种分布式的方式并行地创建和处理。
对消息流的定义主要就是对消息流里面的tuple 进行定义,为了更好地使用tuple,需要给tuple 里的每个字段取一个名字,并且不同的tuple 字段对应的类型要相同,即两个tuple 的第一个字段类型相同,第二个字段类型相同,但是第一个字段和第二个字段的类型可以不同。默认情况下,tuple 的字段类型可以为integer、long、short、byte、string、double、float、boolean 和byte array 等基本类型,也可以自定义类型,只需要实现相应的序列化接口。
每一个消息流在定义的时候需要被分配一个id,最常见的消息流是单向的消息流,在Storm 中OutputFieldsDeclarer 定义了一些方法,让你可以定义一个Stream 而不用指定这个id。在这种情况下,这个Stream 会有个默认的id: 1。
Topologies
Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。
在 Storm 中,一个实时计算应用程序的逻辑被封装在一个称为Topology 的对象中,也称为计算拓扑。Topology 有点类似于Hadoop 中的MapReduce Job,但是它们之间的关键区别在于,一个MapReduce Job 最终总是会结束的,然而一个Storm 的Topology 会一直运行。在逻辑上,一个Topology 是由一些Spout(消息的发送者)和Bolt(消息的处理者)组成图状结构,而链接Spouts 和Bolts 的则是Stream Groupings。
Spouts&Bolts
Spouts
Spouts 是Storm集群中一个计算任务(Topology)中消息流的生产者,Spouts一般是从别的数据源(例如,数据库或者文件系统)加载数据,然后向Topology中发射消息。
Spouts即可以是可靠的,也可以是不可靠的。
在一个Topology中存在两种Spouts,一种是可靠的Spouts,一种是非可靠的Spouts,可靠的Spouts 在一个tuple 没有成功处理的时候会重新发射该tuple,以保证消息被正确地处理。不可靠的Spouts 在发射一个tuple 之后,不会再重新发射该tuple,即使该tuple 处理失败。每个Spouts 都可以发射多个消息流,要实现这样的效果,可以使用OutFieldsDeclarer.declareStream 来定义多个Stream,然后使用SpoutOutputCollector 来发射指定的Stream。
在Storm 的编程接口中,Spout 类最重要的方法是nextTuple()方法,使用该方法可以发射一个消息tuple 到Topology 中,或者简单地直接返回,如果没有消息要发射。需要注意的是,nextTuple 方法的实现不能阻塞Spout,因为Storm在同一线程上调用Spout 的所有方法。Spout 类的另外两个重要的方法是ack()和fail(),一个tuple 被成功处理完成后,ack()方法被调用,否则就调用fail()方法。注意,只有对于可靠的Spout,才会调用ack()和fail()方法。
Bolts
所有消息处理的逻辑都在Bolt 中完成,在Bolt 中可以完成如过滤、分类、聚集、计算、查询数据库等操作。Bolt 可以做简单的消息处理操作,例如,Bolt 可以不做任何操作,只是将接收到的消息转发给其他的Bolt。Bolt 也可以做复杂的消息流的处理,从而需要很多个Bolt。在实际使用中,一条消息往往需要经过多个处理步骤,例如,计算一个班级中成绩在前十名的同学,首先需要对所有同学的成绩进行排序,然后在排序过的成绩中选出前十名的
成绩的同学。所以在一个Topology 中,往往有很多个Bolt,从而形成了复杂的流处理网络。
Bolts可以发射多条消息流。
- 使用OutputFieldsDeclarer.declareStream定义Stream。
- 使用OutputCollector.emit来选择要发射的Stream。
Bolts的主要方法是execute。
Bolts以Tuple作为输入,使用OutputCollector来发射Tuple,通过调用OutputCollector.ack()通知这个Tuple的发射者Spout。
Bolts一般流程。
处理一个输入Tuple,发射0个或多个Tuple,然后调用ack()通知Storm自己已经处理过这个Tuple了。Storm提供了一个IBasicBolt会自动调用ack()。
Stream Groupings
定义一个 Topology 的其中一步是定义每个Bolt 接收什么样的流作为输入。Stream Grouping 就是用来定义一个Stream 应该如何分配给Bolts 上面的多个Tasks。
Storm里有7种类型的Stream Grouping。
- Shuffle Grouping 随机分组,随机派发Stream里面的Tuple,保证每个Bolt接收到的Tuple数量大致相同。
- Fields Grouping 按字段分组,以id举例。具有相同id的Tuple会被分到相同的Bolt中的一个Task,而不同id的Tuple会被分到不同的Bolt中的Task。
- All Grouping 广播,对于每一个Tuple,所有的Bolts都会收到。
- Global Grouping 全局分组,这个Tuple被分配到Storm中的一个Bolt的其中一个Task。具体一点就是分配给id值最低的那个Task。
- Non Grouping 不分组,Stream不关心到底谁会收到它的Tuple。目前这种分组和Shuffle Grouping是一样的效果,有一点不同的是Storm会把这个Bolt放到这个Bolt的订阅者同一个线程中去执行。
- Direct Grouping 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个Task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息Tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的Task的id(OutputCollector.emit方法也会返回Task的id)。
- Local or Shuffle Grouping 如果目标Bolt有一个或者多个Task在同一个工作进程中,Tuple将会被随机发射给这些Tasks。否则,和普通的Shuffle Grouping行为一致。
Workers
- 每个Supervisor中运行着多个Workers进程。
- 每个Worker进程中运行着多个Executor线程。
- 每个Executor线程中运行着若干个相同的Task(Spout/Bolt)。
一个 Topology 可能会在一个或者多个工作进程里面执行,每个工作进程执行整个Topology 的一部分。比如,对于并行度是300 的Topology 来说,如果我们使用50 个工作进程来执行,那么每个工作进程会处理其中的6 个Tasks(其实就是每个工作进程里面分配6 个线程)。Storm 会尽量均匀地把工作分配给所有的工作进程。
Task
在 Storm 集群上,每个Spout 和Bolt 都是由很多个Task 组成的,每个Task对应一个线程,流分组策略就是定义如何从一堆Task 发送tuple 到另一堆Task。在实现自己的Topology 时可以调用TopologyBuilder.setSpout() 和TopBuilder.setBolt()方法来设置并行度,也就是有多少个Task。
Storm安装部署
- 安装jdk。
- 搭建Zookeeper集群。
- 下载并解压Storm。
- 修改storm.yaml配置文件。
- storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址。例如:
storm.zookeeper.servers:
-“192.168.145.141”
-“192.168.145.142” - 如果Zookeeper没有使用默认端口,那么还需要修改storm.zookeeper.port。
- storm.local.dir Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给予足够的访问权限。然后在storm.yaml中配置该目录,例如:
storm.local.dir:”/home/application/storm/workdir”
- storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址。例如:
注意事项
启动Storm后台进程时,需要对conf/storm.yaml配置文件中设置的storm.local.dir目录具有写权限。
Storm后台进程被启动时,将在Storm安装目录下的logs/子目录下生成各个进程的日志文件。
Storm UI必须和Storm Nimbus部署在同一台机器上,否则UI无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。
常用命令
命令描述 | 格式 | 例子 |
---|---|---|
启动Nimbus | storm nimbus | storm nimbus |
启动Supervisor | storm supervisor | storm supervisor |
启动UI | storm ui | storm ui |
提交Topologies
格式
storm jar 【jar路径】 【拓扑包名.拓扑类名】【stormIP地址】【storm端口】【拓扑名称】【参数】
Example
|
|
停止Topologies
格式
storm kill [拓扑名称]
Example
|
|
API
Spouts
Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。
open
当一个Task被初始化的时候会调用open()。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。
getComponentConfiguration
此方法用于声明针对当前组件的特殊的Configuration配置。
nextTuple
这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。
declareOutputFields
此方法用于声明当前Spout的Tuple发送流。Stream的定义是通过OutputFieldsDeclare.declare方法完成的,其中的参数包括了发送的Fields。
另外,除了上述几个方法之外,还有ack、fail和close方法等。
Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法。这两个方法在BaseRichSpout等类中已经被隐式的实现了。
Bolts
Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。
prepare
此方法与Spouts的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发射Tuple。Bolt中Tuple的发射可以在prepare中、execute中、cleanup等方法中进行,一般都是在execute中。
getComponentConfiguration
与Spouts类似。
execute
这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是在execute中通过调用emit方法来完成的。
emit有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。
- emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。
- emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。
declareOutputFields
用于声明当前Bolt发送的Tuple中包含的字段。
Topology Example
|
|
|
|
|
|
|
|