知乎Live全文搜索之使用Elasticsearch做聚合分析

ES除了全文搜索以外还有一个主要功能, 就是数据的聚合分析。我会在微信小程序里用到聚合功能。今天先介绍一下。
目前DSL库支持如下三种常用聚合模式

Metrics Aggregations

顾名思义, 主要是用于计算特定的度量字段,Metric很像SQL中的avg、max、min等方法。我们找一下最多的Live有多少人感兴趣:

1
2
3
4
5
6
7
8
9

In : from elasticsearch_dsl import A
In : s = Live.search()
In : s.aggs.metric('max_liked_num', A('max', field='liked_num'))
Out: <elasticsearch_dsl.search.Search at 0x10a47b550>

In : r = s.execute()
In : r.aggregations.max_liked_num
Out: {'value': 6918.0}

嚯。看看是哪一个吧:

1
2
3

In [27]: s.query('match', liked_num=6918).execute()[0].subject
Out[27]: '致所有近视想摘掉眼镜的你们'

有点出人意料哦~
其实度量的也不一定是文档的某个特定字段值, 可以是文档通过脚本生成的值。比如我们看看全部Live平均收入,收入=票价*参与人数,是2个字段。要这样用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

In : from elasticsearch_dsl import A

In : s = Live.search()
In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou
...: nt.value * doc.seats_taken.value)", combine_script='double total=0; int num_of_income=0; for (i in params._agg.incomes) { total += i; nu
...: m_of_income += 1 } return [total, num_of_income]', reduce_script='double total=0; int num_of_income=0; for (item in params._aggs) { tota
...: l += item[0]; num_of_income += item[1]} return total / num_of_income'))
Out: <elasticsearch_dsl.search.Search at 0x10a512898>

In : rs = s.execute()

In : rs.aggregations.avg_income
Out: {'value': 32934.61029513591}

有点长,我们把scripted_metric的参数分开说:

  1. init_script。 初始化时运行,一般是设置初始的全局变量
  2. map_script。会对每个文档做循环,把每个计算好的收入用add方法加到每个分片的params._agg.incomes里面。
  3. combine_script。我们知道ES是分布式的,数据有多个分片,当map_script完成后,它用来对每个分片的那部分结果做求和和计数的预处理
  4. reduce_script。如果你了解MapReduce,我想对2和4步就能更好的理解了,这一步能通过params._aggs把每个分片的预处理结果拿来再做处理,最后通过总收入和live数求得平均值。
    很庆幸没有给平均值拖后腿。BTW,有兴趣的同学可以继续挖掘为啥平均收入这么高。而且注意额,我考虑的只是普通票价,没有算那些「聊表心意」、「鼎力支持」的票,这会让平均值更高一些。
    上面的例子也的好长啊。我不太满意,那么是不是可以简化一下呢? 也就是combine_script不预计算,统一在reduce_script计算:
1
2
3
4
5
6
7
8
9
10

In : s = Live.search()
In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou
...: nt.value * doc.seats_taken.value)", combine_script='return params._agg.incomes', reduce_script='double total=0; int num_of_income=0; for
...: (shard in params._aggs) { for (income in shard) {total += income; num_of_income += 1}} return total / num_of_income'))
Out: <elasticsearch_dsl.search.Search at 0x10a53a470>

In : r = s.execute()
In : r.aggregations.avg_income
Out: {'value': 32934.61029513591}

只是在combine_script用了个嵌套循环。

Bucket Aggregations

Bucket在英语里面有「桶」的意思,Bucket Aggregations会把符合某种条件的文档丢进一个Bucket,而且还可以实现子聚合(sub-
aggregations)。
Elasticsearch是基于Lucene构建的。如果你了解过Lucene,相信知道docValue,它节省内存、做排序,分组等聚合操作时能够大大提升性能。我们之前的model里面大多使用了文本字段(Text),这是用作进行全文搜索的,而希望做聚合计算,需要使用Keyword类型的字段。所以我添加了一个topics字段:

1
2
3
4
5
6
7

from elasticsearch_dsl import Keyword, DocType

class Live(DocType):
...
topic_names = Text(analyzer='ik_max_word')
topics = Keyword() # 新增

其实DSL还支持一种用子字段的写法:

1
2

topic_names = Text(analyzer='ik_max_word', fields={'raw': Keyword()})

由于担心未来Live的Topic会有多个,所以topic_names是一个用join把topic列表串起来的字符串,而需求上topics是一个或者多个topic的列表,还是额外新加一个字段吧。
这样重新跑爬虫,补充下topics字段之后,按toics符合数量排序,看看live中那些类型的Live更多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

In : s = Live.search()
In : s.aggs.bucket('categories', A('terms', field='topics'))
Out: <elasticsearch_dsl.search.Search at 0x10a532d30>

In : r = s.execute()

In : r.aggregations.categories.buckets
Out:
[{'key': '生活方式', 'doc_count': 145},
{'key': '金融', 'doc_count': 94},
{'key': '音乐', 'doc_count': 87},
{'key': '艺术', 'doc_count': 73},
{'key': '教育', 'doc_count': 59},
{'key': '科技', 'doc_count': 59},
{'key': '心理学', 'doc_count': 56},
{'key': '职业', 'doc_count': 56},
{'key': '互联网', 'doc_count': 48},
{'key': '医学', 'doc_count': 42}]

看到了吧,彰显逼格的「生活方式」话题的Live最多,象征更多钱的金融话题次之…
现在是不是有种熟悉的感觉:聚合短语terms这不是SQL里面的group by嘛?
PS: 如果要使结果返回所有聚合结果的话, 需要加上size参数:

