Flink Join

Stella981
• 阅读 620

文章目录

    • 一.简介
    • 二.窗口Join
      • 2.1 翻滚窗口(Tumbling Window Join)
      • 2.2 滑动窗口Join(Sliding Window Join)
      • 2.3 会话窗口Join(Session Window Join)
      • 2.4.小结
    • 三.间隔Join
    • 四.示例
      • 4.1 间隔Join
      • 4.2 窗口Join

一.简介

Flink DataStream API中内置有两个可以根据实际条件对数据流进行Join算子:基于间隔的Join和基于窗口的Join。

语义注意事项

  • 创建两个流元素的成对组合的行为类似内连接,如果来自一个流的元素与另一个流没有相对应要连接的元素,则不会发出该元素。
  • 结合在一起的那些元素将其时间戳设置为位于各自窗口中最大时间戳。例如:以[5,10]为边界的窗口将产生连接的元素的时间戳为9。

二.窗口Join

2.1 翻滚窗口(Tumbling Window Join)

执行滚动窗口连接(Tumbling Window Join)时,具有公共Key和公共Tumbling Window的所有元素都以成对组合形式进行连接,并传递给JoinFunction或FlatJoinFunction。因为这就像一个内连接,在滚动窗口中没有来自另一个流的元素的流的元素不会被输出。

Flink Join

如图所示,我们定义了一个大小为2毫秒的滚动窗口,其结果为[0,1],[2,3], …。该图像显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发出任何内容,因为在绿色流中没有元素与橙色元素⑥、⑦连接。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply {
   
    (e1, e2) => e1 + "," + e2 }

2.2 滑动窗口Join(Sliding Window Join)

在执行滑动窗口连接(Sliding Window Join)时,具有公共Key和公共滑动窗口(Sliding Window )的所有元素都作为成对组合进行连接,并传递给JoinFunction或FlatJoinFunction。当前滑动窗口中没有来自另一个流的元素的流的元素不会被发出。

注意,有些元素可能会在一个滑动窗口中连接,但不会在另一个窗口中连接!

Flink Join

在本例中,我们使用的滑动窗口大小为2毫秒,滑动1毫秒,滑动窗口结果[1,0],[0,1],[1,2],[2、3],… x轴以下是每个滑动窗口的Join结果将被传递给JoinFunction的元素。在这里你还可以看到橙②与绿色③窗口Join(2、3),但不与任何窗口Join[1,2]。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply {
   
    (e1, e2) => e1 + "," + e2 }
    

2.3 会话窗口Join(Session Window Join)

在执行会话窗口连接时,具有相同键的所有元素(当“组合”时满足会话条件)都以成对的组合进行连接,并传递给JoinFunction或FlatJoinFunction。再次执行内部连接,因此如果会话窗口只包含来自一个流的元素,则不会发出任何输出。

Flink Join

在这里,定义一个会话窗口连接,其中每个会话被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三次会话中绿色流没有元素,所以⑧⑨不会Join。

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply {
   
    (e1, e2) => e1 + "," + e2 }

2.4.小结

除了对窗口中两条流进行Join,你还可以对它们进行Cogroup,只需将算子定义开始位置的Join()改为coGroup()即可,Join和Cogroup的总体逻辑相同。

二者区别:Join会为两侧输入中每个事件对调用JoinFunction;而Cogroup中CoGroupFunction会以两个输入的元素遍历器为参数,只在每个窗口中被调用一次。

三.间隔Join

interval join用一个公共Key连接两个流的元素(将它们称为A & B),其中流B的元素的时间戳具有相对于流A中的元素的时间戳。 这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B中共享一个公钥的元素。下界和上界都可以是负的或正的,只要下界小于或等于上界。interval连接目前只执行内部连接。

当将一对元素传递给ProcessJoinFunction时,它们将给两个元素分配更大的时间戳(可以通过ProcessJoinFunction.Context访问)。

注意:间隔连接目前只支持事件时间。
Flink Join

在上面的示例中,我们将“橙色”和“绿色”两个流连接起来,它们的下界为-2毫秒,上界为+1毫秒。默认情况下,这些是包含边界的,但是可以通过.lowerboundexclusive()和. upperboundexclusive()进行设置。

再用更正式的符号来表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
   
   
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
   
   
         out.collect(left + "," + right); 
        }
      });
    });

四.示例

4.1 间隔Join

package com.lm.flink.datastream.join
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * @Classname IntervalJoin
 * @Description TODO
 * @Date 2020/10/27 20:32
 * @Created by limeng
 *  区间关联当前仅支持EventTime
 *  Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域的数据进行JOIN。
 */
