STORM 官方文档翻译 -- 示例

在本教程中,你将学习如何创建Storm topology并将其部署到Storm群集。 Java将是使用的主要语言,但是一些例子将使用Python来体现Storm的多语言支持。

前言

本教程使用storm-starter项目中的示例。建议你clone项目并遵循示例。阅读设置开发环境创建Storm项目以使你的机器可以开发Storm程序。

Storm群集的部件

Storm集群从表面看来有点像Hadoop集群。只是Hadoop上运行的是“MapReduce”job,而Storm上运行“topology”。 “Job”和“topology”的差别很大,最大的一个就是MapReduce作业最终会完成,而Topology会永远执行(或直到你终止它)来处理消息。

Storm集群上有两种节点:主节点和工作节点。主节点运行一个名为“Nimbus”的守护进程,类似于Hadoop的“JobTracker”。 Nimbus负责在集群上分发代码,将任务分配给机器,并监控故障。

每个工作节点运行一个称为“Supervisor”的守护程序。Supervisor监听Nimbus分配给其的任务并在必要的时候启动和停止工作进程。每个工作进程执行一个topology的子集;一个topology由多个工作进程并行处理数据。

storm-cluster

Nimbus与Supervisor之间的所有协调都是通过Zookeeper集群完成的。此外,Nimbus守护进程和Supervisor守护进程是快速故障(ail-fast)和无状态(stateless)的;所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以kill -9 Nimbus或者Supervisor,他们可以像没有发生任何事情一样重新启动。这种设计使得Storm非常稳定。

Topology

要使用Storm进行实时计算,你可以创建“拓扑(Topology)”。Topology是一个计算图。拓扑中的每个节点包含处理逻辑和连接各节点数据传递的指示。
运行Topology很简单。首先,将所有代码和依赖打包成jar包。然后,运行如下命令:

storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2

以上命令运行类org.apache.storm.MyTopology参数为arg1arg2。该类的main函数定义了Topology结构,并将其提交给Nimbus。storm jar负责连接Nimbus并上传jar包。

由于Topology的定义是Thrift结构的,并且Nimbus是Thrift服务,因此可以使用任何编程语言创建和提交Topology。上述示例是从基于JVM的语言中启动和停止Topology最简单的方法。更多信息,请参阅Running topologies on a production cluster

Streams

Storm的核心抽象是“流(stream)”。流是无限序列的元组。 Storm提供了可靠的方式在一个分布式系统中将一个流转换为另一个流。例如,你可以将一小段推特流转换为热门话题流。

Storm提供的流转换的组件为“spouts”和“bolt”。spout和bolt具有实现的接口,用于运行特定于应用程序的逻辑。

spout是流的数据源。例如,一个spout可以从Kestrel队列中读取元组,并将其作为流发送。或者一个spout可能连接到Twitter API并发出一段tweet流。

一个bolt可以接收多个输入流,并做一些处理,然后也可以发出成为新的流。像计算热门话题这样的复杂的刘转换,需要多个步骤,因此需要多个bolt。bolt执行函数、过滤元祖、聚合计算、join和连接数据库等操作。

spout和bolt组成了Topology网络,topology是流转换的图,其中每一个节点是一个spout或bolt。图中的边缘表示每个bolt订阅的流。当一个spout或bolt将一个元祖发送给一个流,所有订阅该流的bolt都将接收到该元祖。

topology

topology中两个节点之间的连线表明元祖应该如何传递。例如,如果Spout A和Bolt B之间有一个链接,Spout A和Bolt C有一个链接,以及Bolt B和Bolt C有一个连接,那么每次Spout A发出一个元组,它会将元组发送给Bolt B和Bolt C.而所有Bolt B的输出元组也将转到Bolt C。

Storm的Topology中的每一个节点都是并行执行的。在你的topology中,你可以指定每个节点需要多少并行度。然后Storm会在整个集群中生成指定线程数来执行。

topology是永远运行的,或者直到你终止它。 Storm会自动重新分配失败的任务。此外,Storm保证不丢失任何数据,即使机器宕机或信息丢失。

Data model

Storm使用元组作为其数据模型。元组是值的列表,元组中的一个字段可以是任何类型的对象。开箱即用,Storm支持所有原始类型,字符串和字节数组作为元组字段值。要使用另一种类型的对象,你只需要为该类型实现一个序列化

topology中的每个节点都必须声明其发出的元组的字段。例如,下面这个bolt声明它发出元祖带有“double”和“triple”字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}

declareOutputFields函数声明输出字段为[“double”,“triple”]。bolt的其余部分将在下面的章节中进行说明。

A simple topology

