1.1. 前言
由于我们这边没有使用 M/R join的方法来实现 IP对应的区域,我们是使用M/R结合Pandas来实现。
我们计算CDN IP对应地区经过这四个步骤:
Mapper: 将以行数据解析成 key=cdn_ip value=1的形式
Shuffle: 通过Shuffle后的结果会生成以 key 的值排序的 value迭代器
结果如: cdn_ip [1, 1, 1 … 1, 1]
Reduce 1: 在这边我们计算出 cdn_ip 的访问量
输出如: None [sum([1, 1, 1 … 1, 1]), key]
Reduce 2:
- 初始化 area_ip pandas 数据
- 对sum([1, 1, 1 … 1, 1]) 进行排序并输出 TOP 100
输入如: 31943 140.205.127.2 浙江省杭州市
1.2. 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
cat mr_uv_cdn_ip_addr.py
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import RawProtocol
from ng_line_parser import NgLineParser
import pandas as pd
import heapq
import socket
import struct
import sys
reload(sys)
sys.setdefaultencoding(‘utf-8’)
class MRUVCdnIpAddr(MRJob):
OUTPUT_PROTOCOL = RawProtocol
ng_line_parser = NgLineParser()
def mapper(self, _, line):
self.ng_line_parser.parse(line)
yield self.ng_line_parser.cdn_ip, 1
def reducer_sum(self, key, values):
“”“统计 VU”“”
yield None, [str(sum(values)), key]
def init_ip_addr_df(self):
“”“读取IP Addr 文件构造 DataFrame 文件”“”
cols = [‘id’, ‘ip_start_num’, ‘ip_end_num’,
‘ip_start’, ‘ip_end’, ‘addr’, ‘operator’]
area_ip_path = ‘/root/script/nginx_log_parse/area_ip.csv’
self.ip_addr_df = pd.read_csv(area_ip_path, sep=‘\t’, names=cols, index_col=‘id’)
def reducer_top100(self, _, values):
“”“访问数降序”“”
for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])):
ip_num = –1
try:
# 将IP转化成INT/LONG 数字
ip_num = socket.ntohl(struct.unpack(“I”,socket.inet_aton(str(ip)))[0])
# 通过数字获得 地址 DataFrame
addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) &
(ip_num <= self.ip_addr_df.ip_end_num)]
# 通过索引值获得获得 地址
addr = addr_df.at[addr_df.index.tolist()[0], ‘addr’]
yield cnt, ‘{ip} {addr}’.format(ip=ip, addr=addr)
except:
yield cnt, ip
def steps(self):
return [
MRStep(mapper=self.mapper,
reducer=self.reducer_sum),
MRStep(reducer_init = self.init_ip_addr_df,
reducer=self.reducer_top100)
]
def main():
MRUVCdnIpAddr.run()
if __name__ == ‘__main__’:
main()
|
运行统计和输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
python mr_uv_cdn_ip_addr.py < www.ttmark.com.access.log
No configs found; falling back on auto–configuration
Creating temp directory /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013
Running step 1 of 2…
reading from STDIN
Running step 2 of 2…
Streaming final output from /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013/output...
31943 140.205.127.2 浙江省杭州市
26306 101.200.101.203
24667 101.200.101.214
......
4065 140.205.253.155 浙江省杭州市
4048 140.205.253.174 浙江省杭州市
3972 140.205.253.131 浙江省杭州市
Removing temp directory /tmp/mr_uv_cdn_ip_addr.root.20160925.023837.331013…
|
文章转载来自:ttlsa.com