1.1. 前言
这边我们使用Python的M/R框架MRJob来分析
1.2. M/R步骤
M/R分析数据步骤一般有 3 步:
Mapper => Shuffle => Reduce
Mapper: 一般是解析文件中的每一行数据并生成自定义的 key 和 value 输出
Shuffle: 是一个 patition、sort、combine 组成的。一般这个过程是自动进行的。当然我们也可以定义或重写方法来实现自己的 Shuffle。
Reduce: 接收 Mapper 和 Shuffle 输出的 key和value迭代器 计算出我们需要的结构。Reduce 可以有多个
1.3. 如我们计算每日PV经过这三个步骤
Mapper: 将以行数据解析成 key=YYYY-MM-dd value=1的形式
Shuffle: 通过Shuffle后的结果会生成以 key 的值排序的 value迭代器
结果如: 2016-09-24 [1, 1, 1 … 1, 1]
Reduce: 在这边我们计算出 2016-09-24 是访问量
输出如: 2016-09-24 sum([1, 1, 1 … 1, 1])
1.4. 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
cat mr_pv_day.py
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
from ng_line_parser import NgLineParser
class MRPVDay(MRJob):
ng_line_parser = NgLineParser()
def mapper(self, _, line):
self.ng_line_parser.parse(line)
dy, tm = str(self.ng_line_parser.access_time).split()
yield dy, 1 # 每一天的
yield ‘total’, 1 # 所有的
def reducer(self, key, values):
yield key, sum(values)
def main():
MRPVDay.run()
if __name__ == ‘__main__’:
main()
|
运行统计和输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
python mr_pv_day.py < www.ttmark.com.access.log
No configs found; falling back on auto–configuration
Creating temp directory /tmp/mr_pv_day.root.20160924.122246.076809
Running step 1 of 1…
reading from STDIN
Streaming final output from /tmp/mr_pv_day.root.20160924.122246.076809/output...
“2016-06-13” 4149
“2016-06-14” 10234
“2016-06-15” 9825
......
“2016-09-16” 11076
“2016-09-17” 10231
“2016-09-18” 6739
“total” 988839
Removing temp directory /tmp/mr_pv_day.root.20160924.122246.076809…
|
文章转载来自:ttlsa.com