elasticsearch进阶(3)—— pipeline窗口聚合函数

钱华
• 阅读 3560

一、参考

elasticsearch 学习系列目录——更新ing

Elasticsearch基于Pipeline窗口函数实现实时聚合计算

Moving function aggregation

二、ES 中的窗口聚合函数

2.1 索引数据

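| 订单ID | 订单价格 | 订单时间 | 用户 |
| --- | --- | --- | --- |
| 1 | 20 | 2021/09/01 01:00:00 | u1 |
| 2 | 30 | 2021/09/01 01:01:00 | u2 |
| 3 | 200 | 2021/09/01 01:01:30 | u1 |
| 4 | 300 | 2021/09/01 01:02:00 | u2 |
| 5 | 10 | 2021/09/01 01:02:30 | u1 |
| 6 | 5 | 2021/09/01 01:03:00 | u1 |
| 7 | 100 | 2021/09/01 01:03:30 | u2 |
| 8 | 1000 | 2021/09/01 01:04:00 | u2 |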
# Create the test-order index used by all examples below:
# order_id and username as keyword, price as long, ts as date.
PUT test-order/
{
  "mappings": {
    "properties": {
      "order_id": {
        "type": "keyword"
      },
      "price": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      },
      "ts": {
        "type": "date"
      }
    }
  }
}

# Index the 8 sample orders from the table above into test-order.
# Each {"index": ...} action line is followed by the document it indexes.
POST _bulk
{"index":{"_index":"test-order"}}
{"order_id":"1", "price": 20, "username": "u1", "ts": "2021-09-01T01:00:00Z"}
{"index":{"_index":"test-order"}}
{"order_id":"2", "price": 30, "username": "u2", "ts": "2021-09-01T01:01:00Z"}
{"index":{"_index":"test-order"}}
{"order_id":"3", "price": 200, "username": "u1", "ts": "2021-09-01T01:01:30Z"}
{"index":{"_index":"test-order"}}
{"order_id":"4", "price": 300, "username": "u2", "ts": "2021-09-01T01:02:00Z"}
{"index":{"_index":"test-order"}}
{"order_id":"5", "price": 10, "username": "u1", "ts": "2021-09-01T01:02:30Z"}
{"index":{"_index":"test-order"}}
{"order_id":"6", "price": 5, "username": "u1", "ts": "2021-09-01T01:03:00Z"}
{"index":{"_index":"test-order"}}
{"order_id":"7", "price": 100, "username": "u2", "ts": "2021-09-01T01:03:30Z"}
{"index":{"_index":"test-order"}}
{"order_id":"8", "price": 1000, "username": "u2", "ts": "2021-09-01T01:04:00Z"}

2.2 普通的时间聚合

# Plain time bucketing: 30s date_histogram (a1) with the sum of price per bucket (a2),
# restricted to ts in [01:00:00, 01:10:00]. size=0 returns only aggregations, no hits.
# Empty buckets between the first and last matching document are still returned with
# doc_count 0 (see 01:00:30 in the response below); no bucket is produced after the
# last document (01:04:00), even though the range goes up to 01:10:00.
GET test-order/_search
{
  "size": 0,
  "query": {
    "range": {
      "ts": {
        "gte": "2021-09-01T01:00:00Z",
        "lte": "2021-09-01T01:10:00Z"
      }
    }
  },
  "aggs": {
    "a1": {
      "date_histogram": {
        "field": "ts",
        "fixed_interval": "30s"
      },
      "aggs": {
        "a2": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  }
}


{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "a1" : {
      "buckets" : [
        {
          "key_as_string" : "2021-09-01T01:00:00.000Z",
          "key" : 1630458000000,
          "doc_count" : 1,
          "a2" : {
            "value" : 20.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:00:30.000Z",
          "key" : 1630458030000,
          "doc_count" : 0,
          "a2" : {
            "value" : 0.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:01:00.000Z",
          "key" : 1630458060000,
          "doc_count" : 1,
          "a2" : {
            "value" : 30.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:01:30.000Z",
          "key" : 1630458090000,
          "doc_count" : 1,
          "a2" : {
            "value" : 200.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:02:00.000Z",
          "key" : 1630458120000,
          "doc_count" : 1,
          "a2" : {
            "value" : 300.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:02:30.000Z",
          "key" : 1630458150000,
          "doc_count" : 1,
          "a2" : {
            "value" : 10.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:03:00.000Z",
          "key" : 1630458180000,
          "doc_count" : 1,
          "a2" : {
            "value" : 5.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:03:30.000Z",
          "key" : 1630458210000,
          "doc_count" : 1,
          "a2" : {
            "value" : 100.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:04:00.000Z",
          "key" : 1630458240000,
          "doc_count" : 1,
          "a2" : {
            "value" : 1000.0
          }
        }
      ]
    }
  }
}

2.3 pipeline聚合中实现窗口聚合

# Sliding-window sum on top of 30s buckets, using the moving_fn pipeline aggregation.
# the_sum    : per-bucket sum of price.
# the_window : MovingFunctions.sum over a window of 2 buckets (2 x 30s = 1 minute).
# shift=1 moves the window forward by one bucket so it covers the previous AND the
# current bucket. For example, at 01:01:30 the response below shows 30 + 200 = 230;
# the first bucket has only itself in the window (20).
# gap_policy keep_values: the empty 01:00:30 bucket still takes part in the window
# (its the_window is 20 + 0 = 20).
# Unlike the previous request, no range query is applied here.
GET test-order/_search
{
  "size": 0,
  "aggs": {
    "a1": {
      "date_histogram": {
        "field": "ts",
        "fixed_interval": "30s"
      },
      "aggs": {
        "the_sum": {
          "sum": {
            "field": "price"
          }
        },
        "the_window": {
          "moving_fn": {
            "buckets_path": "the_sum",
            "window": 2,
            "script": "MovingFunctions.sum(values)",
            "shift": 1,
            "gap_policy": "keep_values"
          }
        }
      }
    }
  }
}
  
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "a1" : {
      "buckets" : [
        {
          "key_as_string" : "2021-09-01T01:00:00.000Z",
          "key" : 1630458000000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 20.0
          },
          "the_window" : {
            "value" : 20.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:00:30.000Z",
          "key" : 1630458030000,
          "doc_count" : 0,
          "the_sum" : {
            "value" : 0.0
          },
          "the_window" : {
            "value" : 20.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:01:00.000Z",
          "key" : 1630458060000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 30.0
          },
          "the_window" : {
            "value" : 30.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:01:30.000Z",
          "key" : 1630458090000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 200.0
          },
          "the_window" : {
            "value" : 230.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:02:00.000Z",
          "key" : 1630458120000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 300.0
          },
          "the_window" : {
            "value" : 500.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:02:30.000Z",
          "key" : 1630458150000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 10.0
          },
          "the_window" : {
            "value" : 310.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:03:00.000Z",
          "key" : 1630458180000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 5.0
          },
          "the_window" : {
            "value" : 15.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:03:30.000Z",
          "key" : 1630458210000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 100.0
          },
          "the_window" : {
            "value" : 105.0
          }
        },
        {
          "key_as_string" : "2021-09-01T01:04:00.000Z",
          "key" : 1630458240000,
          "doc_count" : 1,
          "the_sum" : {
            "value" : 1000.0
          },
          "the_window" : {
            "value" : 1100.0
          }
        }
      ]
    }
  }
}

三、flink中的 window 函数

3.1 flink 创建表

order.csv 内容如下（列顺序依次为 order_id, username, price, ts，与下方建表语句一致）:

1,u1,20,2021-09-01 01:00:00
2,u2,30,2021-09-01 01:01:00
3,u1,200,2021-09-01 01:01:30
4,u2,300,2021-09-01 01:02:00
5,u1,10,2021-09-01 01:02:30
6,u1,5,2021-09-01 01:03:00
7,u2,100,2021-09-01 01:03:30
8,u2,1000,2021-09-01 01:04:00

-- Test orders, read from order.csv (columns: order_id, username, price, ts).
-- The watermark trails ts by 1 second; declaring it makes ts the event-time
-- attribute used by the TUMBLE / HOP window queries below.
-- NOTE(review): 'path' is a local absolute path; adjust it to where order.csv lives.
CREATE TABLE test_order (
    order_id  STRING,
    username  STRING,
    price     INT,
    ts        TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
    'connector' = 'filesystem',
    'path'      = '/Users/yz/work/github/flinkLearn/csvs/order.csv',
    'format'    = 'csv'
);

（此处原为截图，图片已缺失；原图标题：elasticsearch进阶(3)—— pipeline窗口聚合函数）

3.2 滚动窗口聚合 tumble



-- Tumbling (non-overlapping) 30-second windows: one row per window that
-- contains at least one order, with the window bounds and the total price.
SELECT
    TUMBLE_START(ts, INTERVAL '30' SECOND) AS t_start,
    TUMBLE_END(ts, INTERVAL '30' SECOND)   AS t_end,
    SUM(price)                             AS sum_price
FROM test_order
GROUP BY TUMBLE(ts, INTERVAL '30' SECOND);

（此处原为滚动窗口查询结果截图，图片已缺失；原图标题：elasticsearch进阶(3)—— pipeline窗口聚合函数）

3.3 滑动窗口聚合 hop

-- Sliding window where the slide (30s) is smaller than the window size (1m),
-- so consecutive windows overlap.
-- Because of this, the first window starts before the first record: it is
-- shifted back by (window size - slide).
-- HOP arguments are (time attribute, slide, size).
SELECT
    HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS h_start,
    HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)   AS h_end,
    SUM(price)                                               AS sum_price
FROM test_order
GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE);

（此处原为滑动窗口（滑动步长 30s < 窗口时长 1m）查询结果截图，图片已缺失；原图标题：elasticsearch进阶(3)—— pipeline窗口聚合函数）

-- Sliding window where the slide (30s) is larger than the window size (10s).
-- Windows cover [start, start + 10s) and start every 30s, so they do not overlap
-- and leave 20s gaps; a row whose ts falls inside a gap belongs to no window.
-- NOTE(review): every sample ts is on a :00 or :30 boundary, so assuming windows
-- are aligned to multiples of 30s, no sample row should be dropped here.
SELECT
    HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '10' SECOND) AS h_start,
    HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '10' SECOND)   AS h_end,
    SUM(price)                                                AS sum_price
FROM test_order
GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '10' SECOND);

（此处原为滑动窗口（滑动步长 30s > 窗口时长 10s）查询结果截图，图片已缺失；原图标题：elasticsearch进阶(3)—— pipeline窗口聚合函数）

四、比较

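|  | ES | Flink |
| --- | --- | --- |
| 简单时间聚合 | 使用 date_histogram 实现 | 使用 tumble 滚动窗口函数实现 |
| ⚠️ 数据缺失时 | 会自动填充：补出 doc_count 为 0 的空桶，例如 2021-09-01T01:00:30.000Z | 不会自动填充：没有数据的窗口不会输出 |
| 滑动窗口 | 使用 moving_fn pipeline 聚合函数实现 | 使用 hop 滑动窗口函数实现 |
| ⚠️ 窗口边界 | 不会多出窗口，桶与 date_histogram 的结果一致 | 在开始和结束处会多出窗口（例如第一个窗口的开始时间会前移“窗口时长 − 滑动步长”） |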
点赞
收藏
评论区
推荐文章
待兔 待兔
4年前
【Golang】Golang + jwt 实现简易用户认证
<p本文已同步发布到我的个人博客:<ahref"https://links.jianshu.com/go?tohttps%3A%2F%2Fglorin.xyz%2F2019%2F11%2F23%2FGolangjwtsimpleauth%2F"target"_blank"https://glorin.xyz/2019/11/23/Golang
Wesley13 Wesley13
3年前
Java面试
<divclass"htmledit\_views"id"content\_views"<pid"maintoc"<strong目录</strong</p<pid"Java%E5%9F%BA%E7%A1%80%EF%BC%9Atoc"style"marginleft:40px;"<ahref"Java%E5%
Wesley13 Wesley13
3年前
Activiti 工作流入门指南
<divclass"htmledit\_views"id"content\_views"<h1<aname"t0"</a概览</h1<p如我们的介绍部分所述,Activiti目前分为两大类:</p<ul<li<p<ahref"https://activiti.gitbook.io/activiti7deve
Stella981 Stella981
3年前
ElasticSearch客户端注解使用介绍
_Thebestelasticsearchhighleveljavarestapibboss(https://www.oschina.net/p/bbosselastic)_ 1.ElasticSearch客户端bboss提供了一系列注解@ESId 用于标识
Stella981 Stella981
3年前
Python基础教程,Python入门教程(非常详细)
<divclass"htmledit\_views"id"content\_views"<p<ahref"http://c.biancheng.net/python/base/"rel"nofollow"第1章Python编程基础</a</p<p1.<ahref"http://c.biancheng.net/view/
Stella981 Stella981
3年前
Neo4j和Elasticsearch
<divclass"cspagecontentcsarticlecontainer"<h1class"csarticletitle"<ahref"https://cs.xieyonghui.com/database/neo4jandelasticsearch\_88.html"Neo4j和Elasticsearch</
Stella981 Stella981
3年前
CodeBlocks下载与安装教程
<divclass"htmledit\_views"id"content\_views"<p一、下载教程</p<p1.在浏览器上搜索CodeBlocks官网或者直接输入网址<ahref"http://www.codeblocks.org/"rel"nofollow"http://www.codeblocks.org/进入Co
Wesley13 Wesley13
3年前
Java分布式锁之数据库实现
<divid"cnblogs\_post\_body"class"blogpostbody"<p之前的文章<ahref"http://www.cnblogs.com/garryfu/p/7978611.html"target"\_blank"《Java分布式锁实现》</a中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存
Stella981 Stella981
3年前
Elasticsearch基本概念及核心配置文件详解
<divid"cnblogs\_post\_body"class"blogpostbody"<p&nbsp;  Elasticsearch5.X,下列的是Elasticsearch2.X系类配置,其实很多配置都是相互兼容的</p<h2id"1配置文件"1.配置文件</h2<prename"code"<codeclass
Stella981 Stella981
3年前
Elasticsearch学习笔记——分词
1.测试Elasticsearch的分词Elasticsearch有多种分词器(参考:https://www.jianshu.com/p/d57935ba514b)Settheshapetosemitransparentbycallingset\_trans(5)(1)standardanalyzer:标准分词器(默认是
Stella981 Stella981
3年前
Neo4j删除节点和关系、彻底删除节点标签名
<divclass"htmledit\_views"id"content\_views"<p<ahref"https://www.jianshu.com/p/59bd829de0de"rel"nofollow"datatoken"720f42e8792665773f66044d30a60222"https://www.jians
钱华
钱华
Lv1
此夜曲中闻折柳,何人不起故园情。
文章
4
粉丝
0
获赞
0