我们来看看一个简单的Topology结构,以便更多地了解这些概念,并了解如何编码。我们来看一下storm-starter中ExclamationTopology的定义:

1
2
3
4
5
6
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");

这个Topology包含一个spout和两个bolt。spout发送单词,然后每个bolt在接收到的单词后面添加”!!!”。spout发送数据到第一个bolt然后这个bolt把数据发送到第二个bolt。如果spout发送了[“bog”]和[“json”],那么第二个bolt发送的单词会是[“bob!!!!!!”] 和 [“john!!!!!!”].

代码中使用setSpoutsetBolt方法定义节点。方法有三个参数作为输入:用户指定的id,包含处理逻辑的对象以及所需节点的并行数量作为。在这个例子中,spout的id是“words”,bolt的id则是“exclaim1”和“exclaim2”。

包含处理逻辑的对象实现了spout对应的IRichSpout接口和bolt对应的IRichBolt接口。

最后一个参数,设置节点的并行数是可选的。群集中执行该组件的线程数量。如果省略它,Storm将只为该节点分配一个线程。

setBolt返回一个用于定义Bolt输入的InputDeclarer对象。本例中组件“exclaim1”声明它想使用一个shuffle grouping来读取组件“words”发出的所有元组,组件“exclaim2”声明它想使用一个shuffle grouping来读取组件“exclaim1”发出的所有元组。 “shuffle grouping”意味从输入源发送出的元组会被随机发送到一个bolt的任务。组件之间的数据分组有很多种方法。后面会有说明。

如果你希望组件”exclaim2”同时接收租房间”words”和”exclaim1”发出的元组, 你可以按照下面代码定义组件”exclaim2”:

1
2
3
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");

正如你看到的,输入声明可以指定多个bolt来源。

我们来研究这个拓扑中的spout和bolt的实现。spout负责将新消息发送到topology中。本例中的TestWordSpout每隔100ms随机发送列表[“nathan”,“mike”,“jackson”,“golda”,“bertels”]中的一个单词。其nextTuple()的实现如下所示:

1
2
3
4
5
6
7
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

正如你所看到的,实现非常简单

下面是实现在字符串尾部添加”!!!” 的ExclamationBolt对象完整实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

prepare方法为bolt提供了一个用于从该bolt发送元组的OutputCollector。bolt可以随时从prepareexecutecleanup方法,甚至在另一个线程中异步发出元组。这里的prepare简单的将一个OutputCollector保存为一个实例变量,以便稍后在execute方法中使用。

execute方法从bolt接收一个元组。ExclamationBolt从元组中获取第一个字段,然后对其附加“!!!”字符串后再发送。如果一个bolt订阅了多个输入源,你可以使用Tuple#getSourceComponent方法找出元组来自哪个组件。

execute方法中还做了一些其他事情,既作为emit的第一个参数的输入的元组,最后一行则是确认(ack)发送。这些是Storm可靠性API的一部分,用于保证无数据丢失,将在本教程的后续部分进行说明。

当Bolt关闭时调用cleanup方法,清理打开的资源。不过在群机上的话这个方法基本用不上.如果机器的任务正在运行,则无法调用该方法。cleanup方法适用于本地模式运行Topology的情况(在此过程中模拟Storm集群),并且希望能够运行终止Topology后资源不会发生泄漏。

declareOutputFields方法申明了ExclamationBolt输出的元组,这里只包含一个字段word.

getComponentConfiguration方法允许对组件进行各方面的配置。这部分内容可以看这里

一般情况下bolt里面不需要实现cleanupgetComponentConfiguration方法.你可以简单的使用基类提供的实现.通过扩展BaseRichBolt可以更简洁地写出ExclamationBolt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

在本地模式运行ExclamationTopology

让我们看下如何在本地模式运行ExclamationTopology.

Storm有两种运行模式:本地模式和分布式模式。在本地模式下,Storm通过用线程模拟工作节点来完成执行。本地模式对于测试和开发Topology非常好用的。当你运行storm-starter的Topology,它将以本地模式运行,你可以看到每个组件发出的消息。本地模式更多内容请查看local mode

在分布式模式下,当向master提交topology时,你的程序会被master分发给worker运行.如果某个work宕机,mster会重新分配一台机器执行代码.具体可以查看Running topologies on a production cluster

以下是让ExclamationTopology运行在本地模式下的代码:

1
2
3
4
5
6
7
8
9
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,代码通过创建一个LocalCluster对象定义一个集群。将topology提交到此虚拟集群与向分布式集群提topology结构相同。它通过调用submitTopologyLocalCluster提交一个topology,该方法接收三个命令:运行topology的名称,topology的配置以及topology本身.

