Storm - Guaranteeing message processing

2019-03-02 23:51 | Source: Web

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

 

This chapter discusses Storm's reliability capabilities: how does Storm guarantee that every tuple emitted from a spout is correctly executed (fully processed)?

What does it mean for a message to be "fully processed"?

The first question is: what does it mean for a tuple (message) to be "fully processed"? After a tuple is emitted it may be processed by several levels of bolts, and each bolt may generate multiple new tuples from it, so the situation can get fairly complex.
All tuples ultimately triggered by one spout tuple form a tree, or more generally a DAG (directed acyclic graph).

Storm considers the spout tuple fully processed only when every node in this tuple tree has been processed successfully.
If any node in the tree fails or times out, the spout tuple is considered failed, and failed tuples are replayed.
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed.
A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout.
This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
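As a quick sketch of that configuration (assuming Storm's Java `Config` helper, whose `setMessageTimeoutSecs` method wraps this setting):

```java
import org.apache.storm.Config;

// Sketch: raise the per-topology replay timeout. Assumes Storm's Java API,
// where setMessageTimeoutSecs wraps Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
Config conf = new Config();
conf.setMessageTimeoutSecs(60); // tuples not fully processed within 60s are failed and replayed
```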

 

What happens if a message is fully processed or fails to be fully processed?

How is this mechanism implemented?
First, every tuple has a unique message id (msgId), fixed at the moment the tuple is emitted:

_collector.emit(new Values("field1", "field2", 3), msgId);

Next, look at the ISpout interface below. Besides nextTuple for fetching tuples, it also has ack and fail: when Storm detects that a tuple has been fully processed it calls ack; on timeout or a detected failure it calls fail.
Note that a tuple can only be acked or failed on the spout task that produced it; the reason will become clear from the implementation discussion below.

A tuple will be acked or failed by the exact same Spout task that created it. So even if a Spout is executing many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

Finally, how is this implemented in the spout? It is actually fairly simple.
On the spout's queue, getting a message only "opens" it rather than popping it, and marks the tuple as pending, preventing it from being sent multiple times.
Only when the tuple is acked is it actually popped from the queue; if the tuple fails, its state is simply reset to the initial state.
This also explains why a tuple can only be acked or failed on the spout task that emitted it: only that task's queue holds the tuple.

When KestrelSpout takes a message off the Kestrel queue, it "opens" the message.
This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed.
While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue.
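The open/pending/ack cycle described above can be sketched with a minimal in-memory queue. This is a hypothetical illustration with invented names, not Storm or Kestrel source code:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of a Kestrel-style reliable queue -- illustrative only.
public class PendingQueue<T> {
    private final Queue<T> ready = new ArrayDeque<>();    // messages visible to consumers
    private final Map<Long, T> pending = new HashMap<>(); // opened but not yet acked
    private long nextId = 0;

    public void put(T msg) { ready.add(msg); }

    // "Open" the next message: it leaves the ready queue but is only parked in
    // the pending map, invisible to other consumers until acked or failed.
    public Map.Entry<Long, T> open() {
        T msg = ready.poll();
        if (msg == null) return null;
        long id = nextId++;
        pending.put(id, msg);
        return Map.entry(id, msg);
    }

    // Ack: the message is now truly removed from the queue.
    public void ack(long id) { pending.remove(id); }

    // Fail (or client disconnect): the message goes back on the queue.
    public void fail(long id) {
        T msg = pending.remove(id);
        if (msg != null) ready.add(msg);
    }

    public static void main(String[] args) {
        PendingQueue<String> q = new PendingQueue<>();
        q.put("sentence-1");
        Map.Entry<Long, String> opened = q.open();
        System.out.println(q.open()); // null: the message is pending, not re-sent
        q.fail(opened.getKey());
        System.out.println(q.open().getValue()); // sentence-1 is available again
    }
}
```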

 

What is Storm's reliability API?

A question left open so far: what mechanism does Storm itself use to decide whether a tuple has been fully processed?

This breaks down into two sub-problems:
1. How does Storm learn the structure of the tuple tree?
2. How does Storm learn the outcome, success or failure, at each node of the tree?

The answer is simple: you must tell it. How?
1. For the tree structure, Storm needs to know which tuples each tuple was produced from, i.e. the links between tree nodes.
   A link between tree nodes is called an anchoring. Every time you emit a new tuple, you must establish the anchoring explicitly through the API.

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
Each word tuple is anchored by specifying the input tuple as the first argument to emit.

See the code example below:

_collector.emit(tuple, new Values(word)); 

The first argument to emit is the input tuple; this is what establishes the anchoring.
Of course, you can also call the unanchored version of emit when you don't need reliability, which is more efficient:

_collector.emit(new Values(word));

Also, as mentioned earlier, one tuple may depend on multiple inputs:

An output tuple can be anchored to more than one input tuple.
This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

Multi-anchoring turns the tuple tree into a tuple DAG; current versions of Storm handle DAGs well.
Multi-anchoring adds the output tuple into multiple tuple trees.
Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs.

[Figure: a tuple DAG produced by multi-anchoring]

2. For the outcome at each node of the tuple tree, you must explicitly call ack or fail on the OutputCollector once each bolt has finished its processing.

This is done by using the ack and fail methods on the OutputCollector.
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree.

See the example below; at the end of the execute method it calls:
_collector.ack(tuple);

I find it a bit puzzling that ack is a method on OutputCollector rather than on Tuple.
Moreover, the ack applies to the bolt's input, so why does it live on the output collector? Perhaps because every input is some other component's output, but the design feels somewhat unintuitive.

public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

To guarantee reliability, Storm necessarily sacrifices some efficiency: each task records in memory the structure and status of the tuple trees you report.
A tuple node is removed from memory only once it has been acked or failed, so if you never ack or fail your tuples, the task will eventually run out of memory.

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

 

The simpler version: BasicBolt

The mechanism above puts a burden on the programmer, especially for simple cases such as filters, where you have to explicitly anchor and ack every single time...

So Storm provides a simpler version that anchors automatically and calls ack automatically when the bolt finishes executing.

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you.

public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

How do I make my applications work correctly given that tuples can be replayed?

The question is how to guarantee "fully fault-tolerant exactly-once messaging semantics", since replays can make a message appear multiple times at a bolt, which seriously affects applications such as counting.
Starting with Storm 0.7.0, the transactional topologies feature solves this problem fairly well.

As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.

 

How does Storm implement reliability in an efficient way?

Now let's discuss how Storm implements this reliability mechanism. Storm runs a set of special "acker" tasks to track every spout tuple, and you can configure the number of acker tasks according to your tuple volume.

A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple.
When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message.
You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.

Every tuple that is created gets a random 64-bit id used for tracking.
Tuples are linked into a tuple tree by the anchors specified at emit time, and every tuple knows the ids of the spout tuples at the roots of its trees (these ids are copied along at each emit).

Whenever a tuple is acked, a message is sent to the appropriate acker; a concrete example is shown in the figure below.

When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.

Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

[Figure: how the tuple tree changes when tuple "C" is acked, having created tuples "D" and "E"]

 

Of course, for Storm to track all tuples via acker tasks, the following questions still need answering:

1. When there are multiple ackers, how does a tuple being acked know which acker to send its message to?
Since every tuple knows the id of the spout tuple that produced it, mod hashing (m mod n) is used to assign each spout tuple id to an acker, guaranteeing that the entire tuple tree of one spout tuple is tracked by a single acker.
When a tuple is acked, the right acker is found simply by hashing its spout tuple id.

You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
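The mod-hash routing in point 1 can be sketched as follows. AckerRouting is an illustrative name of my own, not Storm's actual class:

```java
// Illustrative sketch: map a spout tuple id to one of numAckers acker tasks
// via mod hashing, so all tuples in one tree reach the same acker.
public class AckerRouting {
    // Math.floorMod keeps the result in [0, numAckers) even for negative hashes.
    public static int ackerFor(long spoutTupleId, int numAckers) {
        return Math.floorMod(Long.hashCode(spoutTupleId), numAckers);
    }

    public static void main(String[] args) {
        long spoutTupleId = -7448712393L;
        // Every tuple in the tree carries this same spout tuple id, so every
        // ack message for the tree routes to the same acker task.
        System.out.println(ackerFor(spoutTupleId, 4));
    }
}
```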

2. When there are multiple spout tasks, how does Storm know, when finally acking a spout tuple, which spout task it corresponds to (given that the ack must happen on the task that produced the tuple)?
The answer is simple: when a spout task emits a new tuple, it sends a message telling the appropriate acker its task id, so the acker holds the mapping from spout tuple id to task id.

How do the acker tasks know which spout task is responsible for each spout tuple they're tracking?

When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.

3. If ackers explicitly tracked every tuple tree in memory, there would be a scaling problem: with huge numbers of tuples or complex workflows, memory could easily blow up. How is this solved?
Storm uses a special technique here, one of its major breakthroughs: the memory needed per spout tuple is fixed at about 20 bytes, no matter how complex the tree.
The acker only needs to store, for each spout tuple: the spout tuple id, the task id, and an "ack val".
The ack val is a 64-bit number representing the state of the entire tuple tree; it is produced by XORing the ids of all tuples that have been created and acked in the tree.
When the ack val becomes 0, the tuple tree is complete.

The trick is very clever: XORing two equal numbers gives 0, and each tuple id is XORed in twice, once when created and once when acked, so once every created tuple has been acked, the cumulative XOR ends up 0.
I wondered whether overlapping bits between different tuple ids might interfere; a quick test showed no interference.

For the details of how the acker works, see "Twitter Storm source-code analysis: the acker workflow".

Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.
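The XOR bookkeeping above can be demonstrated in a few lines. This is a hypothetical illustration; the class and method names are my own, not Storm's:

```java
// Illustrative sketch of the acker's fixed-size XOR state for one spout tuple.
public class AckValDemo {
    private long ackVal = 0; // one 64-bit value tracks the whole tuple tree

    // XOR in a tuple id when the tuple is created (anchored into the tree).
    public void created(long tupleId) { ackVal ^= tupleId; }

    // XOR in the same id again when the tuple is acked; since x ^ x == 0,
    // each fully processed tuple cancels itself out of ackVal.
    public void acked(long tupleId) { ackVal ^= tupleId; }

    // The tree is complete exactly when every created id has also been acked.
    public boolean treeComplete() { return ackVal == 0; }

    public static void main(String[] args) {
        AckValDemo acker = new AckValDemo();
        long a = 0x9e3779b97f4a7c15L, b = 0x2545f4914f6cdd1dL, c = 0xda942042e4dd58b5L;

        acker.created(a);                    // spout emits tuple A
        acker.created(b); acker.created(c);  // a bolt emits B and C anchored to A
        acker.acked(a);
        System.out.println(acker.treeComplete()); // false: B and C still pending
        acker.acked(b); acker.acked(c);
        System.out.println(acker.treeComplete()); // true: ackVal is back to 0
    }
}
```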

 

Finally, consider the task-failure cases:
An ordinary task fails: the tuple times out and the spout replays it.
An acker task fails: none of the tuples it was tracking can be acked anymore, so they all time out and are replayed by the spout.
A spout task fails: the source feeding the spout is responsible for replaying, e.g. RabbitMQ or Kafka.

Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:

  • Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
  • Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
  • Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.

As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.

 

Tuning reliability

Reliability naturally adds significant overhead; for example, the number of messages roughly doubles because of the communication with the ackers.
So if you don't need reliability, you can turn it off in the following ways:

Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks.

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.

There are three ways to remove reliability.

1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.

2. The second way is to omit a message id in the SpoutOutputCollector.emit method.

3. Finally, emit downstream tuples as unanchored tuples, so failures in that part of the topology will not cause spout tuples to be replayed.
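As a sketch, the three options look like this in Storm's Java API. Here _collector stands for the spout's or bolt's collector as in the earlier examples, and setNumAckers is assumed to wrap the TOPOLOGY_ACKERS setting:

```java
// 1. No ackers: Storm acks every spout tuple immediately after it is emitted.
Config conf = new Config();
conf.setNumAckers(0); // sets Config.TOPOLOGY_ACKERS to 0

// 2. Emit from the spout without a message id: this tuple is never tracked.
_collector.emit(new Values("field1", "field2", 3));

// 3. Emit from a bolt without an input-tuple anchor: this subtree is untracked.
_collector.emit(new Values(word));
```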


Reposted from: http://www.cnblogs.com/fxjwind/archive/2013/05/08/3066988