object IntervalJoin {
   
   
  def main(args: Array[String]): Unit = {
   
   
    //设置至少一次或仅此一次语义
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置至少一次或仅此一次语义
    env.enableCheckpointing(20000,CheckpointingMode.EXACTLY_ONCE)
    //设置
    env.getCheckpointConfig
      .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置重启策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,50000))
    env.setParallelism(1)
    val dataStream1 = env.socketTextStream("localhost",9999)
    val dataStream2 = env.socketTextStream("localhost",9998)
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1.map(f=>{
   
   
      val tokens = f.split(",")
      StockTransaction(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction]{
   
   
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = {
   
   
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()}  new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = {
   
   
        val timestamp  = element.txTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2.map(f=>{
   
   
      val tokens = f.split(",")
      StockSnapshot(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot]{
   
   
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = {
   
   
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()}  new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = {
   
   
        val timestamp  = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })
    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")
    dataStreamMap1.keyBy(_.txCode)
      .intervalJoin(dataStreamMap2.keyBy(_.mdCode))
      .between(Time.minutes(-10),Time.seconds(0))
      .process(new ProcessJoinFunction[StockTransaction,StockSnapshot,String] {
   
   
        override def processElement(left: StockTransaction, right: StockSnapshot, ctx: ProcessJoinFunction[StockTransaction, StockSnapshot, String]#Context, out: Collector[String]): Unit = {
   
   
          out.collect(left.toString +" =Interval Join=> "+right.toString)
        }
      }).print()

    env.execute("IntervalJoin")
  }
  case class StockTransaction(txTime:String,txCode:String,txValue:Double) extends Serializable{
   
   
    override def toString: String = txTime +"#"+txCode+"#"+txValue
  }
  case class StockSnapshot(mdTime:String,mdCode:String,mdValue:Double) extends Serializable {
   
   
    override def toString: String = mdTime +"#"+mdCode+"#"+mdValue
  }
}

结果

get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap1> 1603708942#000001#10.4
get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap2> 1603708942#000001#10.4
1603708942#000001#10.4 =Interval Join=> 1603708942#000001#10.4

4.2 窗口Join

package com.lm.flink.datastream.join
import java.lang
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * @Classname InnerLeftRightJoinTest
 * @Description TODO
 * @Date 2020/10/26 17:22
 * @Created by limeng
 * window join
 */
object InnerLeftRightJoinTest {
   
   
  def main(args: Array[String]): Unit = {
   
   
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //每9秒发出一个watermark
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(9000)

    val dataStream1 = env.socketTextStream("localhost", 9999)
    val dataStream2 = env.socketTextStream("localhost", 9998)

    /**
     * operator操作
     * 数据格式:
     * tx:  2020/10/26 18:42:22,000002,10.2
     * md:  2020/10/26 18:42:22,000002,10.2
     *
     * 这里由于是测试,固水位线采用升序(即数据的Event Time 本身是升序输入)
     */
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1
      .map(f => {
   
   
        val tokens = f.split(",")
        StockTransaction(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] {
   
   
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = {
   
   
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()}  new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = {
   
   
        val timestamp = element.txTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2
      .map(f => {
   
   
        val tokens = f.split(",")
        StockSnapshot(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot] {
   
   
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = {
   
   
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()}  new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = {
   
   
        val timestamp = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")

    /**
     * Join操作
     * 限定范围是3秒钟的Event Time窗口
     */
    val joinedStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.txCode)
      .equalTo(_.mdCode)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)
    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()
    env.execute("InnerLeftRightJoinTest")
  }

  class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
   
   
    override def coGroup(first: lang.Iterable[StockTransaction], second: lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
   
   
      import scala.collection.JavaConverters._
      val scalaT1 = first.asScala.toList
      val scalaT2 = second.asScala.toList

      println(scalaT1.size)
      println(scalaT2.size)
      /**
       * Inner join 要比较的是同一个key下,同一个时间窗口内
       */
      if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
   
   
        for (transaction <- scalaT1) {
   
   
          for (snapshot <- scalaT2) {
   
   
            out.collect(transaction.txCode, transaction.txTime, snapshot.mdTime, transaction.txValue, snapshot.mdValue, "Inner Join Test")
          }
        }
      }
    }
  }
  class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
   
   
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
   
   
      /**
       * 将Java中的Iterable对象转换为Scala的Iterable
       * scala的集合操作效率高,简洁
       */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /**
       * Left Join要比较的是同一个key下,同一个时间窗口内的数据
       */
      if (scalaT1.nonEmpty && scalaT2.isEmpty) {
   
   
        for (transaction <- scalaT1) {
   
   
          out.collect(transaction.txCode, transaction.txTime, "", transaction.txValue, 0, "Left Join Test")
        }
      }
    }
  }
  class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
   
   
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
   
   
      /**
       * 将Java中的Iterable对象转换为Scala的Iterable
       * scala的集合操作效率高,简洁
       */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /**
       * Right Join要比较的是同一个key下,同一个时间窗口内的数据
       */
      if (scalaT1.isEmpty && scalaT2.nonEmpty) {
   
   
        for (snapshot <- scalaT2) {
   
   
          out.collect(snapshot.mdCode, "", snapshot.mdTime, 0, snapshot.mdValue, "Right Join Test")
        }
      }
    }
  }

  case class StockTransaction(txTime: String, txCode: String, txValue: Double)
  case class StockSnapshot(mdTime: String, mdCode: String, mdValue: Double)
}

参考

https://www.jianshu.com/p/ba19e4d1d802

公众号

Flink Join
名称:大数据计算
微信号:bigdata_limeng

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
2年前
HIVE 时间操作函数
日期函数UNIX时间戳转日期函数: from\_unixtime语法:   from\_unixtime(bigint unixtime\, string format\)返回值: string说明: 转化UNIX时间戳(从19700101 00:00:00 UTC到指定时间的秒数)到当前时区的时间格式举例:hive   selec
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这