1
2

s.aggs.bucket('categories', A('terms', field='topics', size=20))

PS: 从ES5.0开始,size不再能指定0而返回全部结果了,需要明确指定一个大于0的整数。
Bucket聚合支持多种类型,我们再演示下范围聚合。 现在把票价分成三个范围:

  1. 小于20的
  2. 20-100之间的
  3. 大于100的
    这样写:
1
2
3
4
5
6
7
8
9
10
11
12
13

In : s = Live.search()
In : s.aggs.bucket('amount_eq_100', A('range', field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}]))
Out: Range(field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}])

In : r = s.execute()
In : buckets = r.aggregations.amount_eq_100.buckets
In : for bucket in buckets:
...: print('{}: {}'.format(bucket['key'], bucket['doc_count']))
...:
*-20.0: 1159
20.0-100.0: 683
100.0-*: 20

最后演示下date_histogram型的聚合,histogram顾名思义是直方图的意思,我们看看从Live诞生到现在,每个月(即将)举行Live的数量分别是多少:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

In : s = Live.search()
In : s.aggs.buckets('start_at', A('date_histogram', field='starts_at', interval='month'))
Out: <elasticsearch_dsl.search.Search at 0x10a6ea3c8>

In : r = s.execute()
In : r.aggregations.start_at.buckets
Out:
[{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}]

In : q = r.aggregations.start_at.buckets[0]
In : q.doc_count
Out: 20
In : q.key
Out: datetime.datetime(2016, 5, 1, 0, 0)

interval支持多种类型:如year, quarter, month, week, day, hour, minute, second等。

Pipeline Aggregations

管道聚合是在Elasticsearch 2.x新增的一种聚合类型,可以在现有的聚合数据之上,再对其做一次运算。这类似SQL的Subquery。
Pipeline分为2类:

  1. parent。聚合的输入是非Pipeline聚合的输出,并对其进行进一步处理。一般不生成新的桶,而是对父聚合桶信息的replace。
  2. sibling。聚合的输入是其他Pipeline聚合的输出。并能在同级上计算新的聚合。
    管道聚合通过buckets_path参数指定他们要进行聚合计算的权值对象,格式如下:
1
2
3
4
5
6

AGG_SEPARATOR = '>' ; 指定父子聚合关系
METRIC_SEPARATOR = '.' ; 指定聚合的特定权值
AGG_NAME = <the name of the aggregation> ; 直接指定聚合的名称
METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ; 直接指定权值
PATH = <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ; 综合上面的方式指定完整路径

看2个例子就好懂了。首先演示sibling类型的,基于上节date_histogram聚合例子了,我们算一下每个月的Live总收入

1
2
3
4
5
6

In : agg = A('date_histogram', field='starts_at', interval='month')
In : agg.bucket('incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"}))
Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})
In : s.aggs.bucket('incomes_per_month', agg)
Out: DateHistogram(aggs={'incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})}, field='starts_at', interval='month')

为了构造更好理解的聚合语句,先生成一个agg变量,可以看到Buckets和Metrics可以用函数式的方式用多个,也要注意当需求复杂的时候都是可以通过script来实现的。接着加入2个管道,再分别获得最大月收入和全部月收入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

In : s.aggs.pipeline('max_monthly_incomes', agg_type='max_bucket', buckets_path='incomes_per_month>incomes')
Out: <elasticsearch_dsl.search.Search at 0x10a75b518>
In : s.aggs.pipeline('sum_monthly_incomes', agg_type='sum_bucket', buckets_path='incomes_per_month>incomes') # 注意agg_type不一样
Out: <elasticsearch_dsl.search.Search at 0x10a75b518>

In : r = s.execute()
In : r.aggregations.incomes_per_month.buckets
Out:
[{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...},
{'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}]

In : a = r.aggregations.incomes_per_month.buckets[0]
In : a.doc_count
Out: 20
In : a.incomes
Out: {'value': 210088.48763275146}

In : r.aggregations.max_monthly_incomes
Out: {'value': 18330342.949926138, 'keys': ['2016-10-01T00:00:00....} # 十月份收入最多

In : r.aggregations.sum_monthly_incomes
Out: {'value': 61324244.369543076} # 满眼的钱,现金🐂啊

数完了钱,思考下。这个例子就是sibling聚合,因为sum_monthly_incomes、max_monthly_incomes和incomes_per_month在一个区间内的(都是aggs的键)。
我之前我们算过么每个月Live的总收入,全部Live的平均收入。我们现在算一下每个月Live的平均收入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

In : agg = A('date_histogram', field='starts_at', interval='month')
In : agg.bucket('total_incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"}))
Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})

In : agg.pipeline('avg_income', agg_type='bucket_script', buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count') # _count是一个特殊的路径,表示当前bucket里面的文档数量
Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month')

In : s = Live.search()
In : s.aggs.bucket('avg_income_per_month', agg)
Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month')

In : r = s.execute()
In : buckets = r.aggregations.avg_income_per_month.buckets
In : b = buckets[0]

In : b.total_incomes
Out: {'value': 210088.48763275146}
In : b.doc_count
Out: 20
In : b.avg_income
Out: {'value': 10504.424381637573}

这就是parent类型的管道聚合了,它对每个桶自己去做运算。
今天先到这里了,下一篇将基于这几天对ES的学习实现一个对知乎Live进行全文搜索的微信小程序了

版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python