Storm Source Code Analysis - Topology Submit - Nimbus - mk-assignments

2019-03-02 23:51 | Source: the web

什么是"mk-assignment”, 主要就是产生executor->node+port关系, 将executor分配到哪个node的哪个slot上(port代表slot, 一个slot可以run一个worker进程, 一个worker包含多个executor线程)

First, make sure you understand what an executor is; see "Storm Source Code Analysis - Relationship Between Component, Executor and Task".

 

;; get existing assignment (just the executor->node+port map) -> default to {}
;; filter out ones which have a executor timeout
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
;; edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
(defnk mk-assignments [nimbus :scratch-topology-id nil]
  (let [conf (:conf nimbus)
        storm-cluster-state (:storm-cluster-state nimbus)
        ^INimbus inimbus (:inimbus nimbus) 
        ;; 1. read all active topologies
        topology-ids (.active-storms storm-cluster-state) ;; ids of all active topologies
        topologies (into {} (for [tid topology-ids]
                              {tid (read-topology-details nimbus tid)})) ;;{tid, TopologyDetails.}
        topologies (Topologies. topologies)
        ;; 2. read the current assignments
        assigned-topology-ids (.assignments storm-cluster-state nil) ;; ids of topologies that already have an assignment
        existing-assignments (into {} (for [tid assigned-topology-ids]
                                        ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                        ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                        ;; will be treated as free slot in the scheduler code.
                                        (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                          {tid (.assignment-info storm-cluster-state tid nil)})))
        ;; 3. make new assignments for the topologies, based on the topologies and assignments read above
        topology->executor->node+port (compute-new-topology->executor->node+port
                                       nimbus
                                       existing-assignments
                                       topologies
                                       scratch-topology-id)
               
        now-secs (current-time-secs)
        
        basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
        
        ;; construct the final Assignments by adding start-times etc into it
        new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
                                        :let [existing-assignment (get existing-assignments topology-id)
                                              all-nodes (->> executor->node+port vals (map first) set)
                                              node->host (->> all-nodes
                                                              (mapcat (fn [node]
                                                                        (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
                                                                          [[node host]]
                                                                          )))
                                                              (into {}))
                                              all-node->host (merge (:node->host existing-assignment) node->host)
                                              reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
                                              start-times (merge (:executor->start-time-secs existing-assignment)
                                                                (into {}
                                                                      (for [id reassign-executors]
                                                                        [id now-secs]
                                                                        )))]]
                                   {topology-id (Assignment.
                                                 (master-stormdist-root conf topology-id)
                                                 (select-keys all-node->host all-nodes)
                                                 executor->node+port
                                                 start-times)}))]

    ;; tasks figure out what tasks to talk to by looking at topology at runtime
    ;; only log/set when there's been a change to the assignment
    (doseq [[topology-id assignment] new-assignments
            :let [existing-assignment (get existing-assignments topology-id)
                  topology-details (.getById topologies topology-id)]]
      (if (= existing-assignment assignment)
        (log-debug "Assignment for " topology-id " hasn't changed")
        (do
          (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
          (.set-assignment! storm-cluster-state topology-id assignment)
          )))
    (->> new-assignments
          (map (fn [[topology-id assignment]]
            (let [existing-assignment (get existing-assignments topology-id)]
              [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] 
              )))
          (into {})
          (.assignSlots inimbus topologies))
    )) 
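The comment at the top of mk-assignments mentions figuring out how many executors should go into each slot (e.g., 4, 4, 4, 5). A minimal sketch of that arithmetic in Java, assuming an even split where the remainder goes to the last slots; SlotSplit and executorsPerSlot are hypothetical names, and in the real code the actual placement is left to the scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the "(e.g., 4, 4, 4, 5)" comment:
// spread numExecutors over numSlots as evenly as possible.
class SlotSplit {
    static List<Integer> executorsPerSlot(int numExecutors, int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("numSlots must be positive");
        }
        int base = numExecutors / numSlots;   // every slot gets at least this many
        int extra = numExecutors % numSlots;  // this many slots get one more
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < numSlots; i++) {
            // the last `extra` slots take the remainder, giving the 4, 4, 4, 5 order
            counts.add(i >= numSlots - extra ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(executorsPerSlot(17, 4)); // prints [4, 4, 4, 5]
    }
}
```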

1. Read all active topologies

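First, active-storms reads the ids of all active topologies from ZooKeeper.
Then read-topology-details reads more detailed information about each topology,
and finally wraps it into a TopologyDetails, which contains everything about the topology: the id, the conf, the topology object, the number of workers, and the executor->component mapping.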

(active-storms [this]
  (get-children cluster-state STORMS-SUBTREE false) ;; "/storms"
  )
(defn read-topology-details [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) ;; read storm-base from zookeeper
        topology-conf (read-storm-conf conf storm-id) ;; read the topology's conf from the local storm directory
        topology (read-storm-topology conf storm-id) ;; read (deserialize) the topology object from the local storm directory
        executor->component (->> (compute-executor->component nimbus storm-id) ;; read the executor -> component mapping
                                 (map-key (fn [[start-task end-task]]
                                            (ExecutorDetails. (int start-task) (int end-task)))))] ;; wrap each executor in an ExecutorDetails object
    (TopologyDetails. storm-id
                      topology-conf
                      topology
                      (:num-workers storm-base)
                      executor->component
                      )))

Finally, the topologies are wrapped in a Topologies object, which supports looking up a topology by id or by name:

public class Topologies {
    Map<String, TopologyDetails> topologies;
    Map<String, String> nameToId;
}
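Because Topologies keeps both maps, a lookup by name is just two map lookups. A minimal sketch of the idea, assuming a stripped-down SimpleTopologyDetails that only carries an id and a name (the real TopologyDetails also holds the conf, the topology object, the worker count and executor->component); SimpleTopologies and its methods are illustrative stand-ins, not Storm's API.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for TopologyDetails: only id and name (assumption for this sketch).
class SimpleTopologyDetails {
    final String id;
    final String name;
    SimpleTopologyDetails(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Sketch of a Topologies-like index supporting lookup by id or by name.
class SimpleTopologies {
    private final Map<String, SimpleTopologyDetails> topologies = new HashMap<>();
    private final Map<String, String> nameToId = new HashMap<>();

    SimpleTopologies(Map<String, SimpleTopologyDetails> byId) {
        topologies.putAll(byId);
        // build the reverse index name -> id once, so getByName stays a map lookup
        for (SimpleTopologyDetails d : byId.values()) {
            nameToId.put(d.name, d.id);
        }
    }

    SimpleTopologyDetails getById(String id) {
        return topologies.get(id);
    }

    SimpleTopologyDetails getByName(String name) {
        String id = nameToId.get(name);
        return id == null ? null : topologies.get(id);
    }

    Collection<SimpleTopologyDetails> getTopologies() {
        return topologies.values();
    }

    public static void main(String[] args) {
        Map<String, SimpleTopologyDetails> byId = new HashMap<>();
        byId.put("t1", new SimpleTopologyDetails("t1", "wordcount"));
        System.out.println(new SimpleTopologies(byId).getByName("wordcount").id); // prints t1
    }
}
```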

 

2. Read the current assignments

As the definition of Assignment shows, an Assignment is mainly the mapping from executors to node+port:

(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
 

 

Everything in StormClusterState reads and writes data on ZooKeeper:

    (reify
     StormClusterState    
     (assignments [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) ;/assignments
      )

Read all topologies under /assignments:

assigned-topology-ids (.assignments storm-cluster-state nil) ;; read all topology ids
existing-assignments (into {} (for [tid assigned-topology-ids] ;; read each topology's assignment info by id (the scratch topology, i.e. the one being rebalanced, is skipped because its current assignment is no longer valid)
                                 ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                 ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                 ;; will be treated as free slot in the scheduler code.
                                 (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                   {tid (.assignment-info storm-cluster-state tid nil)})))
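In the for above, the scratch topology produces nil instead of a {tid assignment} map, and into {} simply ignores the nil, so the slots it occupied end up being treated as free. The same filter sketched in Java, with the assignment kept as an opaque type parameter (ScratchFilter is a hypothetical name):

```java
import java.util.Map;
import java.util.TreeMap;

class ScratchFilter {
    // Keep every existing assignment except the scratch topology's.
    // scratchTopologyId == null means no rebalance is in progress, so everything is kept.
    static <A> Map<String, A> existingAssignments(Map<String, A> allAssignments, String scratchTopologyId) {
        Map<String, A> result = new TreeMap<>();
        for (Map.Entry<String, A> e : allAssignments.entrySet()) {
            if (scratchTopologyId == null || !e.getKey().equals(scratchTopologyId)) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> all = new TreeMap<>();
        all.put("t1", "assignment-of-t1");
        all.put("t2", "assignment-of-t2");
        System.out.println(existingAssignments(all, "t2").keySet()); // prints [t1]
    }
}
```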
 

3. Make new assignments based on the topologies and assignments read above

This is mainly a call to compute-new-topology->executor->node+port. Before scheduler.schedule is actually called, some preparation is needed:

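3.1 ~ 3.6, the topology assignment state
a. The assignment info of each topology's executors is read from ZK, but an assignment is only static information.
    We also need to know whether the executors are actually running after being assigned, so the executor heartbeats are updated and the alive-executors are found. Only this part of the assignment is valid, so only the alive-executors are wrapped into topology->scheduler-assignment.
b. Dead slots found while checking the topology assignment
    For executors without a heartbeat, we assume the slot has a problem and call it a dead slot; later on, executors should not be assigned to dead slots (a dead slot may still host alive executors).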

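3.7 ~ 3.8, the supervisor state

From the supervisor heartbeats, build SupervisorDetails for the currently alive supervisors, mainly the hostname and allPorts (the configured ports minus the dead slots).

3.9, the cluster, i.e. the runtime state of the topologies, which contains both pieces of information above: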

cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

 

(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
  (let [conf (:conf nimbus)
        storm-cluster-state (:storm-cluster-state nimbus)
        ;; 3.1 get the executors of every topology that already has an assignment
        ;; executors of every assigned topology, e.g. {t1 #([1 2] [3 3]), t2 #([1 2] [3 3])}
        topology->executors (compute-topology->executors nimbus (keys existing-assignments)) ;; only topologies with existing assignment info are included, so new or scratch topologies are not
        ;; 3.2 update the heartbeats cache of all executors (the heartbeats-cache in nimbus-data)
        ;; update the executors heartbeats first.
        _ (update-all-heartbeats! nimbus existing-assignments topology->executors) ;; bound to '_' only so that update-all-heartbeats! runs for its side effect at this point in the let
        ;; 3.3 filter topology->executors, keeping only the alive ones
        topology->alive-executors (compute-topology->alive-executors nimbus
                                                                     existing-assignments
                                                                     topologies
                                                                     topology->executors
                                                                     scratch-topology-id)
        ;; 3.4 find the dead slots
        supervisor->dead-ports (compute-supervisor->dead-ports nimbus
                                                               existing-assignments
                                                               topology->executors
                                                               topology->alive-executors)
        ;; 3.5 build the SchedulerAssignment from the alive executors
        topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
                                                                               existing-assignments
                                                                               topology->alive-executors)
        ;; 3.6 find missing-assignment-topologies, which need to be reassigned
        missing-assignment-topologies (->> topologies
                                           .getTopologies ;; returns the TopologyDetails objects
                                           (map (memfn getId)) ;;get topologyid
                                           (filter (fn [t]
                                                      (let [alle (get topology->executors t)
                                                            alivee (get topology->alive-executors t)]
                                                            (or (empty? alle)
                                                                (not= alle alivee)
                                                                (< (-> topology->scheduler-assignment
                                                                       (get t)
                                                                       num-used-workers )
                                                                   (-> topologies (.getById t) .getNumWorkers)
                                                                   ))
                                                            ))))
        ;; 3.7 all-scheduling-slots: find the slots configured in conf on all supervisors
        all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
                                  (map (fn [[node-id port]] {node-id #{port}}))
                                  (apply merge-with set/union))
        ;; 3.8 build SupervisorDetails for all supervisors
        supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
        ;; 3.9 build the cluster
        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

        ;;3.10 call scheduler.schedule to schedule all the topologies
        ;; the new assignments for all the topologies are in the cluster object.
        _ (.schedule (:scheduler nimbus) topologies cluster)
        new-scheduler-assignments (.getAssignments cluster)
        ;; add more information to convert SchedulerAssignment to Assignment
        new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]
    ;; print some useful information.
    (doseq [[topology-id executor->node+port] new-topology->executor->node+port
            :let [old-executor->node+port (-> topology-id
                                          existing-assignments
                                          :executor->node+port)
                  reassignment (filter (fn [[executor node+port]]
                                         (and (contains? old-executor->node+port executor)
                                              (not (= node+port (old-executor->node+port executor)))))
                                       executor->node+port)]]
      (when-not (empty? reassignment)
        (let [new-slots-cnt (count (set (vals executor->node+port)))
              reassign-executors (keys reassignment)]
          (log-message "Reassigning " topology-id " to " new-slots-cnt " slots")
          (log-message "Reassign executors: " (vec reassign-executors)))))

    new-topology->executor->node+port))

3.1 Get the executors of every topology that already has an assignment

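The implementation here is somewhat wasteful: compute-topology->executors calls compute-executors to compute the executors all over again, even though they could be taken directly from topologies.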

 

3.2 Update the heartbeats cache of all executors (the heartbeats-cache in nimbus-data)

Specifically, get-worker-heartbeat reads the latest heartbeats of all executors from ZooKeeper (executor->node+port maps each executor to its worker), and swap! writes this latest heartbeat information into nimbus's global heartbeats-cache.

 

3.3 Filter topology->executors, keeping only the alive ones

This calls compute-topology->alive-executors:

(defn- compute-topology->alive-executors [nimbus existing-assignments topologies topology->executors scratch-topology-id]
  "compute a topology-id -> alive executors map"
  (into {} (for [[tid assignment] existing-assignments
                 :let [topology-details (.getById topologies tid)
                       all-executors (topology->executors tid)
                       alive-executors (if (and scratch-topology-id (= scratch-topology-id tid)) ;; the scratch topology never actually shows up here, since it was already filtered out earlier
                                         all-executors
                                         (set (alive-executors nimbus topology-details all-executors assignment)))]]
             {tid alive-executors})))

alive-executors is then called, which uses the freshly updated heartbeats cache to decide whether each executor is alive:

    (->> all-executors
        (filter (fn [executor]
          (let [start-time (get executor-start-times executor)
                nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
            (if (and start-time
                   (or
                    (< (time-delta start-time)
                       (conf NIMBUS-TASK-LAUNCH-SECS))
                    (not nimbus-time)
                    (< (time-delta nimbus-time)
                       (conf NIMBUS-TASK-TIMEOUT-SECS))
                    ))
              true
              (do
                (log-message "Executor " storm-id ":" executor " not alive")
                false))
            )))
        doall))) ;; doall matters: it forces the filter to actually run over every executor (and log the dead ones) instead of only producing a lazy seq
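The alive check boils down to one predicate over the executor's start time and the last heartbeat time nimbus recorded for it. A sketch in Java, assuming all times are plain seconds and that launchSecs and timeoutSecs stand for NIMBUS-TASK-LAUNCH-SECS and NIMBUS-TASK-TIMEOUT-SECS; AliveCheck is a hypothetical name. Since the Java loop is eager, nothing like doall is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AliveCheck {
    // Same shape as the predicate in alive-executors; all times are in seconds.
    // launchSecs ~ NIMBUS-TASK-LAUNCH-SECS, timeoutSecs ~ NIMBUS-TASK-TIMEOUT-SECS.
    static boolean isAlive(Integer startTime, Integer nimbusTime, int nowSecs,
                           int launchSecs, int timeoutSecs) {
        if (startTime == null) {
            return false;                               // no start time recorded: not alive
        }
        return nowSecs - startTime < launchSecs         // still inside the launch grace period
            || nimbusTime == null                       // no heartbeat time in the cache yet
            || nowSecs - nimbusTime < timeoutSecs;      // last heartbeat is recent enough
    }

    // Eagerly filters the executors, logging the dead ones like the Clojure code does.
    static <E> List<E> aliveExecutors(List<E> all, Map<E, Integer> startTimes,
                                      Map<E, Integer> nimbusTimes, int nowSecs,
                                      int launchSecs, int timeoutSecs) {
        List<E> alive = new ArrayList<>();
        for (E executor : all) {
            if (isAlive(startTimes.get(executor), nimbusTimes.get(executor), nowSecs, launchSecs, timeoutSecs)) {
                alive.add(executor);
            } else {
                System.out.println("Executor " + executor + " not alive");
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        Map<String, Integer> start = new HashMap<>();
        start.put("[1 2]", 0);
        start.put("[3 3]", 0);
        Map<String, Integer> heartbeat = new HashMap<>();
        heartbeat.put("[1 2]", 190);
        heartbeat.put("[3 3]", 100);
        System.out.println(aliveExecutors(Arrays.asList("[1 2]", "[3 3]"), start, heartbeat, 200, 120, 30));
    }
}
```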

3.4 Find the dead slots

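A slot is simply an abstraction over node+port. One slot can run one worker, so a supervisor can run as many workers as it has slots configured.
An executor, on the other hand, is a thread, so a dead executor usually means that its worker slot is dead.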

;; TODO: need to consider all executors associated with a dead executor (in same slot) dead as well,
;; don't just rely on heartbeat being the same

compute-supervisor->dead-ports works like this:
find the dead executors: dead-executors (set/difference all-executors alive-executors)
treat the node+port of every dead executor as a dead slot

public class WorkerSlot {
    String nodeId;
    int port;
}
The logic for finding the dead slots is simple:
dead-slots (->> (:executor->node+port assignment) ; [executor [node port]]
                 (filter #(contains? dead-executors (first %)))
                 vals)]] ;; returns a seq of all the values, i.e. the [node port] pairs

Finally, all dead slots are returned, grouped by node: {nodeid #{port1, port2},…}
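The dead-slot computation is a set difference followed by grouping the slots of the dead executors by node. A sketch in Java, assuming executors are represented as strings like "[1 2]" and using a minimal Slot class in place of WorkerSlot (DeadSlots and Slot are hypothetical names):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DeadSlots {
    // Minimal node+port pair standing in for WorkerSlot.
    static final class Slot {
        final String nodeId;
        final int port;
        Slot(String nodeId, int port) {
            this.nodeId = nodeId;
            this.port = port;
        }
    }

    // dead-executors = all - alive; every slot that hosts a dead executor is a dead slot.
    // The result is grouped per node: {nodeId #{port ...}}.
    static Map<String, Set<Integer>> supervisorToDeadPorts(Map<String, Slot> executorToSlot,
                                                           Set<String> allExecutors,
                                                           Set<String> aliveExecutors) {
        Set<String> dead = new HashSet<>(allExecutors);
        dead.removeAll(aliveExecutors);                        // set/difference
        Map<String, Set<Integer>> deadPorts = new HashMap<>();
        for (Map.Entry<String, Slot> e : executorToSlot.entrySet()) {
            if (dead.contains(e.getKey())) {
                Slot slot = e.getValue();
                deadPorts.computeIfAbsent(slot.nodeId, k -> new HashSet<>()).add(slot.port);
            }
        }
        return deadPorts;
    }

    public static void main(String[] args) {
        Map<String, Slot> assignment = new HashMap<>();
        assignment.put("[1 2]", new Slot("node1", 6700));
        assignment.put("[3 3]", new Slot("node1", 6701));
        assignment.put("[4 4]", new Slot("node2", 6700));
        Set<String> all = new HashSet<>(assignment.keySet());
        Set<String> alive = new HashSet<>(Arrays.asList("[3 3]"));
        System.out.println(supervisorToDeadPorts(assignment, all, alive));
    }
}
```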

 

3.5 Build the SchedulerAssignment from the alive executors

"convert assignment information in zk to SchedulerAssignment, so it can be used by scheduler api"

The assignment of the alive executors (executor->node+port) is converted and wrapped into SchedulerAssignmentImpl objects, so that the scheduler can use it later:

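    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
        this.topologyId = topologyId;
        this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
        if (executorToSlots != null) {
            this.executorToSlot.putAll(executorToSlots); // copy the executor -> slot mapping that was passed in
        }
    }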
SchedulerAssignmentImpl records every executor of the topology together with the WorkerSlot it is assigned to, which shows that the executor is the unit of assignment.

3.6 Find missing-assignment-topologies, which need to be reassigned (the current logic does not actually use this; the scheduler performs the same check itself)

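A topology counts as a missing assignment if any of the following holds:

it has no executors in topology->executors, meaning it has no assignment info (a new or scratch topology)
topology->executors != topology->alive-executors, meaning some executors are dead
the number of workers actually used in topology->scheduler-assignment is smaller than the topology's configured worker count (maybe there were not enough free slots at the last assignment, or some slots went dead)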
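The three conditions can be written as a single predicate. A sketch in Java, assuming executors are strings and that the caller passes 0 as usedWorkers when the topology has no SchedulerAssignment; MissingAssignment is a hypothetical name:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class MissingAssignment {
    // True if the topology has to be (re)scheduled:
    //  - no executors recorded in the existing assignment (new or scratch topology), or
    //  - some recorded executors are not alive, or
    //  - it uses fewer workers than its configured worker count.
    static boolean isMissingAssignment(Set<String> allExecutors, Set<String> aliveExecutors,
                                       int usedWorkers, int numWorkers) {
        return allExecutors == null || allExecutors.isEmpty()
            || !allExecutors.equals(aliveExecutors)
            || usedWorkers < numWorkers;
    }

    public static void main(String[] args) {
        Set<String> all = new HashSet<>(Arrays.asList("[1 2]", "[3 3]"));
        Set<String> alive = new HashSet<>(Arrays.asList("[1 2]"));
        System.out.println(isMissingAssignment(all, alive, 2, 2)); // prints true (an executor is dead)
        System.out.println(isMissingAssignment(all, all, 2, 2));   // prints false
    }
}
```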

 

3.7 all-scheduling-slots: find the slots configured in conf on all supervisors

(defn- all-scheduling-slots
  [nimbus topologies missing-assignment-topologies]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        ^INimbus inimbus (:inimbus nimbus)        
        supervisor-infos (all-supervisor-info storm-cluster-state nil)
        supervisor-details (dofor [[id info] supervisor-infos]
                             (SupervisorDetails. id (:meta info)))

        ret (.allSlotsAvailableForScheduling inimbus
                     supervisor-details
                     topologies
                     (set missing-assignment-topologies)
                     )
        ]
    (dofor [^WorkerSlot slot ret]
      [(.getNodeId slot) (.getPort slot)]
      )))

3.7.1 all-supervisor-info

Reads each supervisor's info, i.e. its heartbeat, from ZK and returns {supervisor-id info}.

The definition of SupervisorInfo:

(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])

The code that sets SupervisorInfo (in mk-supervisor) shows what each field means:

(SupervisorInfo. (current-time-secs) ;; heartbeat time
                 (:my-hostname supervisor) ;; host name
                 (:assignment-id supervisor) ;; assignment-id = supervisor-id, a uuid generated by each supervisor
                 (keys @(:curr-assignment supervisor)) ;; ports currently in use on the supervisor (curr-assignment is port->executors)
                 (.getMetadata isupervisor) ;; the supervisor ports configured in conf
                 (conf SUPERVISOR-SCHEDULER-META) ;; user-defined supervisor metadata from conf, e.g. a name; arbitrary key-value pairs
                 ((:uptime supervisor)))))] ;; a fn that closes over the supervisor's start time; calling it returns the uptime
 
(defn- all-supervisor-info
  ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
  ([storm-cluster-state callback]
     (let [supervisor-ids (.supervisors storm-cluster-state callback)] ;; read all supervisor ids under the supervisors directory in zk
       (into {}
             (mapcat
              (fn [id]
                (if-let [info (.supervisor-info storm-cluster-state id)] ;; read this supervisor's info from zk
                  [[id info]]
                  ))
              supervisor-ids))
       )))

3.7.2 SupervisorDetails

supervisor-info is wrapped in a SupervisorDetails: (SupervisorDetails. id (:meta info))

public class SupervisorDetails {
    String id; //supervisor-id
    /**
     * hostname of this supervisor
     */
    String host;
    Object meta; 
    /**
     * meta data configured for this supervisor
     */
    Object schedulerMeta;
    /**
     * all the ports of the supervisor
     */
    Set<Integer> allPorts;
}

3.7.3 allSlotsAvailableForScheduling
Here the INimbus implementation is standalone-nimbus; see the arguments passed to nimbus's launch-server!:

(defn standalone-nimbus []
  (reify INimbus
    (prepare [this conf local-dir]
      )
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)] ;; meta holds the list of ports configured in conf; wrap each one in a WorkerSlot
                       (WorkerSlot. (.getId s) p)))) ;; so the node id is the supervisor id (a uuid), not an IP
           set ))
    (assignSlots [this topology slots]
      )
    (getForcedScheduler [this]
      nil )
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))
    ))

Only the supervisors argument is used here: the worker slots configured on each supervisor are taken out and merged into one set.

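After the merge-with set/union step in compute-new-topology->executor->node+port, the final result is the node id + ports of every slot configured on the supervisors: {node1 #{port1 port2 port3}, node2 #{port1 port2}}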

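Of course, this is only the simplest possible implementation of allSlotsAvailableForScheduling. Changing the logic here changes how slots are selected, for example to make certain slots unavailable in some situations.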
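What standalone-nimbus together with the merge-with set/union step produces can be sketched in Java as one pass over the supervisors, assuming a simplified Supervisor class whose configuredPorts plays the role of the ports list stored in meta (StandaloneSlots and Supervisor are hypothetical names). A custom INimbus that wants to hide some slots would filter inside the inner loop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class StandaloneSlots {
    // Simplified stand-in for SupervisorDetails: id plus the ports list held in meta.
    static final class Supervisor {
        final String id;
        final List<Integer> configuredPorts;
        Supervisor(String id, List<Integer> configuredPorts) {
            this.id = id;
            this.configuredPorts = configuredPorts;
        }
    }

    // One slot per configured port, keyed by supervisor id (as in standalone-nimbus),
    // grouped into {node #{ports}} (as in the merge-with set/union step).
    static Map<String, Set<Integer>> allSchedulingSlots(List<Supervisor> supervisors) {
        Map<String, Set<Integer>> slots = new TreeMap<>();
        for (Supervisor s : supervisors) {
            for (int port : s.configuredPorts) {
                slots.computeIfAbsent(s.id, k -> new TreeSet<>()).add(port);
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        List<Supervisor> sups = Arrays.asList(
            new Supervisor("node1", Arrays.asList(6700, 6701, 6702)),
            new Supervisor("node2", Arrays.asList(6700, 6701)));
        System.out.println(allSchedulingSlots(sups)); // prints {node1=[6700, 6701, 6702], node2=[6700, 6701]}
    }
}
```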

 

3.8 Build the SupervisorDetails

The key step is filling in all-ports = all-scheduling-slots - dead-ports

(defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports]
    (let [storm-cluster-state (:storm-cluster-state nimbus)
        supervisor-infos (all-supervisor-info storm-cluster-state)
        ;; supervisors that appear in all-scheduling-slots but not in supervisor-infos (the heartbeats in zk)
        ;; when could this happen? in the current implementation all-scheduling-slots is itself derived from supervisor-infos, so this case should not occur
        nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos))
        ;; build supervisor-details; see the SupervisorInfo and SupervisorDetails definitions above
        all-supervisor-details (into {} (for [[sid supervisor-info] supervisor-infos
                                              :let [hostname (:hostname supervisor-info)
                                                    scheduler-meta (:scheduler-meta supervisor-info)
                                                    dead-ports (supervisor->dead-ports sid)
                                                    ;; hide the dead-ports from the all-ports
                                                    ;; these dead-ports can be reused in next round of assignments
                                                    all-ports (-> (get all-scheduling-slots sid)
                                                                  (set/difference dead-ports) ;; remove the dead ports
                                                                  ((fn [ports] (map int ports))))
                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
                                          {sid supervisor-details}))]
    (merge all-supervisor-details 
           (into {}
              (for [[sid ports] nonexistent-supervisor-slots]
                [sid (SupervisorDetails. sid nil ports)]))
           )))
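Per supervisor, all-ports is just a set difference. A sketch in Java (AllPorts is a hypothetical name); null is treated as an empty set, which matches how set/difference behaves on nil in the Clojure code, e.g. for a supervisor that has no dead ports:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

class AllPorts {
    // all-ports = configured slots of this supervisor minus its dead ports.
    static Set<Integer> allPorts(Set<Integer> configured, Set<Integer> deadPorts) {
        Set<Integer> ports = new TreeSet<>(configured == null ? Collections.<Integer>emptySet() : configured);
        if (deadPorts != null) {
            ports.removeAll(deadPorts); // hide dead ports for this round of scheduling
        }
        return ports;
    }

    public static void main(String[] args) {
        Set<Integer> configured = new TreeSet<>(Arrays.asList(6700, 6701, 6702, 6703));
        Set<Integer> dead = new TreeSet<>(Arrays.asList(6701));
        System.out.println(allPorts(configured, dead)); // prints [6700, 6702, 6703]
    }
}
```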

3.9 Build the cluster

package backtype.storm.scheduler;
public class Cluster {
    /**
     * key: supervisor id, value: supervisor details
     */
    private Map<String, SupervisorDetails>   supervisors;
    /**
     * key: topologyId, value: topology's current assignments.
     */
    private Map<String, SchedulerAssignmentImpl> assignments;

    /**
     * a map from hostname to supervisor id.
     */
    private Map<String, List<String>>        hostToId;
    
    private Set<String> blackListedHosts = new HashSet<String>();
    private INimbus inimbus;
}

3.10 Call scheduler.schedule

See "Storm Source Code Analysis - Scheduler".


3.11 Convert the format of the new assignments and log the changes


compute-topology->executor->node+port is called to "convert {topology-id -> SchedulerAssignment} to {topology-id -> {executor [node port]}}".

The result is compared with existing-assignments, and the reassignments are logged.
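The reassignment filter in compute-new-topology->executor->node+port only reports executors that were already assigned and have moved to a different node+port; brand-new executors are not listed. A Java sketch of that filter and of new-slots-cnt, assuming node+port is encoded as a "node:port" string (Reassignment is a hypothetical name):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class Reassignment {
    // Executors that existed in the old assignment and now sit on a different node+port.
    static Set<String> reassignedExecutors(Map<String, String> oldMap, Map<String, String> newMap) {
        Set<String> result = new TreeSet<>();
        if (oldMap == null) {
            return result; // no previous assignment: nothing counts as reassigned
        }
        for (Map.Entry<String, String> e : newMap.entrySet()) {
            String oldSlot = oldMap.get(e.getKey());
            if (oldSlot != null && !oldSlot.equals(e.getValue())) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // new-slots-cnt: number of distinct slots used by the new assignment
    static int newSlotCount(Map<String, String> newMap) {
        return new HashSet<>(newMap.values()).size();
    }

    public static void main(String[] args) {
        Map<String, String> oldMap = new HashMap<>();
        oldMap.put("[1 2]", "node1:6700");
        oldMap.put("[3 3]", "node1:6701");
        Map<String, String> newMap = new HashMap<>();
        newMap.put("[1 2]", "node1:6700");
        newMap.put("[3 3]", "node2:6700");
        newMap.put("[4 4]", "node2:6701");
        System.out.println(reassignedExecutors(oldMap, newMap)); // prints [[3 3]]
        System.out.println(newSlotCount(newMap));                // prints 3
    }
}
```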

 

4. Store the new assignments in ZooKeeper

According to the Assignment definition, besides executor->node+port an assignment carries some auxiliary information, such as start times:

(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])

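So this auxiliary information is filled in first, mainly by updating the start time of the reassigned executors (reassign-executors), and everything is wrapped in an Assignment record.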
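The start-time update in mk-assignments is a map merge in which later entries win: every executor in reassign-executors gets now-secs, and every other executor keeps its old start time. A Java sketch that takes the reassigned executors as an input rather than reimplementing changed-executors (StartTimes is a hypothetical name):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

class StartTimes {
    // Like (merge old-start-times (into {} (for [id reassign-executors] [id now-secs]))):
    // reassigned executors get nowSecs, every other executor keeps its old start time.
    static Map<String, Integer> startTimes(Map<String, Integer> oldStartTimes,
                                           Collection<String> reassignExecutors, int nowSecs) {
        Map<String, Integer> result = new TreeMap<>();
        if (oldStartTimes != null) {
            result.putAll(oldStartTimes);              // a nil existing assignment merges like {}
        }
        for (String executor : reassignExecutors) {
            result.put(executor, nowSecs);             // the right-hand map wins in merge
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> old = new TreeMap<>();
        old.put("[1 2]", 100);
        old.put("[3 3]", 100);
        System.out.println(startTimes(old, Arrays.asList("[3 3]", "[4 4]"), 500)); // prints {[1 2]=100, [3 3]=500, [4 4]=500}
    }
}
```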

If the new assignment differs from the existing one, it is written to ZooKeeper:

(.set-assignment! storm-cluster-state topology-id assignment)

Finally, INimbus.assignSlots is called to do any follow-up processing after the assignments in ZooKeeper have changed.

In standalone-nimbus, however, assignSlots does nothing.


Reposted from: http://www.cnblogs.com/fxjwind/p/3144246