名称用于标识topology,以便以后可以将其终止。Topology将无限期运行,直到你将其终止。

配置用于调整运行topology的各个方面。这里指定的两个配置是非常常见的:

1.TOPOLOGY_WORKERS(通过setNumWorkers设置)指定运行Topology进行数量.topology中的每个组件将用多个线程执行。分配给给定组件的线程数通过setBoltsetSpout方法进行配置。每个工作进程在其中包含一定数量的组件的线程数。例如,你在配置中指定了50个工作进程和300个线程。那么每个工作进程将执行6个线程,每个线程可以属于不同的组件。你可以通过调整每个组件的并行性以及这些线程在其中运行的工作进程数来调整Storm topology的性能。

2.TOPOLOGY_DEBUG(通过setDebug设置),当设置为true时,告诉Storm记录组件发出的每个消息。这在测试topology结构时在本地模式下很有用,不过在群集中运行时建议将其关闭.

还有一些其他的配置属性,具体细节可以查看the Javadoc for Config.

要了解如何设置开发环境,以便可以以本地模式运行topology(例如在Eclipse中),请参阅创建新的Storm项目

Stream groupings

stream grouping告诉topology如何在两个组件之间发送元组。记住,spout和bolt在整个集群中是并行执行的.一个执行topology的任务图看起来是这样的:

topology-tasks

当Bolt A的任务向Bolt B发出一个元组时,哪个任务应该发送元组?

“stream grouping”通过告诉Storm如何在任务集之间发送元组来处理这个问题。在我们研究不同类型的流分组之前,让我们来看一下storm-starter中的另一个topology。WordCountTopology从一个spout读取句子,并从WordCountBolt中输出单词总数:

1
2
3
4
5
6
7
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));

SplitSentence将接收到句子中的每个单词都发出去.WordCount将在内存中通过一个map记录这些单词,每当WordCount接收到一个单词.他就会更新状态,并发送新的单词数.

有几种不同的stream groupings.

最简单的分组称为“shuffle grouping”,它将元组发送到随机任务。在WordCountTopology中使用随机分组将来自RandomSentenceSpout的元组发送到SplitSentencebolt。这样的处理方式会达到很好的负载均衡效果.

一个更有趣的分组是“fields grouping”。SplitSentencebolt和WordCountbolt之间使用的是fields grouping。对于WordCount blot的功能来说,同样的单词总能被相同的任务执行是至关重要的。否则,多个任务将看到相同的单词,每个任务都会发出都是不正确的计数值,field grouping允许你指定字段进行分组。相同的字段值会被发送到同一个人物。WordCount使用“word”字段来订阅SplitSentence的输出流,所以相同的单词总是传送到相同的任务,并且blot将产生正确的输出。

Field group是实现流连接,流聚合以及大量其他用例的基础。底层使用mod哈希实现Field group。

关于其他的stream group你可以查看Concepts

在其他语言中定义Bolt

bolt可以用任何语言定义。用另一种语言编写的bolt作为子进程执行,Storm通过stdin/stdout的JSON消息与子进程进行通信。通信协议只需一个100行代码的适配器库(adapter library),而且Storm配有适用于Ruby,Python和Fancy的适配器库。

下面是WordCountTopologySplitSentence bolt的定义:

1
2
3
4
5
6
7
8
9
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

SplitSentence重写了ShellBolt并且声明它使用python运行splitsentence.py. 这里是splitsentence.py的实现代码:

1
2
3
4
5
6
7
8
9
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()

有关使用其他语言编写spout和bolt的更多信息,请参阅在Storm中使用非JVM语言

Guaranteeing message processing

在前面的文档中,我们跳过了元组的发布方式。这些是Storm可靠性API的一部分,关于Storm如何保证spout来的每个消息都将被完全处理。请参阅Guaranteeing message processing

Transactional topologies

Storm保证每个消息通过Topology发送至少一次。一个常见的问题是“你怎么保证计算的正确?会不会多算?” Storm具有称为transactional topologies的特性,可让大多数计算进行一次消息传递。更多信息请看这里

Distributed RPC

本教程介绍了如何在Storm之上进行基本流处理。还有更多的事情你可以用Storm处理。 Storm的最有趣的应用之一是分布式RPC,你可以在其中并行计算,在这里阅读更多关于分布式RPC的信息。

Conclusion

本教程概述了开发,测试和部署Storm Topology。其余的文档可以深入了解使用Storm的各个方面。

原文地址:http://storm.apache.org/releases/current/Tutorial.html

打赏支持:支付宝/微信,感谢赏口饭吃