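22.4.7. Initializing Storm
Here we deploy Storm on three machines: 10.10.10.21 (storm_1), 10.10.10.22 (storm_2), and 10.10.10.23 (storm_3).
- Download Storm (apache-storm-0.9.6.zip) from the official website
- Unzip it under /usr/local/; run the same commands on all three machines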
|
[root@storm_1 wordcount]# unzip apache-storm-0.9.6.zip
[root@storm_1 wordcount]# mv apache-storm-0.9.6 /usr/local/
|
- Edit the YAML configuration file
|
[root@storm_2 wordcount]# cat /usr/local/apache-storm-0.9.6/conf/storm.yaml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########### These MUST be filled in for a storm configuration
# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
#
# nimbus.host: "nimbus"
#
#
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
storm.zookeeper.servers:
    - "storm_1"
    - "storm_2"
    - "storm_3"
nimbus.host: "storm_1"
storm.local.dir: "/u01/storm/status"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
|
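Each port listed under supervisor.slots.ports is one worker slot on a supervisor machine, so the configuration above allows four workers per supervisor. A minimal sketch of that capacity arithmetic follows; the function name is illustrative, and the supervisor list assumes that only storm_2 and storm_3 run `storm supervisor`, as in the startup step of this section.

```python
def total_worker_slots(supervisor_hosts, slot_ports):
    """Return how many worker processes the cluster can run at once."""
    # Every supervisor offers one worker slot per configured port.
    return len(supervisor_hosts) * len(slot_ports)


# Ports come from storm.yaml; the supervisor list is an assumption
# based on which nodes start a supervisor in this walkthrough.
supervisors = ["storm_2", "storm_3"]
slot_ports = [6700, 6701, 6702, 6703]

print(total_worker_slots(supervisors, slot_ports))  # 8
```

A topology that requests more workers than there are free slots cannot get all of them assigned, so this number is worth checking before raising topology.workers.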
- Create the Storm runtime directory
|
[root@storm_1 wordcount]# mkdir -p /u01/storm/status
|
- Start Storm
|
# Node 1: start the Storm UI
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm ui > /dev/null 2>&1 &
# Node 1: start Storm Nimbus
[root@storm_1 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm nimbus > /dev/null 2>&1 &
# Node 2: start a supervisor
[root@storm_2 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
# Node 3: start a supervisor
[root@storm_3 wordcount]# /usr/local/apache-storm-0.9.6/bin/storm supervisor > /dev/null 2>&1 &
# Run jps on each node to check the service status
[root@storm_1 wordcount]# jps
2151 core
2097 QuorumPeerMain
3969 Jps
2191 nimbus
|
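Besides jps, a quick TCP probe can confirm that the daemons are actually listening. The sketch below is only an illustration using the Python standard library (Python 3); the helper names are hypothetical. Port 6627 is the Nimbus Thrift port that appears in the submit log later in this section, 2181 is the ZooKeeper port used in the spout configuration, and 8080 is assumed to be the Storm UI port (its default when ui.port is not set).

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


def check_services(services):
    """Map each (name, host, port) entry to a reachability flag."""
    return {name: port_open(host, port) for name, host, port in services}
```

For this cluster the call might look like `check_services([("nimbus", "storm_1", 6627), ("ui", "storm_1", 8080), ("zookeeper", "storm_1", 2181)])`; any `False` value points at a daemon that is not running or a port blocked by a firewall.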
- Open the Storm UI in a web browser
If the Storm UI page loads, Storm has been deployed successfully.
22.4.8. Building streamparse (a Python framework for Storm)
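streamparse is a framework for running Python on Storm. It packages Python code into a JAR that runs on Storm.
Official documentation: http://streamparse.readthedocs.io/en/master/quickstart.html
(Note: topologies are defined differently in streamparse 3 and later. According to the author, the change was made so that streamparse can run more independently of the Storm environment.)
- Set up SSH trust among the three machines: generate an SSH key pair on each machine by running the following command on each of them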
|
[root@storm_1 ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
1e:20:62:da:f5:fb:69:32:da:ac:09:ef:7c:35:a5:01 root@storm_3
The key's randomart image is:
+--[ RSA 2048]----+
| |
| E |
| o o .. |
| + o o .. . |
|. . . S+ |
| o+. |
| . .... |
| + ++... |
| .B+o+o |
+-----------------+
|
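After running the command above, an id_rsa.pub file is generated in the ~/.ssh/ directory of each host. Copy the public keys of all three machines into a single file (~/.ssh/authorized_keys) and make sure the file has the same contents on all three machines.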
|
# storm_1 node
[root@storm_1 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
# storm_2 node
[root@storm_2 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
# storm_3 node
[root@storm_3 ~]# cat ~/.ssh/id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
# Contents of the authorized_keys file on every node
[root@storm_1 ~]# cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD0z8u8K0wGWLhhzxcztokzVWHqKf1O5PScjkIvXFh2AnEqZz+d5/LqyT6qDi1T8p+k4UHCkgmRqWZbG+LEtzQEjE3/Guya4uEP5g8MGvnLUSQQWS5oQN6EAq2fQ7G806fipQCEKWETF7axk6We1NNUjO27c071OMQ2JXM7PLVQACaUcaI8sJg3uHOs7Bcb871tyiuXtVygpyjJEBxvzpuxcjEJI/C/Yu+q28KXRfSWblJ7hIN7c8eIGYumTi4vSKo3Rwwm5UGvBIopK8Xc4SmvrZ6jpHInR2YLQsEznrcR9MprhHXPeFgnoJ3vCLeXbOesmH+h6Cb4UJChUR7owWKr root@storm_1
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC/n9bY6jD8Z2mkgZLO9meXAMNvDt/YJRpcTM57ko2p9Cmm4c+CgQzstBExOAciQR9ckranLj8k/GYDBL5jBIBjquvVEtA06sCp6vGsvUOOOg07VgrmpbEvGNovNa8bfVOXR5cSbqwoesPu33wG43WXDdpD7vKU9YrqUyNXj1xPi+xTQwWkUMz9zEH8zwYuhD7pglP7iJsvzl/GpJRA5kwlPj0PWOLocq8D26pNSMiP034Ah9bojpM6jnbFT4lXeV85PdCABhcqyLZVNiKqU/Yozx1Ui9UsXfPLcHl1SnvIOBFRIaih5WzZ0CMAENXzjrfSxvrFGCYLrwORO/uSJc0t root@storm_2
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCzwB3Qq0ElUY6EDRYQF5NupPtQ6hILzzDrVp9GGdavdsjxlO1kD5LroeP2s94A38u0jbXiEYJZhNprfA+a+UuT6DtVVIIl9/gPrNlRUFLy+8vbzhN9G8hsqcB0nb3VNtnMJGsS9QyOmOieqp4fW15HZn0jQIS+TgmgaMeaMlK8LV5cO0S4sCjPTbtXMDKZ/oNWFenZ143Ul4ViAPudLm9o6ik4UkFaP847cxyKy/jgpDdEQBibRucrTiQWoJ/uhiHH020MqEv6H2ZbmjOXbEQLFo8b6feSJSp0RaFZuook0CNs88QXxSKRw+kKEDlEZGCUuvFHLfIzV7C4PExEViml root@storm_3
|
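Combining the three public keys by hand is easy to get wrong (duplicated or blank lines). As a rough illustration, the merge can be expressed as a small function; `merge_public_keys` is a hypothetical helper, and the keys in the example are shortened placeholders, not the real keys shown above.

```python
def merge_public_keys(key_lines):
    """Return authorized_keys content with one unique key per line."""
    seen = set()
    merged = []
    for line in key_lines:
        key = line.strip()
        # Skip blank lines and keys that were already added.
        if not key or key in seen:
            continue
        seen.add(key)
        merged.append(key)
    return "".join(key + "\n" for key in merged)


# Placeholder keys for illustration only.
keys = [
    "ssh-rsa AAAA...1 root@storm_1",
    "ssh-rsa AAAA...2 root@storm_2",
    "ssh-rsa AAAA...3 root@storm_3",
    "ssh-rsa AAAA...1 root@storm_1\n",
]
print(merge_public_keys(keys), end="")
```

The resulting text is what each node's ~/.ssh/authorized_keys should contain; the order of first appearance is preserved so the file is identical on all three machines.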
- Create an SSH config file on all three machines (run on each machine)
|
[root@storm_1 wordcount]# touch /root/.ssh/config
|
- Download the lein script into /usr/local/bin and make it executable (run on all three machines)
|
[root@storm_1 wordcount]# wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein
[root@storm_1 wordcount]# mv lein /usr/local/bin/
[root@storm_1 wordcount]# chmod 755 /usr/local/bin/lein
|
- Install streamparse (run on all three machines)
|
[root@storm_1 wordcount]# pip install streamparse
|
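- Create the storm_project directory and start a simple Storm project (on storm_2). Do not create it on the node where Nimbus is running, because running the Storm project there later would cause port conflicts.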
|
[root@storm_2 ~]# mkdir -p /u01/storm_project
[root@storm_2 ~]# cd /u01/storm_project/
[root@storm_2 storm_project]# pwd
/u01/storm_project
[root@storm_2 storm_project]# sparse quickstart wordcount
Creating your wordcount streamparse project...
create wordcount
create wordcount/.gitignore
create wordcount/config.json
create wordcount/fabfile.py
create wordcount/project.clj
create wordcount/README.md
create wordcount/src
create wordcount/src/bolts/
create wordcount/src/bolts/__init__.py
create wordcount/src/bolts/wordcount.py
create wordcount/src/spouts/
create wordcount/src/spouts/__init__.py
create wordcount/src/spouts/words.py
create wordcount/topologies
create wordcount/topologies/wordcount.py
create wordcount/virtualenvs
create wordcount/virtualenvs/wordcount.txt
Done.
Try running your topology locally with:
cd wordcount
sparse run
|
- Edit the JSON configuration file (on storm_2)
|
[root@storm_2 wordcount]# cat /u01/storm_project/wordcount/config.json
{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": [
                "storm_1",
                "storm_2",
                "storm_3"
            ],
            "log": {
                "path": "/tmp/storm/stream/log",
                "file": "pystorm_{topology_name}_{component_name}_{task_id}_{pid}.log",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "use_ssh_for_nimbus": true,
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
|
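A typo in config.json usually only surfaces when sparse submit fails, so a quick sanity check beforehand can save a round trip. The following is a minimal sketch using only the standard library; the list of "required" keys is an assumption chosen for this walkthrough, not streamparse's own validation, and `missing_env_keys` is a hypothetical helper.

```python
import json

# Keys this walkthrough relies on; an assumption, not streamparse's schema.
REQUIRED_ENV_KEYS = ("user", "nimbus", "workers", "virtualenv_root")

# Trimmed copy of the config.json shown above.
SAMPLE_CONFIG = """
{
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "root",
            "nimbus": "storm_1",
            "workers": ["storm_1", "storm_2", "storm_3"],
            "virtualenv_root": "/tmp/storm/stream/virtualenvs"
        }
    }
}
"""


def missing_env_keys(config_text, env_name="prod"):
    """Return the required keys that are absent from the named environment."""
    config = json.loads(config_text)  # raises ValueError on malformed JSON
    env = config.get("envs", {}).get(env_name, {})
    return [key for key in REQUIRED_ENV_KEYS if key not in env]


print(missing_env_keys(SAMPLE_CONFIG))  # []
```

Because json.loads rejects curly quotes and trailing commas, the same call also catches the most common copy-and-paste damage to the file.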
- Create the required directories (run on all three machines)
|
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/log
[root@storm_1 wordcount]# mkdir -p /tmp/storm/stream/virtualenvs
|
- Submit the wordcount program to the Storm cluster (on storm_2)
|
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse submit
[storm_1] Executing task '_create_or_update_virtualenv'
[storm_2] Executing task '_create_or_update_virtualenv'
... omit ...
[storm_1] run: rm /tmp/streamparse_requirements-oD8qdm4We.txt
[storm_3] out:
[storm_3] run: rm /tmp/streamparse_requirements-5greXfqjW.txt
Cleaning from prior builds...
# Press Enter here
Creating topology Uber-JAR...
# Press Enter here
Uber-JAR created: /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar
Deploying "wordcount" topology...
ssh tunnel to Nimbus storm_1:6627 established.
Routing Python logging to /tmp/storm/stream/log.
Running lein command to submit topology to nimbus:
lein run -m streamparse.commands.submit_topology/-main topologies/wordcount.clj --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'topology.python.path="/tmp/storm/stream/virtualenvs/wordcount/bin/python"' --option 'streamparse.log.path="/tmp/storm/stream/log"' --option 'streamparse.log.max_bytes=1000000' --option 'streamparse.log.backup_count=10' --option 'streamparse.log.level="info"'
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
# Press Enter here
{:option {streamparse.log.level info, streamparse.log.backup_count 10, streamparse.log.max_bytes 1000000, streamparse.log.path /tmp/storm/stream/log, topology.python.path /tmp/storm/stream/virtualenvs/wordcount/bin/python, topology.acker.executors 2, topology.workers 2}, :debug false, :port 6627, :host localhost, :help false}
1604 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
1620 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /u01/storm_project/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3853 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /u01/storm/status/nimbus/inbox/stormjar-03200d7a-dec1-44a6-b0f7-e775d0227864.jar
3854 [main] INFO backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"streamparse.log.backup_count":10,"streamparse.log.path":"\/tmp\/storm\/stream\/log","topology.python.path":"\/tmp\/storm\/stream\/virtualenvs\/wordcount\/bin\/python","topology.debug":false,"nimbus.thrift.port":6627,"topology.max.spout.pending":5000,"nimbus.host":"localhost","topology.workers":2,"topology.acker.executors":2,"streamparse.log.max_bytes":1000000,"streamparse.log.level":"info","topology.message.timeout.secs":60}
4487 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: wordcount
|
If the output looks like the above, the deployment has succeeded.
- Confirm that the wordcount program has been deployed to Storm
- Stop the wordcount program in Storm
|
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]# sparse kill -n wordcount
WARNING: You're currently running as root; probably by accident.
Press control-C to abort or Enter to continue as root.
Set LEIN_ROOT to disable this warning.
5180 [main] INFO backtype.storm.thrift - Connecting to Nimbus at localhost:6627
Killed topology: wordcount
|
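If you see the output above, the wordcount program has been stopped and removed from the Storm cluster.
22.4.9. Writing streamparse code
Since this is a sample program, we simply modify the code in the wordcount project created earlier. Only the spout and bolt code needs to change.
We need to install the Python modules for Kafka and MongoDB. Run the following commands: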
|
# Install into the system Python, which is the interpreter used by sparse run
pip install pykafka
pip install pymongo
# Install into the streamparse Python virtualenv on the Storm nodes, which is used by sparse submit
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pykafka
/tmp/storm/stream/virtualenvs/wordcount/bin/pip install pymongo
|
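Because the modules have to exist in two separate interpreters, it helps to verify each one after installing. The sketch below is an illustration only; `missing_modules` is a hypothetical helper that relies on nothing beyond importlib.import_module from the standard library.

```python
import importlib


def missing_modules(names):
    """Return the module names that this interpreter cannot import."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


# The result depends on what is installed in the interpreter running this.
print(missing_modules(["pykafka", "pymongo"]))
```

Run the script once with the system `python` and once with `/tmp/storm/stream/virtualenvs/wordcount/bin/python`; an empty list from both means sparse run and sparse submit will each find the modules.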
- words.py code (spout)
words.py continuously consumes the messages produced to Kafka and emits each one to the next component downstream (a bolt).
|
[root@storm_2 spouts]# pwd
/u01/storm_project/wordcount/src/spouts
[root@storm_2 spouts]#
[root@storm_2 spouts]#
[root@storm_2 spouts]# cat words.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import itertools
from streamparse.spout import Spout
from pykafka import KafkaClient
import simplejson as json
import sys

# Python 2 only: make utf-8 the default string encoding
reload(sys)
sys.setdefaultencoding('utf-8')


class WordSpout(Spout):

    def initialize(self, stormconf, context):
        # self.words = itertools.cycle(['dog', 'cat',
        #                               'zebra', 'elephant'])
        # Connect to Kafka and read the "test" topic as a balanced consumer
        client = KafkaClient(hosts="10.10.10.11:9092")
        topic = client.topics[b"test"]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="storm_1:2181,storm_2:2181,storm_3:2181"
        )

    def next_tuple(self):
        # word = next(self.words)
        # self.emit([word])
        message = self.balanced_consumer.consume()
        # Parse the Logstash JSON string into a dict
        log_info = json.loads(message.value)
        word = log_info["message"]
        # Append the raw message to a local file for debugging
        with open("/tmp/storm.log", "a") as f:
            f.write(word)
        self.emit([word])
|
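The part of the spout that is easiest to test in isolation is the unwrapping of the Logstash event. The sketch below shows that step as a standalone function that runs without Kafka or Storm; `extract_message` is a hypothetical helper, and the assumption (taken from the spout's use of `log_info["message"]`) is that Logstash puts the original log line in the `message` field. The `host` field in the example event is made up.

```python
import json


def extract_message(raw_value):
    """Return the original log line carried in a Logstash event, or None."""
    if isinstance(raw_value, bytes):
        raw_value = raw_value.decode("utf-8")
    try:
        event = json.loads(raw_value)
    except ValueError:
        # Not valid JSON: nothing to emit.
        return None
    if not isinstance(event, dict):
        return None
    return event.get("message")


# Illustrative event; only the "message" field is used by the spout.
order_line = '{"order_id":1, "price":20, "user_name":"Bob"}'
event_bytes = json.dumps({"message": order_line, "host": "example"}).encode("utf-8")
print(extract_message(event_bytes) == order_line)  # True
```

Returning None for malformed input lets the caller skip the tuple instead of crashing the spout process, which the original next_tuple would do on a bad message.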
- wordcount.py code (bolt)
wordcount.py receives the messages (JSON strings) sent by words.py, parses each one into a Python dict, computes the statistics, and stores them in MongoDB (10.10.10.12).
|
[root@storm_2 bolts]# pwd
/u01/storm_project/wordcount/src/bolts
[root@storm_2 bolts]#
[root@storm_2 bolts]# cat wordcount.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from collections import Counter
from streamparse.bolt import Bolt
import simplejson as json
from pymongo import MongoClient
import sys

# Python 2 only: make utf-8 the default string encoding
reload(sys)
sys.setdefaultencoding('utf-8')


class WordCounter(Bolt):

    def initialize(self, conf, ctx):
        # self.counts = Counter()
        client = MongoClient(b"10.10.10.12:27017,10.10.10.12:27018,10.10.10.12:27019",
                             replicaset="rs_12")
        # Get the shop database
        self.db = client.shop

    def process(self, tup):
        # Get the string passed from the spout
        word = tup.values[0]
        # self.counts[word] += 1
        # self.emit([word, self.counts[word]])
        # self.log('%s: %d' % (word, self.counts[word]))
        # Parse the string from the spout into a dict
        order_info = json.loads(word)
        # Look up the statistics of the user whose user_name came in via Kafka
        user_name = order_info.get("user_name", "Unknown")
        condition = {"user_name": user_name}
        order_stat_info = self.db.order_stat.find_one(condition)
        ## Insert if order_stat_info is empty, otherwise update
        # 1. No existing record
        if not order_stat_info:
            order_stat_info_new = {
                "user_name": user_name,
                "order_num": 1,
                "total_price": order_info.get("price", 0.00),
                "min_order_price": order_info.get("price", 0.00),
                "max_order_price": order_info.get("price", 0.00),
                "min_order": order_info.get("order_id", 0),
                "max_order": order_info.get("order_id", 0),
            }
            self.db.order_stat.insert_one(order_stat_info_new)
        # 2. Existing record
        else:
            min_order_price = min(order_stat_info["min_order_price"],
                                  order_info.get("price", 0.00))
            max_order_price = max(order_stat_info["max_order_price"],
                                  order_info.get("price", 0.00))
            min_order = order_stat_info["min_order"]
            max_order = order_stat_info["max_order"]
            # Set the id of the lowest-priced order
            if min_order_price == order_info.get("price", 0.00):
                min_order = order_info.get("order_id", min_order)
            # Set the id of the highest-priced order
            if max_order_price == order_info.get("price", 0.00):
                max_order = order_info.get("order_id", max_order)
            # Build the updated record
            order_stat_info_new = {
                "order_num": order_stat_info["order_num"] + 1,
                "total_price": order_stat_info["total_price"] +
                               order_info.get("price", 0.00),
                "min_order_price": min_order_price,
                "max_order_price": max_order_price,
                "min_order": min_order,
                "max_order": max_order
            }
            # Write the update
            self.db.order_stat.update_one({"_id": order_stat_info["_id"]},
                                          {"$set": order_stat_info_new})
|
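The aggregation rule inside the bolt can be checked without MongoDB by writing it as a pure function over dicts. This is a sketch of the same insert-or-update logic, not the bolt itself; `update_stats` is a hypothetical name, and the sample orders are Bob's four orders from the test data used later in this section.

```python
def update_stats(stat, order):
    """Fold one order into a user's statistics and return the new record."""
    price = order.get("price", 0.00)
    if stat is None:
        # First order seen for this user.
        order_id = order.get("order_id", 0)
        return {
            "user_name": order.get("user_name", "Unknown"),
            "order_num": 1,
            "total_price": price,
            "min_order_price": price,
            "max_order_price": price,
            "min_order": order_id,
            "max_order": order_id,
        }
    new_stat = dict(stat)
    new_stat["order_num"] = stat["order_num"] + 1
    new_stat["total_price"] = stat["total_price"] + price
    # A tie counts as a new minimum or maximum, matching the bolt's == test.
    if price <= stat["min_order_price"]:
        new_stat["min_order_price"] = price
        new_stat["min_order"] = order.get("order_id", stat["min_order"])
    if price >= stat["max_order_price"]:
        new_stat["max_order_price"] = price
        new_stat["max_order"] = order.get("order_id", stat["max_order"])
    return new_stat


bob_orders = [
    {"order_id": 1, "price": 20, "user_name": "Bob"},
    {"order_id": 2, "price": 120, "user_name": "Bob"},
    {"order_id": 3, "price": 1120, "user_name": "Bob"},
    {"order_id": 4, "price": 11120, "user_name": "Bob"},
]
stat = None
for order in bob_orders:
    stat = update_stats(stat, order)
print(stat["order_num"], stat["total_price"])  # 4 12380
```

Note that the bolt performs a separate find_one followed by an insert or update, so with several bolt instances two orders for the same user could race; the pure function above sidesteps that and only illustrates the arithmetic.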
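With the code written, the next step is to test it.
- Run streamparse for testing
Because we do not yet know whether the code is correct, we debug it with sparse run rather than submitting it straight to the Storm cluster with sparse submit.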
|
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]#
[root@storm_2 wordcount]# sparse run
... Omit ...
8653 [Thread-15-count-bolt] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 3719
8703 [Thread-16-word-spout] INFO backtype.storm.spout.ShellSpout - Launched subprocess with pid 3717
8706 [Thread-13-count-bolt] INFO backtype.storm.task.ShellBolt - Start checking heartbeat...
8706 [Thread-13-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
8708 [Thread-15-count-bolt] INFO backtype.storm.task.ShellBolt - Start checking heartbeat...
8708 [Thread-15-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
8708 [Thread-16-word-spout] INFO backtype.storm.daemon.executor - Opened spout word-spout:(5)
8715 [Thread-16-word-spout] INFO backtype.storm.daemon.executor - Activating spout word-spout:(5)
8715 [Thread-16-word-spout] INFO backtype.storm.spout.ShellSpout - Start checking heartbeat...
|
- Write some order records to the file monitored by Logstash (10.10.10.11)
|
echo '{"order_id":1, "price":20, "user_name":"Bob", "goods_name":"good_name2"}' > /tmp/orders.log
echo '{"order_id":2, "price":120, "user_name":"Bob", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1120, "user_name":"Bob", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11120, "user_name":"Bob", "goods_name":"good_name3"}' >> /tmp/orders.log
echo '{"order_id":1, "price":10, "user_name":"Tom", "goods_name":"good_name2"}' >> /tmp/orders.log
echo '{"order_id":2, "price":110, "user_name":"Tom", "goods_name":"good_name1"}' >> /tmp/orders.log
echo '{"order_id":3, "price":1110, "user_name":"Tom", "goods_name":"good_name4"}' >> /tmp/orders.log
echo '{"order_id":4, "price":11110, "user_name":"Tom", "goods_name":"good_name3"}' >> /tmp/orders.log
|
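For larger test runs, typing echo commands by hand gets tedious and quoting mistakes produce invalid JSON. A small generator can write the same one-object-per-line format. This is a sketch under the assumption that Logstash tails /tmp/orders.log as in the commands above; `format_orders` and `append_orders` are hypothetical helpers.

```python
import json


def format_orders(orders):
    """Serialize each order as one JSON object per line."""
    return "".join(json.dumps(order) + "\n" for order in orders)


def append_orders(path, orders):
    """Append the serialized orders to the file that Logstash is watching."""
    with open(path, "a") as log_file:
        log_file.write(format_orders(orders))


sample_orders = [
    {"order_id": 1, "price": 20, "user_name": "Bob", "goods_name": "good_name2"},
    {"order_id": 2, "price": 120, "user_name": "Bob", "goods_name": "good_name1"},
]
print(format_orders(sample_orders), end="")
```

Calling `append_orders("/tmp/orders.log", sample_orders)` on the Logstash host then has the same effect as the `>>` commands; unlike the first echo, it never truncates the file.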
- Check the order statistics in MongoDB (10.10.10.12)
|
[root@normal_12 ~]# /u01/mongodb_27018/client_mongodb.sh
MongoDB shell version: 3.2.5
connecting to: 10.10.10.12:27018/test
(test) 01:01:10>
(test) 01:01:11> use shop
switched to db shop
(shop) 01:01:16>
(shop) 01:22:32> db.order_stat.find()
{
    "_id" : ObjectId("5734bba0172d290f86e2d2e4"),
    "total_price" : 12380,
    "min_order_price" : 20,
    "min_order" : 1,
    "order_num" : 4,
    "max_order_price" : 11120,
    "user_name" : "Bob",
    "max_order" : 4
}
{
    "_id" : ObjectId("5734bbf1172d290f844d2fdc"),
    "total_price" : 12230,
    "min_order_price" : 10,
    "min_order" : 1,
    "order_num" : 3,
    "max_order_price" : 11110,
    "user_name" : "Tom",
    "max_order" : 4
}
|
- Finally, submit the project to Storm
|
[root@storm_2 wordcount]# pwd
/u01/storm_project/wordcount
[root@storm_2 wordcount]#
[root@storm_2 wordcount]# sparse submit
|
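At this point we have set up a Storm environment and developed for it using Python.
22.4. Summary
In practice, many systems are neither purely OLTP nor purely OLAP but a mix of the two. When OLAP workloads are mixed into an OLTP system, relying on database queries alone can slow the OLTP system down, because the queries become larger and more complex. Cases like this need to be solved at the architecture level. Both Storm and Spark can be used for real-time computation, so if you run into a scenario like the one above, consider giving your system a "new outfit".
Article reposted from: ttlsa.com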