Python操作Elasticsearc
描述:elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful WEB接口。下面介绍了利用python api接口进行数据查询,方便其他系统的调用。
注:此文仅做笔记参考
安装API
1 | pip install elasticsearch |
建立es连接
1 2 | from elasticsearch import Elasticsearches = Elasticsearch([{'host':'10.10.13.12','port':9200}]) |
数据检索功能
1 | es.search(index='logstash-2015.08.20', q='Http_status_code:5* AND server_name:"web1"', from_='124119') |
常用参数
- index - 索引名
- q - 查询指定匹配 使用Lucene查询语法
- from_ - 查询起始点 默认0
- doc_type - 文档类型
- size - 指定查询条数 默认10
- field - 指定字段 逗号分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滚动查询
统计查询功能
# 语法同search大致一样,但只输出统计值
1 2 | In[52]: es.count(index='logstash-2015.08.21', q='http_status_code:500')Out[52]:{u'_shards':{u'failed':0, u'successful':5, u'total':5}, u'count':17042} |
知识扩展
- 滚动demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | # Initialize the scrollpage = es.search( index ='yourIndex', doc_type ='yourType', scroll ='2m', search_type ='scan', size =1000, body ={ # Your query's body})sid = page['_scroll_id']scroll_size = page['hits']['total']# Start scrollingwhile(scroll_size >0): print "Scrolling..." page = es.scroll(scroll_id = sid, scroll ='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: "+ str(scroll_size) # Do something with the obtained page |
以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。
- Query DSL
range过滤器查询范围
gt: > 大于
lt: < 小于
gte: >= 大于或等于
lte: <= 小于或等于
1 2 3 4 5 6 | "range":{ "money":{ "gt":20, "lt":40 }} |
bool组合过滤器
must:所有分句都必须匹配,与 AND 相同。
must_not:所有分句都必须不匹配,与 NOT 相同。
should:至少有一个分句匹配,与 OR 相同。
1 2 3 4 5 6 7 | { "bool":{ "must":[], "should":[], "must_not":[], }} |
term过滤器
- term单过滤
1 2 3 4 5 | { "terms":{ "money":20 }} |
- terms复数版本,允许多个匹配条件
1 2 3 4 5 | { "terms":{ "money": [20,30] }} |
正则查询
1 2 3 4 5 | { "regexp": { "http_status_code": "5.*" }} |
match查询
- match 精确匹配
1 2 3 4 5 | { "match":{ "email":"123456@qq.com" }} |
- multi_match 多字段搜索
1 2 3 4 5 6 | { "multi_match":{ "query":"11", "fields":["Tr","Tq"] }} |
demo
- 获取最近一小时的数据
1 2 3 4 5 6 7 8 9 | {'query': {'filtered': {'filter': {'range': {'@timestamp':{'gt':'now-1h'}} } } }} |
- 条件过滤查询
1 2 3 4 5 6 7 8 | { "query":{ "filtered":{ "query":{"match":{"http_status_code":500}}, "filter":{"term":{"server_name":"vip03"}} } }} |
- Terms Facet 单字段统计
1 2 3 4 5 6 7 8 9 10 | {'facets': {'stat': {'terms': {'field':'http_status_code', 'order':'count', 'size':50} } }, 'size':0} |
- 一次统计多个字段
1 2 3 4 5 6 7 8 9 10 | {'facets': {'cip': {'terms': {'fields':['client_ip']}}, 'status_facets':{'terms':{'fields':['http_status_code'], 'order':'term', 'size':50}}}, 'query':{'query_string':{'query':'*'}}, 'size':0} |
- 多个字段一起统计
1 2 3 4 5 6 7 8 9 10 11 12 | {'facets': {'tag': {'terms': {'fields':['http_status_code','client_ip'], 'size':10 } } }, 'query': {'match_all':{}}, 'size':0} |
数据组装
以下是kibana首页的demo,用来统计一段时间内的日志数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | { "facets": { "0": { "date_histogram": { "field": "@timestamp", "interval": "5m" }, "facet_filter": { "fquery": { "query": { "filtered": { "query": { "query_string": { "query": "*" } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { 'gt': 'now-1h' } } }, { "exists": { "field": "http_status_code.raw" } }, # --------------- ------- # 此处加匹配条件 ] } } } } } } } }, "size": 0} |
如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可
1 2 3 4 5 | {"query": { "query_string": {"query": "backend_name:baidu.com"} }}, |
相关文章