回顾
之前我们介绍了使用分布式事务(XA)处理用户下订单,对MySQL有所了解的都知道XA其实是在非用不可的情况下才用的,因为它实在是影响性能。当然,其实迫使我们使用XA的原因也是因为我们的设计决定的(其实使用XA这种分库分表的方案基本能满足那些一定需要强一致性的需求)。
之前我们的设计是为了方便卖家的,所以完整的订单信息只保存在卖家方,而在买家方我们只保存着完整订单的引用。这样做是因为业务需要强一致性,迫使我们使用XA,但我们又希望让数据写磁盘少一点,让插入快一点。
我们想要的
无论在买家还是卖家在操作订单数据的时候都能方便。为了满足这样的情况,我们只能做到数据冗余了。就是买家有一个完整的数据卖家也有一份完整的数据。这样操作起来就能在单机上进行,这样是最方便的。
业务最终一致性
其实,往往再深入分析业务,可以发现其实业务上并非一定需要强一致性。我们的目的只要买家已经下的订单能完整让卖家看到,卖家多久能看到其实并不是很关心。因为如果卖家没看到订单也不会对订单进行操作,也不会发货给买家,这样卖家是不会有损失的。而买家就算是付了款,发现买家没发货,也可以退款。这样一来买家也没有金钱的损失。所以我们这边能使用最终一致性来解决问题。
为什么要最终一致性
说白了就是为了提高性能,因为我们现在需要买家和卖家都有完整的订单数据,为了让买家下单时不用跨库、跨实例或跨节点执行XA事务。这样我们保证买家的订单数据先入库后能马上返回成功信息。而不用关心卖家的数据是否入库,只是提醒卖家有订单信息要入库了。
kafka配合完成最终一致性
要完成最终一致性这种业务,我最先想到的就是使用消息队列了。而在消息队列中我最熟悉的只能是kafka了,我使用kafka的原因是他能高可用,还能将消息持久化。
下面我们就来演示使用kafka来完成最终一致性
重构买家订单表(buy_order_x)结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
— 在每个分库为 buy_order_x 添加price列
ALTER TABLE buy_order_1 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_2 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_3 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_4 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_5 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_6 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_7 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_8 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_9 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
ALTER TABLE buy_order_10 ADD `price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’;
— 在每个分库为 buy_order_x 添加status列
ALTER TABLE buy_order_1 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_2 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_3 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_4 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_5 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_6 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_7 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_8 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_9 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
ALTER TABLE buy_order_10 ADD `status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’;
— 最终表结构
CREATE TABLE `buy_order_1` (
`buy_order_id` bigint(20) unsigned NOT NULL COMMENT ‘出售订单ID与出售订单相等’,
`user_id` int(10) unsigned DEFAULT NULL COMMENT ‘下单用户ID’,
`user_guide_id` int(10) unsigned DEFAULT NULL COMMENT ‘导购ID’,
`price` decimal(11,2) DEFAULT NULL COMMENT ‘订单价格’,
`status` tinyint(3) unsigned DEFAULT NULL COMMENT ‘订单状态’,
PRIMARY KEY (`buy_order_id`),
KEY `idx$buy_order_1$user_id` (`user_id`),
KEY `idx$buy_order_1user_guide_id` (`user_guide_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
为买家创建订单商品表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
— 在每一个库都进行添加多个buy_order_goods_x表
— 下面表buy_order_goods_x中的x代表的是 1,2,3,4,5,6,7,8,9,10
CREATE TABLE `buy_order_goods_x` (
`order_goods_id` bigint(20) unsigned NOT NULL COMMENT ‘订单商品表ID’,
`user_id` int(10) unsigned DEFAULT NULL COMMENT ‘下单用户ID’,
`sell_order_id` bigint(20) unsigned NOT NULL COMMENT ‘订单ID’,
`goods_id` bigint(20) unsigned NOT NULL COMMENT ‘商品ID’,
`user_guide_id` int(10) unsigned DEFAULT NULL COMMENT ‘导购ID’,
`price` decimal(11,2) DEFAULT NULL COMMENT ‘商品购买价格价格’,
`num` tinyint(3) unsigned DEFAULT NULL COMMENT ‘商品数量’,
PRIMARY KEY (`order_goods_id`),
KEY `idx$order_goods$orders_id` (`sell_order_id`),
KEY `idx$order_goods$user_id` (`user_id`),
KEY `idx$order_goods$goods_id` (`goods_id`),
KEY `idx$order_goods$user_guide_id` (`user_guide_id`)
);
— 将buy_order_goods加入系统配置中表明该表是分表。
INSERT INTO system_setting VALUES(NULL, ‘sharding_table’, ‘buy_order_goods’);
— 设置buy_order_goods分表的关键字
INSERT INTO system_setting VALUES(NULL, ‘sharding_buy_order_goods_by’, ‘user_id’);
— 指定每种角色需要访问的表
INSERT INTO system_setting VALUES(NULL, ‘normal_user_sharding’, ‘buy_order_goods’);
INSERT INTO system_setting VALUES(NULL, ‘user_guide_sharding’, ‘buy_order_goods’);
INSERT INTO system_setting VALUES(NULL, ‘store_owner_sharding’, ‘buy_order_goods’);
|
这边我不关心kafka是如何搭建的,就直接使用了。要想知道如何搭建kafka请点击一下网址:http://www.linuxidc.com/Linux/2014-07/104470.htm
买家下订单演示:
注意这边python代码使用的是simplejson因此需要安装。安装包simplejson-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Program: 客户下订单+kafka 订单推送
# Author : HH
# Date : 2016-02-27
import sys
import mysql.connector
import time
import simplejson as json
import snowflake.client
from pykafka import KafkaClient
reload(sys)
sys.setdefaultencoding(‘utf-8’)
if __name__ == ‘__main__’:
”’
这边模拟用户:username77购买 store2 中的 goods27和goods69商品
”’
# 设置公共库连接配置
db_config_common = {
‘user’ : ‘root’,
‘password’: ‘root’,
‘host’ : ‘127.0.0.1’,
‘port’ : 3306,
‘database’: ‘test’
}
# 设置snowflake链接默认参数
snowflake_config = {
‘host’: ‘192.168.137.11’,
‘port’: 30001
}
# 配置snowflake
snowflake.client.setup(**snowflake_config)
# 配置连接kafka
client = KafkaClient(hosts=“192.168.137.11:9092, \
192.168.137.11:9093, \
192.168.137.11:9094″)
topic = client.topics[‘shop’]
# 创建生产者
producer = topic.get_producer(delivery_reports=False)
# 获得公共数据库的链接和游标
conn_common = mysql.connector.connect(**db_config_common)
cursor_select_common = conn_common.cursor(buffered=True)
# 获得用户:username77的基本信息
select_sql = ”’
SELECT u.user_id,
u.table_flag,
u.db_name,
ss.value
FROM user AS u
LEFT JOIN system_setting AS ss ON u.db_name = ss.name
WHERE username = ‘username77’
”’
cursor_select_common.execute(select_sql)
buy_user_id, buy_table_flag, buy_db_name, buy_db_config_json \
= cursor_select_common.fetchone()
# 获得购买者的链接和游标
conn_buy = mysql.connector.connect(**json.loads(buy_db_config_json))
cursor_select_buy = conn_buy.cursor(buffered=True)
cursor_dml_buy = conn_buy.cursor(buffered=True)
# 通过店铺名称获得导购以及导购所对应的用户所使用的数据库链接描述符
select_sql = ”’
SELECT s.user_id,
ug.user_guide_id,
u.table_flag,
u.db_name,
ss.value AS db_config_json
FROM store AS s
LEFT JOIN user AS u USING(user_id)
LEFT JOIN user_guide AS ug USING(user_id)
LEFT JOIN system_setting AS ss ON ss.name = u.db_name
WHERE s.user_id = 2
”’
cursor_select_common.execute(select_sql)
sell_user_id, user_guide_id, sell_table_flag, sell_dbname, \
sell_db_config_json = cursor_select_common.fetchone()
# 获得出售者的数据库链接描述符以及游标
conn_sell = mysql.connector.connect(**json.loads(sell_db_config_json))
cursor_select_sell = conn_sell.cursor(buffered=True)
# 成订单ID
order_id = snowflake.client.get_guid()
# 获得商品信息并生成商品订单信息。
select_goods_sql = ”’
SELECT goods_id,
price
FROM {goods_table}
WHERE goods_id IN(3794292584748158977, 3794292585729626113)
”’.format(goods_table = ‘goods_’ + str(sell_table_flag))
cursor_select_sell.execute(select_goods_sql)
# 订单价格
order_price = 0
for goods_id, price in cursor_select_sell:
# 生成订单货物ID
guid = snowflake.client.get_guid()
order_price += price
# 生成订单商品信息
insert_order_goods_sql = ”’
INSERT INTO {table_name}
VALUES({guid}, {user_id}, {order_id}, {goods_id}, {user_guide_id},
{price}, 1)
”’.format(table_name = ‘buy_order_goods_’ + str(buy_table_flag),
guid = guid,
user_id = buy_user_id,
order_id = order_id,
goods_id = goods_id,
user_guide_id = user_guide_id,
price = price)
cursor_dml_buy.execute(insert_order_goods_sql)
# 将订单商品信息放入队列中
goods_dict = {
‘user_id’ : sell_user_id,
‘option_type’ : ‘insert’,
‘option_table’: ‘order_goods’,
‘option_obj’ : {
‘order_goods_id’: guid,
‘sell_order_id’ : order_id,
‘goods_id’ : goods_id,
‘user_guide_id’ : user_guide_id,
‘price’ : price,
‘num’ : 1
}
}
goods_json = json.dumps(goods_dict)
producer.produce(goods_json)
# 生成订单记录
insert_order_sql = ”’
INSERT INTO {order_table}
VALUES({order_id}, {user_id}, {user_guide_id},
{price}, 0)
”’.format(order_table = ‘buy_order_’ + str(buy_table_flag),
order_id = order_id,
user_id = buy_user_id,
user_guide_id = user_guide_id,
price = order_price)
cursor_dml_buy.execute(insert_order_sql)
# 将订单信息放入队列中
order_dict = {
‘user_id’ : sell_user_id,
‘option_type’ : ‘insert’,
‘option_table’: ‘sell_order’,
‘option_obj’ : {
‘sell_order_id’ : order_id,
‘user_guide_id’ : user_guide_id,
‘user_id’ : buy_user_id,
‘price’ : order_price,
‘status’ : 0
}
}
order_json = json.dumps(order_dict)
producer.produce(order_json)
producer.stop()
# 提交事物
conn_buy.commit()
# 关闭有标链接
cursor_select_common.close()
cursor_select_buy.close()
cursor_select_sell.close()
cursor_dml_buy.close()
conn_common.close()
conn_buy.close()
conn_sell.close()
|
生成卖家订单信息:
通过上面买家下订单,现在订单的信息在队列中。我们只需要将队列中的数据取出并保存就好了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Program: 客户下订单+kafka 订单推送
# Author : HH
# Date : 2016-02-27
import sys
import mysql.connector
import time
import simplejson as json
from pykafka import KafkaClient
reload(sys)
sys.setdefaultencoding(‘utf-8’)
if __name__ == ‘__main__’:
”’
获取队列数据保存到数据库
”’
# 设置公共库连接配置
db_config_common = {
‘user’ : ‘root’,
‘password’: ‘root’,
‘host’ : ‘127.0.0.1’,
‘port’ : 3306,
‘database’: ‘test’
}
# 配置snowflake
snowflake.client.setup(**snowflake_config)
# 配置连接kafka
client = KafkaClient(hosts=“192.168.137.11:9092, \
192.168.137.11:9093, \
192.168.137.11:9094″)
topic = client.topics[‘shop’]
# 生成消费者
balanced_consumer = topic.get_balanced_consumer(
consumer_group=‘goods_group’,
auto_commit_enable=True,
zookeeper_connect=‘localhost:2181’
)
# 获得公共数据库的链接和游标
conn_common = mysql.connector.connect(**db_config_common)
cursor_select_common = conn_common.cursor(buffered=True)
# 消费信息
for message in balanced_consumer:
if message is not None:
# 解析json为dict
info_dict = json.loads(message.value)
select_sql = ”’
SELECT s.user_id,
ug.user_guide_id,
u.table_flag,
u.db_name,
ss.value AS db_config_json
FROM store AS s
LEFT JOIN user AS u USING(user_id)
LEFT JOIN user_guide AS ug USING(user_id)
LEFT JOIN system_setting AS ss ON ss.name = u.db_name
WHERE s.user_id = {user_id}
”’.format(user_id=info_dict[‘user_id’])
cursor_select_common.execute(select_sql)
sell_user_id, user_guide_id, sell_table_flag, sell_dbname, \
sell_db_config_json = cursor_select_common.fetchone()
# 获得出售者的数据库链接描述符以及游标
conn_sell = mysql.connector.connect(**json.loads(sell_db_config_json))
cursor_dml_sell = conn_sell.cursor(buffered=True)
if info_dict[‘option_type’] == ‘insert’:
# 构造insert sql语句
if info_dict[‘option_table’] == ‘order_goods’:
insert_sql = ”’
INSERT INTO {table_name}_{flag} VALUES(
{order_goods_id},
{sell_order_id},
{goods_id},
{user_guide_id},
{price},
{num}
)
”’.format(table_name = info_dict[‘option_table’],
flag = sell_table_flag,
**info_dict[‘option_obj’]
)
elif info_dict[‘option_table’] == ‘sell_order’:
insert_sql = ”’
INSERT INTO {table_name}_{flag} VALUES(
{sell_order_id},
{user_guide_id},
{user_id},
{price},
{status}
)
”’.format(table_name = info_dict[‘option_table’],
flag = sell_table_flag,
**info_dict[‘option_obj’]
)
# 执行sql
cursor_dml_sell.execute(insert_sql)
conn_sell.commit()
cursor_dml_sell.close()
conn_sell.close()
elif info_dict[‘option_type’] == ‘update’:
pass
|
文章转载来自:ttlsa.com