storm 开发系列二 Clojue版本

对于第一个简单的topology,现在用clojure实现一遍。正好可以对比一下。

创建工程用lein app模板创建工程demo$ lein new app hello_storm_cljGenerating a project called hello_storm_clj based on the ‘app’ template.注意,app是lein提供的模板名称,以下是所有模板Subtasks available:default A general project template for libraries.pluginA leiningen plugin project template.appAn application project template.template A meta-template for ‘lein new’ templates.支持cider

在project.clj中添加一行配置,使得可以使用Emacs cider进行开发

:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]] 运行hello_storm_clj$ lein runHello, World!一切正常。

编写程序

程序都写在core.clj文件中

编写hello-spout(defspout hello-spout ["word"] [conf context collector] (let [values ["china" "usa" "japan" "russia" "england"]](spout(nextTuple [](Thread/sleep 100)(emit-spout! collector [(rand-nth values)])))))

说明

1. defspout是定义spout的函数

第一个参数是spout名称,第二个是output declaration,,所以用vector [ ….]的形式表示多个keys,可以有多个,比如["key1" "key2], 第三个参数是个vector, 里面包含了topology config, topology context 和 output collector。

2. 变量values是个vector, 里面准备好了几个字符串

3. (spout …) 里面运行的是nextTuple动作,和Java一样

4. emit-spout! 里面使用了rand-nth随机选取values的元素,然后发射出去

编写hello-bolt(defbolt hello-bolt ["word"] [tuple collector] (bolt (execute [tuple](let [value (.getString tuple 0)](emit-bolt! collector (str "hello, " value) :anchor tuple)(ack! collector tuple)))))

说明

1. (defbolt …) 定义一个bolt

第一个参数是bolt名称,第二个是output declaration, 和前面spout一样,第三个参数也是vector, 包含了上游传递进来的tuple和输出需要的collector。

2. (bolt …)定一个了具体的代码,就是前面拼上"hello, "前缀。

3. emit-bolt! 最后两个参数是跟踪tuple, 将收到的tuple和发出的tuple通过:anchor链接起来,起到跟踪和保证消息被处理的机制。如果不需要保证消息一定被处理,这两个参数可以不用。而变成

(emit-bolt! collector (str "hello, " value))组装topology(defn mk-topology [] (topology {"a" (spout-spec hello-spout :p 10)} {"b" (bolt-spec {"a" :shuffle} hello-bolt :p 5)}))

10 和 5 分别是parralism。

添加依赖

在project.clj中添加依赖,依赖参考pom.xml的maven依赖设置,org.apache.storm是groupId, storm-core是artifactId

:dependencies [[org.clojure/clojure "1.5.1"] [org.apache.storm/storm-core "0.9.5"]]

注意,这里clojure jar的版本必须降级使用,高版本会报bug

291 [Thread-8] ERROR backtype.storm.event – Error when processing eventjava.lang.IllegalStateException: Attempting to call unbound fn: #’backtype.storm.util/some?本地运行(defn run-local! [] (let [cluster (LocalCluster.)](.submitTopology cluster "hello_clj" {TOPOLOGY-DEBUG true} (mk-topology))(Thread/sleep 10000)(.shutdown cluster)))

然后启动M-x cider-jack-in, 运行(run-local!),,观察buffer . * *nrepl-server l的输出,内容很多,下面只是一部分

35197 [Thread-28-a] INFO backtype.storm.daemon.task – Emitting: a default ["china"]35197 [Thread-44-b] INFO backtype.storm.daemon.executor – Processing received message source: a:7, stream: default, id: {}, ["china"]35223 [Thread-30-a] INFO backtype.storm.daemon.task – Emitting: a default ["japan"]35223 [Thread-32-a] INFO backtype.storm.daemon.task – Emitting: a default ["china"]35223 [Thread-46-b] INFO backtype.storm.daemon.executor – Processing received message source: a:8, stream: default, id: {}, ["japan"]35223 [Thread-46-b] INFO backtype.storm.daemon.executor – Processing received message source: a:9, stream: default, id: {}, ["china"]35234 [Thread-34-a] INFO backtype.storm.daemon.task – Emitting: a default ["japan"]35235 [Thread-42-b] INFO backtype.storm.daemon.executor – Processing received message source: a:10, stream: default, id: {}, ["japan"]35240 [Thread-36-a] INFO backtype.storm.daemon.task – Emitting: a default ["china"]

执行成功。

参考文档

https://storm.apache.org/documentation/Clojure-DSL.html

版权声明:本文为博主原创文章,未经博主允许不得转载。

害怕攀登高峰的人,永远在山下徘徊。

storm 开发系列二 Clojue版本

相关文章:

你感兴趣的文章:

标签云: