Apache Samza - Reliable Stream Processing atop Apache Kafka and Hadoop YARN

2019-03-02 23:51 | Source: Internet



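I have been using Kafka for the past two years. Kafka has long been pitched as suitable for online analysis, but in practice many problems come up, such as deployment, scheduling, and failover, and we did some work of our own to address them.
Samza fills in these gaps and makes online analysis on top of Kafka much simpler, so it feels quite familiar.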


1 Background

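The documentation starts with a clear explanation of messaging systems versus stream processing systems.
A messaging system handles data delivery. It is fairly low-level infrastructure and can be implemented in several ways: a pure message queue, a pub-sub system, or a log aggregation system.
A stream processing system addresses higher-level (processing layer) problems, such as the fault tolerance and transaction issues listed below.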

What is messaging?

Messaging systems are a popular way of implementing near-realtime asynchronous computation. Messages can be added to a message queue (ActiveMQ, RabbitMQ), pub-sub system (Kestrel, Kafka), or log aggregation system (Flume, Scribe) when something happens. Downstream "consumers" read messages from these systems, and process or take action based on the message contents.

What is stream processing?

A messaging system is a fairly low-level piece of infrastructure---it stores messages and waits for consumers to consume them. When you start writing code that produces or consumes messages, you quickly find that there are a lot of tricky problems that have to be solved in the processing layer. Samza aims to help with these problems.

Consider a counting example (count page views and update a dashboard).
Fault tolerance
What happens when the machine that your consumer is running on fails, and your "current count" is lost? How do you recover? Where should the processor be run when it restarts?
Transactions, exactly once
What if the underlying messaging system sends you the same message twice, or loses a message? Your counts will be off.
Partitioning and distribution
What if you want to count page views grouped by the page URL? How can you do that in a distributed environment?

Stream processing is a higher level of abstraction on top of messaging systems, and it's meant to address precisely this category of problems.
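
To make the "count page views grouped by page URL" question concrete, here is a minimal Java sketch. It is not Samza code: the class `PartitionedPageViewCounter` and its methods are hypothetical, and routing by hash of the URL modulo the partition count is an assumption chosen for illustration. The point is only that every view of a given URL lands in the same partition, so each partition can keep its own independent counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: page-view counting split across partitions by URL. */
class PartitionedPageViewCounter {
    // One independent count map per partition; in a real deployment each
    // map would live in a different process.
    private final List<Map<String, Integer>> countsByPartition = new ArrayList<>();

    PartitionedPageViewCounter(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            countsByPartition.add(new HashMap<>());
        }
    }

    /** Assumed routing rule: non-negative hash of the URL modulo the partition count. */
    int partitionFor(String url) {
        return Math.floorMod(url.hashCode(), countsByPartition.size());
    }

    /** Routes a page view to the partition that owns its URL and counts it there. */
    void onPageView(String url) {
        countsByPartition.get(partitionFor(url)).merge(url, 1, Integer::sum);
    }

    /** Reads the count from the owning partition only. */
    int countFor(String url) {
        return countsByPartition.get(partitionFor(url)).getOrDefault(url, 0);
    }
}
```

Because a URL always maps to the same partition, no two partitions ever need to coordinate on a single counter.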



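Samza is a relatively simple system because it stands on the shoulders of giants.

It provides APIs: a process-message API and a pluggable API.
State management: stream processor checkpoints and local state.

It is simple. If you are familiar with Kafka, Samza is quite easy to understand.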

Samza is a stream processing framework with the following features:

  • Simple API: Samza provides a very simple call-back based "process message" API.
  • Managed state: Samza manages snapshotting and restoration of a stream processor's state. Samza will restore a stream processor's state to a snapshot consistent with the processor's last read messages when the processor is restarted. Samza is built to handle large amounts of state (even many gigabytes per partition).
  • Fault tolerance: Samza will work with YARN to transparently migrate your tasks whenever a machine in the cluster fails.
  • Durability: Samza uses Kafka to guarantee that no messages will ever be lost.
  • Scalability: Samza is partitioned and distributed at every level. Kafka provides ordered, partitioned, replayable, fault-tolerant streams. YARN provides a distributed environment for Samza containers to run in.
  • Pluggable: Though Samza works out of the box with Kafka and YARN, Samza provides a pluggable API that lets you run Samza with other messaging systems and execution environments.
  • Processor isolation: Samza works with Apache YARN, to give security and resource scheduling, and resource isolation through Linux CGroups.


2 Concepts

This page gives an introduction to the high-level concepts in Samza.


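Streams are easy to understand. Because the messaging system is pluggable, Kafka is only the default implementation; it can be replaced with something else, such as a database or HDFS.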

Samza processes streams. A stream is composed of immutable messages of a similar type or category.
Samza supports pluggable systems that implement the stream abstraction: in Kafka a stream is a topic, in a database we might read a stream by consuming updates from a table, in Hadoop we might tail a directory of files in HDFS.



Jobs: the concept of a job is narrow. It refers only to a single piece of transformation logic.

A Samza job is code that performs a logical transformation on a set of input streams to append output messages to a set of output streams.

Partitions: equivalent to Kafka partitions.

Each stream is broken into one or more partitions. Each partition in the stream is a totally ordered sequence of messages.


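Tasks: a job can have multiple tasks, and the number of tasks is the job's maximum degree of parallelism.
A task corresponds to a Kafka consumer. The number of tasks cannot exceed the number of partitions, because Kafka does not allow consumers in the same consumer group to read one partition concurrently, so extra tasks would be wasted.
The assignment of partitions to tasks also never changes. When a task fails, all that can be done is to restart the same task elsewhere, rather than hand its partitions to a different task: state is kept on the consumer side, so a different task would not know how far the previous one had read.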

A job is itself distributed by breaking it into multiple tasks. The task is the unit of parallelism of the job, just as the partition is to the stream. Each task consumes data from one partition for each of the job's input streams.
There cannot be more tasks than input partitions.
The partitions assigned to a task will never change: if a task is on a machine that fails the task will be restarted elsewhere still consuming the same stream partitions.


Dataflow Graphs

Multiple jobs can be composed into a data flow, and the jobs are fully decoupled from one another through Kafka.

We can compose multiple jobs to create a data flow graph where the nodes are streams containing data and the edges are jobs performing transformations.
This composition is done purely through the streams the jobs take as input and output—the jobs are otherwise totally decoupled: They need not be implemented in the same code base, and adding, removing, or restarting a downstream job will not impact an upstream job.



Containers: a YARN concept representing the actual process. A container can hold multiple tasks.

Partitions and tasks are both logical units of parallelism; they don't actually correspond to any particular assignment of computational resources (CPU, memory, disk space, etc). Containers are the unit of physical parallelism, and a container is essentially just a Unix process (or Linux cgroup). Each container runs one or more tasks.


3 Architecture

The architecture is easy to understand, especially when compared with Hadoop.

Samza is made up of three layers:

  1. A streaming layer.
  2. An execution layer.
  3. A processing layer.

Samza provides out of the box support for all three layers.

  1. Streaming: Kafka
  2. Execution: YARN
  3. Processing: Samza API

[Figure: interaction diagram of the components. Different colors denote different instances; the diagram uses four machines.]



Application Master

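The AM itself runs on a NodeManager (NM). It can be thought of as the job's master: it assigns partitions to tasks, requests resources from the ResourceManager (RM), and handles failover of the TaskRunners.
If the AM itself fails, however, all TaskRunners are killed, and once the AM recovers it restarts all of them.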

Samza's main integration with YARN comes in the form of a Samza ApplicationMaster. This is the chunk of code responsible for managing a Samza job in a YARN grid. It decides what to do when a stream processor fails, which machines a Samza job's TaskRunner should run on, and so on.

When the Samza ApplicationMaster starts up, it does the following:

  1. Receives configuration from YARN via the STREAMING_CONFIG environment variable.
  2. Starts a JMX server on a random port.
  3. Instantiates a metrics registry and reporters to keep track of relevant metrics.
  4. Registers the AM with YARN's RM.
  5. Gets the total number of partitions for the Samza job using each input stream's PartitionManager (see the Streams page for details).
  6. Reads the total number of containers requested from the Samza job's configuration.
  7. Assigns each partition to a container (called a Task Group in Samza's AM dashboard).
  8. Makes a ResourceRequest to YARN for each container.
  9. Polls the YARN RM every second to check for allocated and released containers.

If the AM, itself, fails, YARN will handle restarting the AM. When the AM is restarted, all containers that were running will be killed, and the AM will start from scratch.
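
Step 7 of the startup list above (assigning each partition to a container) can be illustrated with a small Java sketch. The source does not say which assignment strategy the AM uses, so the round-robin rule here is an assumption, and `PartitionAssigner` is a hypothetical name rather than a Samza class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of spreading partitions over a fixed number of containers. */
class PartitionAssigner {
    /**
     * Assigns partition ids 0..numPartitions-1 to containers 0..numContainers-1.
     * Round-robin is an assumption made for illustration.
     */
    static Map<Integer, List<Integer>> assign(int numPartitions, int numContainers) {
        if (numContainers <= 0) {
            throw new IllegalArgumentException("need at least one container");
        }
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int c = 0; c < numContainers; c++) {
            groups.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Partition p goes to container p mod numContainers.
            groups.get(p % numContainers).add(p);
        }
        return groups;
    }
}
```

With five partitions and two containers, container 0 receives partitions 0, 2, and 4 and container 1 receives partitions 1 and 3. Each group then corresponds to the set of tasks one container runs.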



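The TaskRunner is the container (a process) that actually does the execution on an NM, and it can hold multiple StreamTask instances.
Note that multiple StreamTask instances do not mean multiple threads: a TaskRunner appears to have only a single main event loop thread.
The number of StreamTask instances depends on the number of partitions: one StreamTask instance is created per partition. Why create so many?
First, a job may have multiple sources, and the task logic for each partition may differ, although this is not the main reason.
The main reason is that each StreamTask instance has to hold the local state for its partition, such as a count.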


The TaskRunner is Samza's stream processing container. It is responsible for managing the startup, execution, and shutdown of one or more StreamTask instances.

When a TaskRunner starts up, it does the following:

  1. Get the last checkpointed offset for each input stream/partition pair (the checkpoint is checked and the previous offset restored, so reading can resume)
  2. Create a "reader" thread for every input stream/partition pair
  3. Start metrics reporters to report metrics
  4. Start a checkpoint timer to save your task's input stream offsets every so often
  5. Start a window timer to trigger your StreamTask's window method, if it is defined (this supports sliding-window applications)
  6. Instantiate and initialize your StreamTask once for each input stream partition (one StreamTask instance per partition)
  7. Start an event loop that takes messages from the input stream reader threads, and gives them to your StreamTasks
  8. Notify lifecycle listeners during each one of these steps

Event Loop

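The documentation states explicitly here that a TaskRunner has only one event loop thread, in a typical producer/consumer pattern.
The SystemConsumer threads put the data they read from Kafka into a centralized message queue, and the event loop thread processes it.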

The event loop is the TaskRunner's single thread that is in charge of reading, writing, metrics flushing, checkpointing, and windowing.
Each SystemConsumer reads messages on its own thread, but writes messages into a centralized message queue. The TaskRunner uses this queue to funnel all of the messages into the event loop. Here's how the event loop works:

  1. Take a message from the incoming message queue (the queue into which the SystemConsumers put their messages)
  2. Give the message to the appropriate StreamTask by calling process() on it
  3. Call window() on the StreamTask if it implements WindowableTask, and the window time has expired
  4. Send any StreamTask output from the process() and window() call to the appropriate SystemProducers
  5. Write checkpoints for any partitions that are past the defined checkpoint commit interval

The TaskRunner does this, in a loop, until it is shut down.
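
The producer/consumer shape of this loop can be sketched in plain Java. This is not Samza's implementation: `EventLoopSketch`, `Message`, and `Task` are hypothetical stand-ins, and the sketch covers only steps 1 and 2 (take a message from the shared queue, hand it to the task for its partition). Windowing, output, and checkpointing are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a single-threaded event loop fed by reader threads. */
class EventLoopSketch {
    /** Minimal stand-in for an incoming message: partition plus payload. */
    static final class Message {
        final int partition;
        final String payload;
        Message(int partition, String payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    /** Stand-in for a per-partition task; it only remembers what it saw. */
    static final class Task {
        final List<String> seen = new ArrayList<>();
        void process(Message m) { seen.add(m.payload); }
    }

    final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
    final Map<Integer, Task> tasks = new HashMap<>(); // touched only by the loop thread

    /** Drains the queue on the calling thread until it stays empty for idleMillis. */
    void runUntilIdle(long idleMillis) throws InterruptedException {
        Message m;
        while ((m = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
            tasks.computeIfAbsent(m.partition, p -> new Task()).process(m);
        }
    }

    /** Starts one reader thread per partition, then runs the loop on this thread. */
    static EventLoopSketch demo() {
        EventLoopSketch loop = new EventLoopSketch();
        List<Thread> readers = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            final int partition = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < 3; i++) {
                    loop.queue.add(new Message(partition, "p" + partition + "-m" + i));
                }
            });
            readers.add(t);
            t.start();
        }
        try {
            // Joining first keeps the demo deterministic; a real loop would
            // run concurrently with its readers.
            for (Thread t : readers) { t.join(); }
            loop.runUntilIdle(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return loop;
    }
}
```

Because only one thread ever calls `process`, the per-partition tasks need no locking, which is the main attraction of this design.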


4 Features

API Overview

StreamTask, the processing interface

As seen above, the TaskRunner instantiates StreamTask instances. StreamTask defines the actual processing logic, and users must implement it.

/** User processing tasks implement this. */
public interface StreamTask {
  void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
}


1. IncomingMessageEnvelope wraps an incoming message. It contains the message, the key, and the SystemStreamPartition (the stream that the message came from).
SystemStreamPartition records where the message came from: the system (for example kafka or databus), the stream name (for Kafka, the topic name), and the partition.

2. MessageCollector is used to send messages to output streams.

/** When a task wishes to send a message, it uses this class. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}


class MyStreamerTask extends StreamTask {
  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
    val msg = envelope.getMessage.asInstanceOf[GenericRecord]
    // Re-key the message by member_id and send it to a topic partitioned by that key.
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByMemberId"), msg.get("member_id"), msg))
  }
}

The SystemConsumer and SystemProducer interfaces, used to read from and write to the messaging system

The TaskRunner reads and writes messages using the SystemConsumer and SystemProducer interfaces.

public interface SystemConsumer {
  void start();
  void stop();
  void register(SystemStreamPartition systemStreamPartition, String lastReadOffset);
  List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException;
}

public interface SystemProducer {
  void start();
  void stop();
  void register(String source);
  void send(String source, OutgoingMessageEnvelope envelope);
  void flush(String source);
}

These interfaces are pluggable. The default implementations, KafkaSystemConsumer and KafkaSystemProducer, support Kafka, and other messaging systems can be implemented as well.

Samza supports reads and writes to Kafka (i.e. it has a KafkaSystemConsumer/KafkaSystemProducer), but the interfaces are pluggable, and most message bus systems can be plugged in, with some degree of support.

The TaskRunner uses SystemConsumers to read data from Kafka into a queue, the TaskRunner's event loop processes it, and the output is written back to Kafka through SystemProducers.



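Checkpointing: because Kafka stores its data on disk, the data can be read again after a TaskRunner fails, provided you remember where you left off (or you do not mind re-reading from the beginning every time).
Recording the offset reached in each partition is more efficient, and that is what a checkpoint does. Checkpoints are only taken at fixed intervals, however, so this gives at-least-once semantics and messages may be processed more than once.
Where are checkpoints written? Either to a file or to Kafka.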
Samza provides two checkpoint managers: FileSystemCheckpointManager and KafkaCheckpointManager.

When a TaskRunner instantiates a SystemConsumer for an input stream/partition pair, how does the TaskRunner know where in the stream to start reading messages? If you recall, Kafka has the concept of an offset, which defines a specific location in a topic/partition pair. The idea is that an offset can be used to reference a specific point in a stream/partition pair. When you read messages from Kafka, you can supply an offset to specify at which point you'd like to read from. After you read, you increment your offset, and get the next message.

public interface CheckpointManager {
  void start();
  void register(Partition partition);
  void writeCheckpoint(Partition partition, Checkpoint checkpoint);
  Checkpoint readLastCheckpoint(Partition partition);
  void stop();
}

public class Checkpoint {
  // Last-read offset per input stream; one Checkpoint is stored per partition.
  private final Map<SystemStream, String> offsets;
}
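
Why periodic checkpoints give at-least-once rather than exactly-once processing can be shown with a small simulation. This Java sketch is hypothetical throughout: `CheckpointSketch` is not a Samza class, the "log" is an in-memory list standing in for one replayable partition, and checkpoints are taken every N messages instead of on a timer, purely to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: periodic offset checkpoints lead to replayed messages. */
class CheckpointSketch {
    private final List<String> log;       // stand-in for one replayable partition
    private final int checkpointEvery;    // checkpoint after this many messages
    private int checkpointedOffset = 0;   // stand-in for the durable checkpoint store
    final List<String> processed = new ArrayList<>();

    CheckpointSketch(List<String> log, int checkpointEvery) {
        this.log = log;
        this.checkpointEvery = checkpointEvery;
    }

    /**
     * Resumes from the last checkpoint and processes messages up to (not
     * including) stopBefore. Returning without a final checkpoint models a crash.
     */
    void runUntil(int stopBefore) {
        int offset = checkpointedOffset;
        int sinceCheckpoint = 0;
        while (offset < stopBefore && offset < log.size()) {
            processed.add(log.get(offset));
            offset++;
            sinceCheckpoint++;
            if (sinceCheckpoint == checkpointEvery) {
                checkpointedOffset = offset;
                sinceCheckpoint = 0;
            }
        }
    }
}
```

With the log `a, b, c, d, e` and a checkpoint every two messages, calling `runUntil(3)` and then `runUntil(5)` processes `c` twice: the first run handled it after the last checkpoint, so the restart replays it. No message is skipped, which is the at-least-once guarantee.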



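Windowing: this is used to implement windowing-based applications, such as hourly counts. When the configured window interval elapses, the window method of a WindowableTask is called.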

Let's say that the Samza job wants to update the member ID counts in a database once every minute. Here's how it would work. The Samza job that does the counting would keep a Map<Integer, Integer> in memory, which maps member IDs to page view counts. Every time a message arrives, the job would take the member ID in the PageViewEvent, and use it to increment the member ID's count in the in-memory map. Then, once a minute, the StreamTask would update the database (totalcount += currentcount) for every member ID in the map, and then reset the count map.

Windowing is how we achieve this. If a StreamTask implements the WindowableTask interface, the TaskRunner will call the window() method on the task over a configured interval.

public interface WindowableTask {
  void window(MessageCollector collector, TaskCoordinator coordinator);
}
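
The per-minute counting pattern described above can be sketched without any Samza types. In this hypothetical Java example, `WindowedCounterSketch.process` and `window` merely mirror the roles of the StreamTask and WindowableTask methods (their signatures are simplified assumptions), and a `HashMap` stands in for the external database.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of in-memory counting with a periodic flush. */
class WindowedCounterSketch {
    // Counts accumulated since the last window; reset on every flush.
    private final Map<Integer, Integer> currentCounts = new HashMap<>();
    // Stand-in for the external database holding running totals.
    final Map<Integer, Integer> database = new HashMap<>();

    /** Called once per incoming page view event. */
    void process(int memberId) {
        currentCounts.merge(memberId, 1, Integer::sum);
    }

    /** Called when the window interval elapses: totalcount += currentcount, then reset. */
    void window() {
        for (Map.Entry<Integer, Integer> e : currentCounts.entrySet()) {
            database.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        currentCounts.clear();
    }
}
```

The database sees one write per member per window instead of one write per message, which is the reason for buffering counts in memory.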


State Management

One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries on this data.

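A distinctive feature of Samza is state management based on local state. Simple operations such as filter and map need no state and are called stateless.
More complex operations, such as windowed aggregation (ranking, trend detection, counting) and joins (stream-table, stream-stream), need to maintain temporary state and are called stateful.
Keeping temporary state raises a question: how do you make sure it is not lost?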


In-memory state with checkpointing

A simple approach, common in academic stream processing systems, is to periodically save out the state of the task's in-memory data. S4's state management implements this approach: tasks implement Java's Serializable interface and are periodically serialized using Java serialization to save out copies of the processor state.

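The simplest approach is to checkpoint the local state repeatedly.
The problem is that each checkpoint has to cover the entire local state and is hard to do incrementally, so it becomes inefficient when the local state is large.
Checkpoints are also usually taken only periodically, so changes made since the last checkpoint are lost on failure.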

Using an external store

In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store.

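Another simple approach is to keep no local state and store everything in external storage, such as an external database or key-value store.

This way the data is not lost, but efficiency clearly suffers.
Correctness is also affected: for example, when a task fails, its earlier state changes have to be discarded, so how do you roll back the data in the external store?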


Local state in Samza

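The original documentation goes into a lot of detail. My understanding is this:
State is still stored locally, but every state change is emitted to a changelog stream in Kafka. When a task fails over, the changelog can be read back from Kafka and replayed to rebuild the local state.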

Samza allows tasks to maintain persistent, mutable, queryable state that is physically co-located with each task. The state is highly available: in the event of a task failure it will be restored when the task fails over to another machine.

You can think of this as taking the remote table out of the remote database and physically partitioning it up and co-locating these partitions with the tasks.
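
The changelog idea summarized in this section can be sketched as follows. This is a hypothetical Java illustration, not Samza's storage engine: `ChangelogStoreSketch` and `Change` are invented names, and an in-memory list stands in for the durable changelog topic in Kafka. Every write goes to both the local map and the changelog, and a restarted task rebuilds its map by replaying the changelog in order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: local key-value state backed by a replayable changelog. */
class ChangelogStoreSketch {
    /** One changelog record: a key and its new value. */
    static final class Change {
        final String key;
        final int value;
        Change(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Integer> local = new HashMap<>(); // lost when the task dies
    private final List<Change> changelog;                       // stand-in for a durable topic

    ChangelogStoreSketch(List<Change> changelog) {
        this.changelog = changelog;
    }

    /** Writes locally and appends the change to the changelog. */
    void put(String key, int value) {
        local.put(key, value);
        changelog.add(new Change(key, value));
    }

    /** Reads are served from local state only. */
    Integer get(String key) {
        return local.get(key);
    }

    /** Rebuilds a fresh store by replaying the changelog in order; the last write per key wins. */
    static ChangelogStoreSketch restore(List<Change> changelog) {
        ChangelogStoreSketch store = new ChangelogStoreSketch(changelog);
        for (Change c : changelog) {
            store.local.put(c.key, c.value);
        }
        return store;
    }
}
```

Reads never leave the process, and unlike a full-state checkpoint, each write adds only one small record to the changelog.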



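5 Comparison with Storm

The biggest difference is Kafka.

Samza is built on Kafka and, unlike Storm, does not need ZeroMQ (ZMQ) to move data between nodes, so many aspects are greatly simplified, as detailed below.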

1. message tracking

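Anyone familiar with Storm will remember its tuple tracking mechanism, which is cleverly designed: the spout tracks every tuple through ackers, and if any step along the way fails to ack, the tuple times out and is re-emitted.
This is fairly complex. Tracking data in a stream really is troublesome, and this re-emit approach only works on the premise that tuples can be processed out of order. Storm does provide transactional topologies to handle ordered processing, but they are more complex still.
Once a task fails and the tuple is re-emitted, it has to be processed all over again, which wastes resources.
Re-emission is also driven by timeouts, so it is relatively slow.

Samza is simpler. All input and output lives in Kafka and can be replayed at any time. Whichever step fails can simply be re-executed, because the results of the earlier steps are all in Kafka.
Samza guarantees ordered processing within a partition but not global ordering, and it currently guarantees only at-least-once delivery, not exactly-once.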

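2. Coupling between workers

Storm has the concept of a topology, a complete workflow in which the workers are coupled to one another.
Samza has only the concept of a job, and the jobs in a workflow are fully decoupled by Kafka.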

3. back pressure

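In Storm, if a bolt is slow, the spout's send buffer fills up and it stops emitting new tuples.
In Samza, a slow job does not affect other jobs.

4. Fault tolerance...


Other points: Samza provides better state management, and Samza's parallelism depends on the number of containers rather than the number of tasks...


Personally, I rather like Samza. Having Kafka behind it is reassuring. For background, see The Log: What every software engineer should know about real-time data's unifying abstraction.

Samza is still fairly new, though, so its actual performance and stability need to be tested.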



